kafka生产者配置遇到的坑
时间:2019-09-18
本文章向大家介绍kafka生产者配置遇到的坑,主要包括kafka生产者配置遇到的坑使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
接入其他系统的kafka集群时,遇到了一下问题:
org.springframework.kafka.support.LoggingProducerListener [76] [http-nio-9050-exec-1]- Exception thrown when sending a message with key='null' and payload='test' to topic lapm_notice: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
起初还是照抄网上给的接入的方法,按部就班的开始写客户端,。。。结果完成时测试执行到了send方法时出现了这种问题,找度娘说时因为你的topic不可用,我去,这个时调别人的集群,我哪里能管到,所以这个问题果断pass,熬了好几天,试了几种不同的方法,终于得以见天日,----版本的问题,我使用的kafka生产者版本是1.0.1,但是人家的集群的版本是0.9.1,我去,差了这么个版本就让我挂了,果断换了之后,发消息成功.
下面是我的配置代码:
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Properties; @Component public class KafkaProducerUtil implements InitializingBean { @Resource private Properties props; @Resource private KafkaConfig kafkaConfig; private KafkaProducer<String, String> producer; public void init() { producer = new KafkaProducer<String, String>(getProps()); } public KafkaProducer<String, String> getProducer() { return producer; } private Properties getProps() { // 服务器ip:端口号,集群用逗号分隔 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getServers()); // key序列化指定类 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // value序列化指定类 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); return props; } @Override public void afterPropertiesSet() throws Exception { init(); } }
导入的maven为:
<!-- kafka客户端引入以支持kafka输出 --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>${kafka.version}</version> <exclusions> <exclusion> <artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
另外,以log4j2的方式配置发消息的方式也是相当实用的(某司给的demo,可惜我的架构不是这种的),但是这种需要你的日志框架是log4j2,代码:
<?xml version="1.0" encoding="UTF-8"?> <Configuration name="defaultConfiguration" status="off" strict="true" monitorInterval="5"> <properties> <!-- 日志输出格式配置 --> <property name="patternlayout">%date{yyyy-MM-dd HH:mm:ss,SSS}|%level|%thread|%class{36}|%M|%msg%xEx|$|$|%n</property> <!-- 测试环境监控预警平台Kafka ip --> <property name="kafkaServers">127.0.0.1:9020</property> </properties> <Appenders> <Console name="Console" target="SYSTEM_OUT"> <PatternLayout pattern="${patternlayout}" /> </Console> <!-- 预警类日志KafkaAppender, lapm_notice为预警类Kafka topic --> <Kafka name="NoticeKafkaAppender" topic="lapm_notice" syncSend="false"> <PatternLayout pattern="${patternlayout}" /> <Property name="bootstrap.servers">${kafkaServers}</Property> </Kafka> <!-- 统计类日志KafkaAppender, lapm_statistics为统计类Kafka topic --> <Kafka name="StatisticsKafkaAppender" topic="lapm_statistics" syncSend="false"> <PatternLayout pattern="${patternlayout}" /> <Property name="bootstrap.servers">${kafkaServers}</Property> </Kafka> <!-- 场景类日志KafkaAppender, lapm_scene为场景类Kafka topic --> <Kafka name="SceneKafkaAppender" topic="lapm_scene" syncSend="false"> <PatternLayout pattern="${patternlayout}" /> <Property name="bootstrap.servers">${kafkaServers}</Property> </Kafka> </Appenders> <Loggers> <AsyncRoot level="info" includeLocation="true"> <AppenderRef ref="Console" /> </AsyncRoot> <!-- 预警类日志AsyncLogger --> <AsyncLogger name="Notice" level="info" includeLocation="true" additivity="true"> <AppenderRef ref="NoticeKafkaAppender" /> </AsyncLogger> <!-- 统计类日志 AsyncLogger --> <AsyncLogger name="Statistics" level="info" includeLocation="true" additivity="true"> <AppenderRef ref="StatisticsKafkaAppender" /> </AsyncLogger> <!-- 场景类日志AsyncLogger --> <AsyncLogger name="Scene" level="info" includeLocation="true" additivity="true"> <AppenderRef ref="SceneKafkaAppender" /> </AsyncLogger> </Loggers> </Configuration>
/** 预警类日志 */ private Logger noticeLogger = LoggerFactory.getLogger("Notice"); noticeLogger.error(NoticeLogUtils.getNoticeInfo(businessCode, userCode, NoticeType.NOTICE_WARN, url, arguments, error, sysModuleCode, requestId, requestSerial) + "异常预警日志系统自定义部分");
原文地址:https://www.cnblogs.com/otways/p/11542750.html
- HDU 1166 敌兵布阵(线段树单点更新,板子题)
- 一文看懂ovirt的supervdsmd服务
- openstack如何扩展API之二:扩展原有核心API
- selenium+python自动化77-autoit文件上传
- selenium+python自动化78-autoit参数化与批量上传
- libvirt-内存分配和内存热插拔
- selenium+python自动化79-文件下载(SendKeys)
- selenium+python自动化80-文件下载(不弹询问框)
- libvirt-cpu分配和cpu热插拔
- 如何使用curl调试openstack的api
- selenium+python自动化81-报告优化
- Selenium+python自动化82-只截某个元素的图
- libvirt-TLS加密
- 在openstck中配置使用cloud-init
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 一文搞定web微信第三方登录
- python学习笔记(1)
- Swift Mutating
- 31.opengl高级光照-泛光bloom
- Swift 泛型
- Swift高阶函数map,filter,reduce
- 一文解决大批量基因相关性分析
- Swift String 与 NSString
- 30.opengl高级光照-HDR
- Angular bootstrap的一个例子
- MicrobiomeAnalyst | 零代码分析宏基因组数据
- Java8实战——通过行为参数化传递代码 顶
- Angular component的一个例子
- 初识mybatis中的缓存
- 【卷积神经网络结构专题】ResNet及其变体的结构梳理、有效性分析