kafka-Producer API
时间:2020-03-10
本文章向大家介绍kafka-Producer API,主要包括kafka-Producer API使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
1.消息发送流程
kafka的producer发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了两个线程——main线程和Sender线程,以及一个线程共享变量——RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到kafka broker。
2.异步发送API
2.1 导入依赖
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
2.2 普通生产者
不带回调函数,其中像端口号等配置项都封装在了ProducerConfig这个类中,也可以使用
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG 这种方式去设置属性
2.2.1 编写代码
package com.wn.Test01;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class MyProducer {
public static void main(String[] args){
//创建kafka生产者的配置信息
Properties properties = new Properties();
//kafka集群
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//批次大小 16K
properties.put("batch.size",16384);
//等待时间
properties.put("linger.ms",1);
//RecordAccumulator缓冲区大小 32M
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<String, String>("wang","wnwn--"+i));
}
//关闭资源
producer.close();
}
}
2.2.2 启动zookeeper和kafka
2.2.3 创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.62181,192.168.138.77:2181 --topic wang
2.2.4 执行方法,查看接收数据
2.3 带回调函数的生产者
在生产者send时,设置Callback对象,并重写里面的onCompletion方法,回调函数
2.3.1 编写测试代码
package com.wn.Test01;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/*带回调函数的生产者*/
//在生产者send时,设置Callback对象,并重写里面的onCompletion方法,回调函数
public class CallBackProducer {
public static void main(String[] args){
//创建配置信息
Properties properties = new Properties();
//kafka服务端的主机名和端口号
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//一批消息处理大小
properties.put("batch.size",16384);
//请求延迟
properties.put("linger.ms",1);
//发送缓存区内存大小
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i=0;i<50;i++){
producer.send(new ProducerRecord<String, String>("wang", Integer.toString(i), "hello word-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
}
}
});
}
//关闭资源
producer.close();
}
}
2.3.2 启动zookeeper和kafka
2.3.3 创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic wang
2.3.4 执行方法,查看结果
2.4 自定义分区的生产者
2.4.1 创建自定义分区
package com.wn.Test01;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/*自定义分区*/
public class MyPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes1, Cluster cluster) {
//Integer integer = cluster.partitionCountForTopic(topic);
//return key.toString().hashCode()%integer;
return 1;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
2.4.2 创建自定义分区的生产者
package com.wn.Test01;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/*自定义分区的生产者*/
public class PartitionProducer {
public static void main(String[] args){
//创建kafka生产者的配置信息
Properties properties = new Properties();
//kafka集群 ProducerConfig
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//批次大小 16K
properties.put("batch.size",16384);
//等待时间
properties.put("linger.ms",1);
//RecordAccumulator缓冲区大小 32M
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//添加分区器
properties.put("partitioner.class","com.wn.Test01.MyPartitioner");
//创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i=0;i<10;i++){
producer.send(new ProducerRecord<String, String>("aaa", Integer.toString(i), "word-" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata!=null){
System.out.println(recordMetadata.partition()+"---"+recordMetadata.offset());
}
}
});
}
//关闭资源
producer.close();
}
}
2.4.3 启动zookeeper+kafka
2.4.4 创建消费者
bin/kafka-console-consumer.sh --zookeeper 192.168.138.55:2181,192.168.138.66:2181,192.168.138.77:2181 --topic aaa
2.4.5 执行方法,查看结果
将所有的消息都在1号分区;
4.同步发送API
同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack;
由于send方法返回的是一个Future对象,根据Future对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方法即可;
package com.wn.Test01;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/*同步发送生产者*/
public class TongProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建kafka生产者的配置信息
Properties properties = new Properties();
//kafka集群 ProducerConfig
properties.put("bootstrap.servers","192.168.138.55:9092,192.168.138.66:9092,192.168.138.77:9092");
//ack应答级别
properties.put("acks","all");
//重试次数
properties.put("retries",3);
//批次大小 16K
properties.put("batch.size",16384);
//等待时间
properties.put("linger.ms",1);
//RecordAccumulator缓冲区大小 32M
properties.put("buffer.memory",33554432);
//key,value序列化类
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
//创建生产者对象
KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
//发送数据
for (int i=0;i<5;i++){
producer.send(new ProducerRecord<String, String>("aaa","wnwn--"+i)).get();
}
//关闭资源
producer.close();
}
}
原文地址:https://www.cnblogs.com/wnwn/p/12422485.html
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 编译安装 SeasLog 扩展
- Python3笔试实际操作基础1.md
- Python3入门学习四.md
- 通用封装函数——四则运算
- 打造自己最喜爱的 Windows10 —— 系统与软件配置优化篇
- 编译安装 IgBinary 扩展
- Python3入门学习二.md
- 编译安装 Yaml 扩展
- 一行代码不用写,就可以训练、测试、使用模型,这个star量1.5k的项目帮你做到
- 打造自己最喜爱的 Windows10 —— 纯命令安装系统篇
- Ubuntu18.04 切换 Python 版本
- Python3入门学习三.md
- Yur 主题 MarkDown 展示
- Python3入门学习一.md
- 前后端分离探索——MVC 项目升级的一个过渡方案