Kafka核心API——Consumer消费者

时间:2022-07-25
本文章向大家介绍Kafka核心API——Consumer消费者,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

Consumer之自动提交

上文中介绍了Producer API的使用,现在我们已经知道如何将消息通过API发送到Kafka中了,那么现在的生产者/消费者模型就还差一位扮演消费者的角色了。因此,本文将介绍Consumer API的使用,使用API从Kafka中消费消息,让应用成为一个消费者角色。

还是老样子,首先我们得创建一个Consumer实例,并指定相关配置项,有了这个实例对象后我们才能进行其他的操作。代码示例:

/**
 * 创建Consumer实例
 */
public static Consumer<String, String> createConsumer() {
    Properties props = new Properties();
    // 指定Kafka服务的ip地址及端口
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127。0.0.1:9092");
    // 指定group.id,Kafka中的消费者需要在消费者组里
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
    // 是否开启自动提交
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // 自动提交的间隔,单位毫秒
    props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    // 消息key的序列化器
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    // 消息value的序列化器
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");

    return new KafkaConsumer<>(props);
}

在以上代码中,可以看到设置了group.id这个配置项,这是一个Consumer的必要配置项,因为在Kafka中,Consumer需要位于一个Consumer Group里。具体如下图所示:

在上图中是一个Consumer消费一个Partition,是一对一的关系。但Consumer Group里可以只有一个Consumer,此时该Consumer可以消费多个Partition,是一对多的关系。如下图所示:

一个Consumer可以只消费一个Partition,也可以消费多个Partition,但需要注意的是多个Consumer不能消费同一个Partition

总结一下Consumer的注意事项:

  • 单个Partition的消息只能由Consumer Group中的某个Consumer来消费
  • Consumer从Partition中消费消息是顺序的,默认从头开始消费
  • 如果Consumer Group中只有一个Consumer,那么这个Consumer会消费所有Partition中的消息

在Kafka中,当消费者消费数据后,需要提交数据的offset来告知服务端成功消费了哪些数据。然后服务端就会移动数据的offset,下一次消费的时候就是从移动后的offset位置开始消费。

这样可以在一定程度上保证数据是被消费成功的,并且由于数据不会被删除,而只是移动数据的offset,这也保证了数据不易丢失。若消费者处理数据失败时,只要不提交相应的offset,就可以在下一次重新进行消费。

和数据库的事务一样,Kafka消费者提交offset的方式也有两种,分别是自动提交和手动提交。在本例中演示的是自动提交,这也是消费数据最简单的方式。代码示例:

/**
 * 演示自动提交offset
 */
public static void autoCommitOffset() {
    Consumer<String, String> consumer = createConsumer();
    List<String> topics = List.of("MyTopic");
    // 订阅一个或多个Topic
    consumer.subscribe(topics);
    while (true) {
        // 从Topic中拉取数据,每1000毫秒拉取一次
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // 每次拉取可能都是一组数据,需要遍历出来
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Consumer之手动提交

自动提交的方式是最简单的,但不建议在实际生产中使用,因为可控性不高。所以更多时候我们使用的是手动提交,但想要使用手动提交,就需要先关闭自动提交,修改配置项如下:

props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

关闭了自动提交后,就得在代码中调用commit相关的方法来提交offset,主要就是两个方法:commitAsynccommitSync,看方法名也知道一个是异步提交一个是同步提交。

这里以commitAsync为例,实现思路主要是在发生异常的时候不要调用commitAsync方法,而在正常执行完毕后才调用commitAsync方法。代码示例:

/**
 * 演示手动提交offset
 */
public static void manualCommitOffset() {
    Consumer<String, String> consumer = createConsumer();
    List<String> topics = List.of("MyTopic");
    // 订阅一个或多个Topic
    consumer.subscribe(topics);
    while (true) {
        // 从Topic中拉取数据,每1000毫秒拉取一次
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // 每次拉取可能都是一组数据,需要遍历出来
        for (ConsumerRecord<String, String> record : records) {
            try {
                // 模拟将数据写入数据库
                Thread.sleep(1000);
                System.out.println("save to db...");
                System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());
            } catch (Exception e) {
                // 写入失败则不要调用commit,这样就相当于起到回滚的作用,
                // 下次消费还是从之前的offset开始消费
                e.printStackTrace();
                return;
            }
        }
        // 写入成功则调用commit相关方法去手动提交offset
        consumer.commitAsync();
    }
}

##针对Partition提交offset

在前文中有介绍到,一个Consumer Group里可以只有一个Consumer,该Consumer可以消费多个Partition。在这种场景下,我们可能会在Consumer中开启多线程去处理多个Partition中的数据,以提高性能。

为了防止某些Partition里的数据消费成功,而某些Partition里的数据消费失败,却都一并提交了offset。我们就需要针对单个Partition去提交offset,也就是将offset的提交粒度控制在Partition级别。

这里先简单演示一下如何针对单个Partition提交offset,代码示例:

/**
 * 演示手动提交单个Partition的offset
 */
public static void manualCommitOffsetWithPartition() {
    Consumer<String, String> consumer = createConsumer();
    List<String> topics = List.of("MyTopic");
    // 订阅一个或多个Topic
    consumer.subscribe(topics);
    while (true) {
        // 从Topic中拉取数据,每1000毫秒拉取一次
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        // 单独处理每一个Partition中的数据
        for (TopicPartition partition : records.partitions()) {
            System.out.println("======partition: " + partition + " start======");
            // 从Partition中取出数据
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                try {
                    // 模拟将数据写入数据库
                    Thread.sleep(1000);
                    System.out.println("save to db...");
                    System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                            record.partition(), record.offset(), record.key(), record.value());
                } catch (Exception e) {
                    // 发生异常直接结束,不提交offset
                    e.printStackTrace();
                    return;
                }
            }

            // 执行成功则取出当前消费到的offset
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // 由于下一次开始消费的位置是最后一次offset+1的位置,所以这里要+1
            OffsetAndMetadata metadata = new OffsetAndMetadata(lastOffset + 1);
            // 针对Partition提交offset
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            offsets.put(partition, metadata);
            // 同步提交offset
            consumer.commitSync(offsets);
            System.out.println("======partition: " + partition + " end======");
        }
    }
}

