Spring Boot 中使用 Kafka
时间:2022-04-28
本文章向大家介绍Spring Boot 中使用 Kafka,主要内容包括准备、测试用例、Github 代码、启用 kafka、消息生产者、消息消费者、参数配置、启动服务、单元测试、遇到一些坑、Contact、基本概念、基础应用、原理机制和需要注意的事项等,并结合实例形式分析了其使用技巧,希望通过本文能帮助到大家理解应用这部分内容。
Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区、多副本、冗余,因此被广泛用于大规模消息数据处理应用。Kafka 支持Java 及多种其它语言客户端,可与Hadoop、Storm、Spark等其它大数据工具结合使用。
准备
测试用例
Github 代码
代码我已放到 Github ,导入spring-boot-kafka
项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-kafka
添加依赖
在项目中添加 kafka-clients
依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
启用 kafka
@Configuration
@EnableKafka
public class KafkaConfiguration {
}
消息生产者
@Component
public class MsgProducer {
private static final Logger log = LoggerFactory.getLogger(MsgProducer.class);
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topicName, String jsonData) {
log.info("向kafka推送数据:[{}]", jsonData);
try {
kafkaTemplate.send(topicName, jsonData);
} catch (Exception e) {
log.error("发送数据出错!!!{}{}", topicName, jsonData);
log.error("发送数据出错=====>", e);
}
//消息发送的监听器,用于回调返回信息
kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
@Override
public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
}
@Override
public void onError(String topic, Integer partition, String key, String value, Exception exception) {
}
@Override
public boolean isInterestedInSuccess() {
log.info("数据发送完毕");
return false;
}
});
}
}
消息消费者
@Component
public class MsgConsumer {
@KafkaListener(topics = {"topic-1","topic-2"})
public void processMessage(String content) {
System.out.println("消息被消费"+content);
}
}
参数配置
application.properties
#kafka
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=YZ-PTEST-APP-HADOOP-02:9092,YZ-PTEST-APP-HADOOP-04:9092
# 指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
# 指定默认消费者group id
spring.kafka.consumer.group-id=myGroup
# 指定默认topic id
spring.kafka.template.default-topic=topic-1
启动服务
@SpringBootApplication
@ComponentScan(value = {"io.ymq.kafka"})
public class Startup {
public static void main(String[] args) {
SpringApplication.run(Startup.class, args);
}
}
单元测试
import io.ymq.kafka.MsgProducer;
import io.ymq.kafka.run.Startup;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 描述: 测试 kafka
*
* @author yanpenglei
* @create 2017-10-16 18:45
**/
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Startup.class)
public class BaseTest {
@Autowired
private MsgProducer msgProducer;
@Test
public void test() throws Exception {
msgProducer.sendMessage("topic-1", "topic--------1");
msgProducer.sendMessage("topic-2", "topic--------2");
}
}
消息生产者,响应
2017-10-17 15:54:44.814 INFO 2960 --- [ main] io.ymq.kafka.MsgProducer : 向kafka推送数据:[topic--------1]
2017-10-17 15:54:44.860 INFO 2960 --- [ main] io.ymq.kafka.MsgProducer : 向kafka推送数据:[topic--------2]
2017-10-17 15:54:44.878 INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer : 数据发送完毕
2017-10-17 15:54:44.878 INFO 2960 --- [ad | producer-1] io.ymq.kafka.MsgProducer : 数据发送完毕
消息消费者,响应
消息被消费topic--------1
消息被消费topic--------2
代码我已放到 Github ,导入spring-boot-kafka
项目
github https://github.com/souyunku/spring-boot-examples/tree/master/spring-boot-kafka
遇到一些坑
[2017-10-16 19:20:08.340] - 14884 严重 [main] --- org.springframework.kafka.support.LoggingProducerListener: Exception thrown when sending a message with key='null' and payload='topic--------2' to topic topic-2:
经调试发现 kafka 连接是用的主机名,所以修改 hosts
C:WindowsSystem32driversetchosts
10.32.32.149 YZ-PTEST-APP-HADOOP-02
10.32.32.154 YZ-PTEST-APP-HADOOP-04
Contact
- 作者:鹏磊
- 出处:http://www.ymq.io
- Email:admin@souyunku.com
- 版权归作者所有,转载请注明出处
- Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享
- 小顶堆Java实现
- Tomcat源码分析一:源码导入
- 如何使用Metasploit对安卓手机进行控制
- 关于MySQL DNS解析探究之二:unauthenticated user
- Thrift Direct Memory OOM问题解决方法
- Mapreduce程序中reduce的Iterable参数迭代出是同一个对象
- 内部威胁那些事儿(二):系统破坏
- 从用户行为去理解内容-item2vec及其应用
- Dubbo与Zookeeper、SpringMVC整合和使用(入门级)
- Websocket HandShake Sec-WebSocket-Accept 生成策略
- 关于JVM CPU资源占用过高的问题排查
- ActiveMQ简单介绍以及安装
- Java Process destroy方法kill进程,返回码测试
- 百度人脸识别API Java调用
- java教程
- Java快速入门
- Java 开发环境配置
- Java基本语法
- Java 对象和类
- Java 基本数据类型
- Java 变量类型
- Java 修饰符
- Java 运算符
- Java 循环结构
- Java 分支结构
- Java Number类
- Java Character类
- Java String类
- Java StringBuffer和StringBuilder类
- Java 数组
- Java 日期时间
- Java 正则表达式
- Java 方法
- Java 流(Stream)、文件(File)和IO
- Java 异常处理
- Java 继承
- Java 重写(Override)与重载(Overload)
- Java 多态
- Java 抽象类
- Java 封装
- Java 接口
- Java 包(package)
- Java 数据结构
- Java 集合框架
- Java 泛型
- Java 序列化
- Java 网络编程
- Java 发送邮件
- Java 多线程编程
- Java Applet基础
- Java 文档注释
- 双指针法:一样的道理,能解决四数之和
- 【python-opencv】轨迹栏作为调色板
- springBoot 线程池异步编程
- MPU6050姿态解算2-欧拉角&旋转矩阵
- 【python-opencv】图像的基本操作
- 3 分钟生成一个单元测试报告,这个样式爱了
- 拼多多的底层逻辑
- 详解ELF重定向原理
- 干货 | 携程基于Quasar协程的NIO实践
- 【python-opencv】图像上的算术运算
- 个人珍藏的80道多线程并发面试题(11-20答案解析)
- 【python-opencv】性能衡量和提升技术
- 【python-opencv】转换颜色空间
- 七夕,当然少不了纯CSS的点缀啦
- PHP中的垃圾回收相关函数