Spark Tips 4: Kafka Consumer Groups and Their "Unexpected Behavior" in Spark Streaming (Updated)

Date: 2022-05-04

According to the official Kafka documentation (http://kafka.apache.org/08/introduction.html), a given topic behaves like a queue for clients that share the same group id: each message in the topic is read only once, by exactly one of the consumer instances (processes or machines) sharing that group id.

Kafka's High Level Consumer API (createMessageStreams on kafka.javaapi.consumer.ConsumerConnector) does behave as the documentation says: the messages of a topic are distributed across the consumer instances that share a group id, i.e., each instance receives a subset of all fetched messages that does not overlap with the subsets received by the others.

However, when the number of consumer instances with the same group id exceeds the number of partitions of the topic, some consumers receive no messages at all. The reason is that in Kafka the smallest unit by which messages are distributed among consumer instances is the partition: if more than one consumer with the same group id is attached to a single partition of a topic, only one of them actually does any work and the others get nothing. To increase consumer parallelism, you have to increase the number of partitions. Here is an example:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SFQConsumerGroup {
    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    public SFQConsumerGroup(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.execute(new ConsumerProcessor(stream, threadNumber));
            threadNumber++;
        }

        executor.shutdown();
        try {
            // Block until all consumer threads have finished instead of busy-waiting.
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("auto.offset.reset", "smallest");
        props.put("auto.commit.enable", "false");        
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        if (args.length < 3) {
            System.err.println("Usage: SFQConsumerGroup <zookeeper> <topic> <groupId> [threadNum], e.g.: localhost:2181 newtest1 jull 2");
            System.exit(1);
        }

        String zooKeeper = args[0];
        String topic = args[1];
        String groupId = args[2];

        int threadNum = 2;

        if (args.length  > 3) {
          threadNum = Integer.parseInt(args[3]);
        }

        SFQConsumerGroup example = new SFQConsumerGroup(zooKeeper, groupId, topic);
        example.run(threadNum);

        try {
            Thread.sleep(1 * 1000);
        } catch (InterruptedException ie) {

        }        
    }
}
class ConsumerProcessor implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerProcessor(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

One thing to note: with no special configuration, a consumer reads a topic starting from the messages that arrive after the consumer has started. If you want the consumer to start from the first message in the topic (even one published before the consumer started), set "auto.offset.reset" to "smallest". Even then, a consumer in a given group only reads from the beginning on its first run; to read from the beginning again you must either switch to a new group id or set "auto.commit.enable" to "false".
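
For instance, a minimal sketch of the first option (switching to a new group id), reusing the SFQConsumerGroup class above; the ZooKeeper address, topic name and group-id prefix are illustrative only:

    // Sketch only: a brand-new group id has no committed offset in ZooKeeper, so with
    // "auto.offset.reset" set to "smallest" this run starts from the topic's first message.
    // The ZooKeeper address, topic name and group-id prefix below are illustrative.
    String freshGroupId = "reread-group-" + System.currentTimeMillis();
    SFQConsumerGroup rereader = new SFQConsumerGroup("localhost:2181", freshGroupId, "newtest1");
    rereader.run(2);   // 2 consumer threads, as in main() above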

However, when Spark Streaming jobs read a topic through KafkaUtils.createDirectStream(), multiple jobs sharing the same group id each consume every message.

For example, with three jobs built from the code below (identical code, different job names) running at the same time, sending 100 messages to the topic results in each of the three jobs receiving all 100 messages, and this happens on every send.

... ...

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokerList);
    kafkaParams.put("group.id", groupId);

    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {

        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            long msgNum = rdd.count();
            System.out.println("There are " + msgNum + " messages read from Kafka.");

            ... ...

            return null;
        }
    });
... ...

In Spark, to have multiple jobs built from the same code read one topic with the same group id without duplication, each obtaining a complementary, non-overlapping subset of the messages, use the following code instead:

Map<String, Integer> topicMap = new HashMap<String, Integer>();
for (int i = 0; i < topicNames.length; i++) {
    topicMap.put(topicNames[i], partitionNum[i]);
}
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(
    jssc, 
    zkQuorum, 
    groupId, 
    topicMap,
    StorageLevel.MEMORY_ONLY());    
messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws Exception {
        long msgNum = rdd.count();
        System.out.println("There are " + msgNum + " messages read from Kafka.");

        ... ...

        return null;
    }
});

createStream() uses Kafka's high-level API, which stores offsets in ZooKeeper as messages are read. createDirectStream() uses the simple Kafka API, which does not use ZooKeeper, so the Spark Streaming job itself is responsible for tracking offsets.
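
A minimal sketch of what that responsibility looks like (the messages variable is the DStream returned by createDirectStream() in the earlier snippet; where the offsets are persisted is left to the application): the RDDs produced by createDirectStream() implement HasOffsetRanges, so each batch's per-partition offsets can be read inside foreachRDD():

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

    messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
        @Override
        public Void call(JavaPairRDD<String, String> rdd) throws Exception {
            // The underlying KafkaRDD implements HasOffsetRanges, exposing the
            // [fromOffset, untilOffset) range read from each topic partition in this batch.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            for (OffsetRange o : offsetRanges) {
                System.out.println("topic=" + o.topic()
                        + " partition=" + o.partition()
                        + " fromOffset=" + o.fromOffset()
                        + " untilOffset=" + o.untilOffset());
            }
            // ... persist offsetRanges (e.g. in ZooKeeper or a database) to resume after a restart ...
            return null;
        }
    });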

Test environment: Kafka 0.8.1.1 + Spark 1.3.1