Spark Tips4: Kafka的Consumer Group及其在Spark Streaming中的“异动”(更新)
按照Kafka官方的说法(http://kafka.apache.org/08/introduction.html),某一特定topic对于相同group id的clients采用queuing机制,也就是说topic中的每个message只能被多个group id相同的consumer instance(process或者machine)中的一个读取一次。
使用Kafka的High Level Consumer API (kafka.javaapi.consumer.ConsumerConnector 的createMessageStreams)的确是像文档中说的,某topic中的message在同一个group id的多个consumer instances件分布,也就是说,每个instance会得到一个互相之间没有重合的被获取的全部message的子集。
不过,当同一个groupid的consumer instance的数量超过该topic partition的数量的时候,会有一部分consumer得不到任何message。这是因为在Kafka,message 在consumer instance之间被分发的最小单位是partition。一个topic的一个partition上,如果有多于一个同group id的consumer,其中只有一个真的在工作,其他都无法获得任何message。要想扩大consumer的并发性,就要增加partition数量。下面是个例子:
public class SFQConsumerGroup {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public SFQConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.execute(new ConsumerProcessor(stream, threadNumber));
threadNumber++;
}
executor.shutdown();
while (!executor.isTerminated()) {
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("auto.offset.reset", "smallest");
props.put("auto.commit.enable", "false");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("You need to input broker and topicName as arguments. E.g: localhost:2181 newtest1 jull, or: localhost:2181 newtest1 jull 2");
}
String zooKeeper = args[0];
String topic = args[1];
String groupId = args[2];
int threadNum = 2;
if (args.length > 3) {
threadNum = Integer.parseInt(args[3]);
}
SFQConsumerGroup example = new SFQConsumerGroup(zooKeeper, groupId, topic);
example.run(threadNum);
try {
Thread.sleep(1 * 1000);
} catch (InterruptedException ie) {
}
}
}
class ConsumerProcessor implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerProcessor(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
这里有一点需要注意:在没有个专门设置的情况下,Consumer读取topic,是从Consumer启动后再进入该topic的message开始,如果想要consumer从topic的第一个message(即使那是consumer启动前就已经publish到topic的)开始读,就需要将"auto.offset.reset" 设为"smallest"。但是即使这样,某一个特定group的consumer也只能在第一次运行的时候从topic第一个message开始读。如果想要再次从头读,就要换一个group id,或者将"auto.commit.enable"设为"false"。
但是,当Spark Streaming Job使用KafkaUtils.createDirectStream()读取topic的时候,多个同一group id的job,却每个都能consume到全部message。
例如有3个实现了下面代码的同源 job(完全一样的code,不同job name)同时在线,向该topic发送100条message,这3个job会各自接收到这100条message。每次发送皆如是。
... ...
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", brokerList);
kafkaParams.put("group.id", groupId);
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws Exception {
long msgNum = rdd.count();
System.out.println("There are " + msgNum + " messages read from Kafka.");
... ...
return null;}});
... ...
在Spark中要想基于相同code的多个job在使用相同group id 读取一个topic时不重复读取,分别获得补充和的子集,需要用以下code:
Map<String,Integer> topicMap=new HashMap<String,Integer>();
for (int i = 0; i < topicNames.length; i ++) {
topicMap.put(topicNames[i], partitionNum[i]);
}
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
jssc,
zkQuorum,
groupId,
topicMap,
StorageLevel.MEMORY_ONLY());
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
@Override
public Void call(JavaPairRDD<String, String> rdd) throws Exception { long msgNum = rdd.count();
System.out.println("There are " + msgNum + " messages read from Kafka.");
... ...
return null;
}
});
createStream()使用了Kafka的high level API,在读取message的过程中将offset存储在了zookeeper中。而createDirectStream()使用的是simple Kafa API, 该API没有使用zookeeper,因此spark streaming job需要自己负责追踪offset。
注:测试环境:Kafka 0.8.1.1 + Spark 1.3.1
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- WebView三问—B站真题
- C++核心准则T.140:为所有可能重用的操作命名
- Service三问
- 事件分发机制三问
- C++核心准则T.141:如果你需要只在一个地方使用的简单的函数对象,使用无名的lambda表达式
- 基于DOM的XML文件解析类
- C++核心准则T.143:避免无意中编写非通用代码
- XML文件解析实践(DOM解析)
- golang 多协程的同步方法总结
- C++核心准则T.144:不要特化函数模板
- 三千字带你搞懂XXL-JOB任务调度平台
- Python-科学计算-pandas-14-df按行按列进行转换
- Python实现扫码工具
- C++核心准则T.150:用static_assert检查类和概念的匹配性
- 初学者也能快速写Python脚本啦——通用功能代码分享