Consumer针对一个或多个Partition进行订阅

在之前的例子中,我们都是针对Topic去订阅并消费数据,实际上也可以更细粒度一些针对Partition进行订阅,这通常应用在一个Consumer多线程消费的场景下。代码示例:

/**
 * 演示将订阅粒度控制到Partition级别
 * 针对单个或多个Partition进行订阅
 */
public static void manualCommitOffsetWithPartition2() {
    Consumer<String, String> consumer = createConsumer();

    // 该Topic中有两个Partition
    TopicPartition p0 = new TopicPartition("MyTopic", 0);
    TopicPartition p1 = new TopicPartition("MyTopic", 1);

    // 订阅该Topic下的一个Partition
    consumer.assign(List.of(p0));
    // 也可以订阅该Topic下的多个Partition
    // consumer.assign(List.of(p0, p1));

    while (true) {
        ...与上一小节中的代码一致,略...
    }
}

Consumer多线程并发处理

前面两个小节的内容基本都是为了本小节所介绍的多线程并发处理消息而铺垫的,因为为了提高应用对消息的处理效率,我们通常会使用多线程来并行消费消息,从而加快消息的处理速度。

而多线程处理消息的方式主要有两种,一种是按Partition数量创建线程,然后每个线程里创建一个Consumer,多个Consumer对多个Partition进行消费。这就和之前在介绍Consumer Group时,给出的那张图所展示的一样:

这种属于是经典模式,实现起来也比较简单,适用于对消息的顺序和offset控制有要求的场景。代码示例:

package com.zj.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 经典模式
 *
 * @author 01
 * @date 2020-05-21
 **/
public class ConsumerThreadSample {

    private final static String TOPIC_NAME = "MyTopic";

    /**
     * 这种类型是经典模式,每一个线程单独创建一个KafkaConsumer,用于保证线程安全
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);
        t1.start();

        Thread.sleep(15000);
        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable {

        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String, String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.220.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(List.of(p0, p1));
        }

        @Override
        public void run() {
            try {
                while (!closed.get()) {
                    //处理消息
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // 处理每个分区的消息
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(), record.offset(), record.key(), record.value());
                        }

                        // 返回去告诉kafka新的offset
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // 注意加1
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } catch (WakeupException e) {
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

另一种多线程的消费方式则是在一个线程池中只创建一个Consumer实例,然后通过这个Consumer去拉取数据后交由线程池中的线程去处理。如下图所示:

但需要注意的是在这种模式下我们无法手动控制数据的offset,也无法保证数据的顺序性,所以通常应用在流处理场景,对数据的顺序和准确性要求不高。

经过之前的例子,我们知道每拉取一次数据返回的就是一个ConsumerRecords,这里面存放了多条数据。然后我们对ConsumerRecords进行迭代,就可以将多条数据交由线程池中的多个线程去并行处理了。代码示例:

package com.zj.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 一个Consumer,多个hander模式
 *
 * @author 01
 * @date 2020-05-21
 **/
public class ConsumerRecordThreadSample {

    private final static String TOPIC_NAME = "MyTopic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.220.128:9092";
        String groupId = "test";
        int workerNum = 5;

        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        consumers.execute(workerNum);

        Thread.sleep(1000000);

        consumers.shutdown();

    }

    /**
     * Consumer处理
     */
    public static class ConsumerExecutor {

        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(List.of(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(200);
                for (final ConsumerRecord<String, String> record : records) {
                    executors.submit(new ConsumerRecordWorker(record));
                }
            }
        }

