Spring boot with Apache Kafka
时间:2022-05-03
本文章向大家介绍Spring boot with Apache Kafka,主要内容包括5.21. Spring boot with Apache Kafka、5.21.2. maven、5.21.3. Spring boot Application、5.21.4. EnableKafka、5.21.5. KafkaListener、5.21.6. 测试、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
本文节选自电子书《Netkiller Java 手札》
5.21. Spring boot with Apache Kafka
Spring boot 1.5.1
5.21.1. 安装 kafka
一下安装仅仅适合开发环境,生产环境请使用这个脚本安装 https://github.com/oscm/shell/tree/master/mq/kafka
cd /usr/local/src
wget http://apache.communilink.net/kafka/0.10.2.0/kafka_2.12-0.10.2.0.tgz
tar zxvf kafka_2.12-0.10.2.0.tgz
mv kafka_2.12-0.10.2.0 /srv/
cp /srv/kafka_2.12-0.10.2.0/config/server.properties{,.original}
echo "advertised.host.name=localhost" >> /srv/kafka_2.12-0.10.2.0/config/server.properties
ln -s /srv/kafka_2.12-0.10.2.0 /srv/kafka
启动 Kafka 服务
/srv/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh /srv/kafka/config/server.properties
-daemon 表示守护进程方式在后台启动
/srv/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/srv/kafka/bin/kafka-server-start.sh -daemon /srv/kafka/config/server.properties
停止 Kafka 服务
/srv/kafka/bin/kafka-server-stop.sh
/srv/kafka/bin/zookeeper-server-stop.sh
5.21.2. maven
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
5.21.3. Spring boot Application
package cn.netkiller;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableAutoConfiguration
@ComponentScan
@EnableScheduling
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
5.21.4. EnableKafka
package cn.netkiller.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
public KafkaConsumerConfig() {
// TODO Auto-generated constructor stub
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory());
// factory.setConcurrency(1);
// factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<String, Object>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "www.netkiller.cn:9092");
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
5.21.5. KafkaListener
package cn.netkiller.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
import java.util.logging.Logger;
public class Listener {
public Listener() {
// TODO Auto-generated constructor stub
}
protected Logger logger = Logger.getLogger(Listener.class.getName());
public CountDownLatch getCountDownLatch1() {
return countDownLatch1;
}
private CountDownLatch countDownLatch1 = new CountDownLatch(1);
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
logger.info("Received message: " + record.toString());
System.out.println("Received message: " + record);
countDownLatch1.countDown();
}
}
5.21.6. 测试
$ cd /srv/kafka
$ bin/kafka-console-producer.sh --broker-list 47.89.35.55:9092 --topic test
This is test message.
每输入一行回车后发送到你的Spring boot kafka 程序
- 13.5 常用sql语句
- React基础之JSX语法
- 13.4 mysql用户管理
- Linux基础(day54)
- 13.3 mysql常用命令
- 13.2 连接mysql
- JDK容器学习之List: CopyOnWriteArrayList,ArrayList,LinkedList对比
- 13.1 设置更改root密码
- 12.23 open_basedir
- JDK容器学习之CopyOnWriteArrayList:线程安全保障机制
- JDK容器学习之LinkedList:底层存储&读写逻辑
- Linux基础(day53)
- 15.4 xshell使用xftp传输文件
- JDK容器学习之ArrayList:底层存储和动态扩容
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 遇到mpi worker exited on signal 9
- 容器共享GPU时查看容器使用的GPU编号
- oci runtime error: exec failed: container_linux.go:247: starting container process caused “exec: “/
- R|UpSet-集合可视化
- 美国队长的盾(一) 同心圆
- R|clusterProfiler-富集分析
- R|fastqcr QC数据处理
- R|timeROC-分析
- R|ML_code-线性回归(2)
- R|机器学习入门-多元线性回归(3)
- Bioinfo|bedtools-操作VCF文件
- 美国队长的盾(二)五角星
- 绘图系列|R-VennDiagram包绘制韦恩图
- Linux高精度定时器hrtimer使用实例
- 数据分析|R-缺失值处理