kafka-Producer API

时间:2020-03-10
本文章向大家介绍kafka-Producer API,主要包括kafka-Producer API使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

1.消息发送流程

  kafka的producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。

    

2.异步发送API

  2.1 导入依赖

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

  2.2 普通生产者

    不带回调函数,其中像端口号等配置项都封装在了ProducerConfig这个类中,也可以使用

ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 这种方式去设置属性

    2.2.1 编写代码

package com.wn.Test01;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args){
        //创建kafka生产者的配置信息
        Properties properties = new Properties();
        //kafka集群
        properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        //ack应答级别
        properties.put("acks","all");
        //重试次数
        properties.put("retries",3);
        //批次大小 16K
        properties.put("batch.size",16384);
        //等待时间
        properties.put("linger.ms",1);
        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory",33554432);
        //key,value序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //创建生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
        //发送数据
        for (int i=0;i<10;i++){
            producer.send(new ProducerRecord<String, String>("wang","wnwn--"+i));
        }
        //关闭资源
        producer.close();

    }
}

    2.2.2 启动zookeeper和kafka

    2.2.3 创建消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.62181,192.168.138.77:2181 --topic wang

    2.2.4 执行方法,查看接收数据

    

  2.3 带回调函数的生产者

    在生产者send时,设置Callback对象,并重写里面的onCompletion方法,回调函数

    2.3.1 编写测试代码

package com.wn.Test01;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/*带回调函数的生产者*/
//在生产者send时,设置Callback对象,并重写里面的onCompletion方法,回调函数
public class CallBackProducer {
    public static void main(String[] args){
        //创建配置信息
        Properties properties = new Properties();
        //kafka服务端的主机名和端口号
        properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        //ack应答级别
        properties.put("acks","all");
        //重试次数
        properties.put("retries",3);
        //一批消息处理大小
        properties.put("batch.size",16384);
        //请求延迟
        properties.put("linger.ms",1);
        //发送缓存区内存大小
        properties.put("buffer.memory",33554432);
        //key,value序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //创建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i=0;i<50;i++){
            producer.send(new ProducerRecord<String, String>("wang", Integer.toString(i), "hello word-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (recordMetadata!=null){
                        System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
                    }
                }
            });
        }
        //关闭资源
        producer.close();
    }
}

    2.3.2 启动zookeeper和kafka

    2.3.3 创建消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic wang

    2.3.4 执行方法,查看结果

    

  2.4 自定义分区的生产者

    2.4.1 创建自定义分区

package com.wn.Test01;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/*自定义分区*/
public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes1, Cluster cluster) {
        //Integer integer = cluster.partitionCountForTopic(topic);
        //return key.toString().hashCode()%integer;
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

    2.4.2 创建自定义分区的生产者

package com.wn.Test01;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

/*自定义分区的生产者*/
public class PartitionProducer {
    public static void main(String[] args){
        //创建kafka生产者的配置信息
        Properties properties = new Properties();
        //kafka集群   ProducerConfig
        properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        //ack应答级别
        properties.put("acks","all");
        //重试次数
        properties.put("retries",3);
        //批次大小 16K
        properties.put("batch.size",16384);
        //等待时间
        properties.put("linger.ms",1);
        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory",33554432);
        //key,value序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //添加分区器
        properties.put("partitioner.class","com.wn.Test01.MyPartitioner");
        //创建生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
        //发送数据
        for (int i=0;i<10;i++){
            producer.send(new ProducerRecord<String, String>("aaa", Integer.toString(i), "word-" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (recordMetadata!=null){
                        System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
                    }
                }
            });
        }
        //关闭资源
        producer.close();

    }
}

    2.4.3 启动zookeeper+kafka

    2.4.4 创建消费者

bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic aaa

    2.4.5 执行方法,查看结果

      

      

       将所有的消息都在1号分区;

4.同步发送API

  同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack;

  由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方法即可;

package com.wn.Test01;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/*同步发送生产者*/
public class TongProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建kafka生产者的配置信息
        Properties properties = new Properties();
        //kafka集群   ProducerConfig
        properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
        //ack应答级别
        properties.put("acks","all");
        //重试次数
        properties.put("retries",3);
        //批次大小 16K
        properties.put("batch.size",16384);
        //等待时间
        properties.put("linger.ms",1);
        //RecordAccumulator缓冲区大小 32M
        properties.put("buffer.memory",33554432);
        //key,value序列化类
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        //创建生产者对象
        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
        //发送数据
        for (int i=0;i<5;i++){
            producer.send(new ProducerRecord<String, String>("aaa","wnwn--"+i)).get();
        }
        //关闭资源
        producer.close();

    }
}

 

原文地址:https://www.cnblogs.com/wnwn/p/12422485.html