        public void shutdown() {
            if (consumer != null) {
                consumer.close();
            }
            if (executors != null) {
                executors.shutdown();
            }

            try {
                if (executors != null && !executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * 记录处理
     */
    public static class ConsumerRecordWorker implements Runnable {

        private ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            // 假如说数据入库操作
            System.out.println("Thread - " + Thread.currentThread().getName());
            System.err.printf("patition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}

Consumer控制offset起始位置

上一小节中介绍的第二种多线程消息模式,通过Consumer拉取数据后交由多线程去处理是没法控制offset的,如果此时程序出现错误或其他意外情况导致消息没有被正确消费,我们就需要人为控制offset的起始位置重新进行消费。

通过调用seek方法可以指定从哪个Partition的哪个offset位置进行消费,代码示例:

/**
 * 手动控制offset的起始位置
 */
public static void manualCommitOffsetWithPartition2() {
    Consumer<String, String> consumer = createConsumer();
    TopicPartition p0 = new TopicPartition("MyTopic", 0);
    consumer.assign(List.of(p0));

    // 指定offset的起始位置
    consumer.seek(p0, 1);
    while (true) {
        ...与上一小节中的代码一致,略...
    }
}

实际应用中的设计思路:

  1. 第一次从某个offset的起始位置进行消费
  2. 如果本次消费了100条数据,那么offset设置为101并存入Redis等缓存数据库中
  3. 后续每次poll之前,从Redis中获取offset值,然后从这个offset的起始位置进行消费
  4. 消费完后,再次将新的offset值存入Redis,周而复始

Consumer限流

为了避免Kafka中的流量剧增导致过大的流量打到Consumer端将Consumer给压垮的情况,我们就需要针对Consumer进行限流。例如,当处理的数据量达到某个阈值时暂停消费,低于阈值时则恢复消费,这就可以让Consumer保持一定的速率去消费数据,从而避免流量剧增时将Consumer给压垮。大体思路如下:

  1. poll到数据之后,先去令牌桶中拿取令牌
  2. 如果获取到令牌,则继续业务处理
  3. 如果获取不到令牌,则调用pause方法暂停Consumer,等待令牌
  4. 当令牌桶中的令牌足够,则调用resume方法恢复Consumer的消费状态

接下来编写具体的代码案例简单演示一下这个限流思路,令牌桶算法使用Guava里内置的,所以需要在项目中添加对Guava的依赖。添加的依赖项如下:

<!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

然后我们就可以使用Guava的限流器对Consumer进行限流了,代码示例:

public class ConsumerCurrentLimiting {
    /*** 令牌生成速率,单位为秒 */
    public static final int permitsPerSecond = 1;
    /*** 限流器 */
    private static final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond);

    /**
     * 创建Consumer实例
     */
    public static Consumer<String, String> createConsumer() {
        ... 与之前小节的代码类似,略 ...
    }

    /**
     * 演示对Consumer限流
     */
    public static void currentLimiting() {
        Consumer<String, String> consumer = createConsumer();
        TopicPartition p0 = new TopicPartition("MyTopic", 0);
        TopicPartition p1 = new TopicPartition("MyTopic", 1);
        consumer.assign(List.of(p0, p1));

        while (true) {
            // 从Topic中拉取数据,每100毫秒拉取一次
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1));
            if (records.isEmpty()) {
                continue;
            }

            // 限流
            if (!LIMITER.tryAcquire()) {
                System.out.println("无法获取到令牌,暂停消费");
                consumer.pause(List.of(p0, p1));
            } else {
                System.out.println("获取到令牌,恢复消费");
                consumer.resume(List.of(p0, p1));
            }

            // 单独处理每一个Partition中的数据
            for (TopicPartition partition : records.partitions()) {
                System.out.println("======partition: " + partition + " start======");
                // 从Partition中取出数据
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    try {
                        // 模拟将数据写入数据库
                        Thread.sleep(1000);
                        System.out.println("save to db...");
                        System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                                record.partition(), record.offset(), record.key(), record.value());
                    } catch (Exception e) {
                        // 发生异常直接结束,不提交offset
                        e.printStackTrace();
                        return;
                    }
                }

                // 执行成功则取出当前消费到的offset
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                // 由于下一次开始消费的位置是最后一次offset+1的位置,所以这里要+1
                OffsetAndMetadata metadata = new OffsetAndMetadata(lastOffset + 1);
                // 针对Partition提交offset
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                offsets.put(partition, metadata);
                // 同步提交offset
                consumer.commitSync(offsets);
                System.out.println("======partition: " + partition + " end======");
            }
        }
    }

    public static void main(String[] args) {
        currentLimiting();
    }
}

Consumer Rebalance解析

Consumer有个Rebalance的特性,即重新负载均衡,该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。

之所以要重新负载均衡,是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时,将组内其他Consumer的负载压力,重新进均匀分配,而不会说新加入一个Consumer就闲在那。

下面就用几张图简单描述一下,各种情况触发Rebalance时,组内成员是如何与协调器进行交互的。

1、新成员加入组(member join):

  • Tips:图中的Coordinator是协调器,而generation则类似于乐观锁中的版本号,每当成员入组成功就会更新,也是起到一个并发控制的作用

2、组成员崩溃/非正常退出(member failure):

3、组成员主动离组/正常退出(member leave group):

4、当Consumer提交位移(member commit offset)时,也会有类似的交互过程: