使用Kafka的High Level Consumer
时间:2022-04-27
本文章向大家介绍使用Kafka的High Level Consumer,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
##为什么使用High Level Consumer
- 在某些应用场景,我们希望通过多线程读取消息,而我们并不关心从Kafka消费消息的顺序,我们仅仅关心数据能被消费就行。High Level 就是用于抽象这类消费动作的。
- 消息消费已Consumer Group为单位,每个Consumer Group中可以有多个consumer,每个consumer是一个线程,topic的每个partition同时只能被某一个consumer读取,Consumer Group对应的每个partition都有一个最新的offset的值,存储在zookeeper上的。所以不会出现重复消费的情况。
##设计High Level Consumer High Level Consumer 可以并且应该被使用在多线程的环境,线程模型中线程的数量(也代表group中consumer的数量)和topic的partition数量有关,下面列举一些规则:
- 当提供的线程数量多于partition的数量,则部分线程将不会接收到消息;
- 当提供的线程数量少于partition的数量,则部分线程将从多个partition接收消息;
- 当某个线程从多个partition接收消息时,不保证接收消息的顺序;可能出现从partition3接收5条消息,从partition4接收6条消息,接着又从partition3接收10条消息;
- 当添加更多线程时,会引起kafka做re-balance, 可能改变partition和线程的对应关系。
##代码示例 ConsumerGroupExample
package com.test.groups;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
example.shutdown();
}
}
ConsumerTest
package com.test.groups;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- nginx如何限制并发连接请求数?
- RTSP协议视频平台EasyNVR证书配置界面上传文件地址自动填写错误怎么处理?
- 开发RTSP/RTMP/GB28181/海康SDK/EHome视频融合平台EasyCVR,使用vue-cli3项目搭建多页面模式的方法
- 互联网视频直播&点播平台RTMP推流组件EasyRTMP在弱网环境下推流稳定吗?会不会有推流失败的问题?
- 安防融合视频云服务EasyCVR集成海康EHome协议实现设备录像回看返回会话ID为-1是什么情况?
- git .gitignore 忽略规则的匹配语法
- vue 初始化高德地图
- js -- 判断数组是否为空?
- 手机没网了,却还能支付,这是什么原理?
- grub异常
- 34.Python字符串替换方法translate
- 35.Python字符串格式运算符%
- 好一个 Spring Boot 开源在线考试系统!解决了我的燃眉之急
- OC基础--数据类型与表达式
- OC基础--字符串