kafka使用avro序列化和反序列化
时间:2022-07-23
本文章向大家介绍kafka使用avro序列化和反序列化,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。
使用avro生成entity文件可以查看这篇文章https://blog.csdn.net/u012062455/article/details/84889694
生产者代码
public static void CustomerTest() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","192.168.0.31:9092,192.168.0.32:9092,192.168.0.33:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer","org.apache.kafka.common.serialization.ByteArraySerializer");
KafkaProducer producer = new KafkaProducer<String,byte[]>(kafkaProps);
for(int i = 0;i < 1000;i++){
Customer customer = new Customer();
customer.setEmail("23132@163.com-" + i);
customer.setName("ric-" + i);
customer.setId(i);
customer.setImages(null);
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, (BinaryEncoder)null);
SpecificDatumWriter writer = new SpecificDatumWriter(customer.getSchema());
try {
writer.write(customer, encoder);
encoder.flush();
out.close();
} catch (IOException e) {
e.printStackTrace();
}
ProducerRecord<String,byte[]> record = new ProducerRecord<String, byte[]>("Customer","customer-"+i,out.toByteArray());
producer.send(record);
}
producer.close();
}
消费者代码
public static void CustomerTest() {
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","192.168.0.31:9092,192.168.0.32:9092,192.168.0.33:9092");
kafkaProps.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
kafkaProps.put("value.deserializer","org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaProps.put("group.id","DemoAvroKafkaConsumer2");
kafkaProps.put("auto.offset.reset","earliest");
KafkaConsumer<String ,byte[]> consumer = new KafkaConsumer<String, byte[]>(kafkaProps);
consumer.subscribe(Collections.singletonList("Customer"));
SpecificDatumReader<Customer> reader = new SpecificDatumReader<>(Customer.getClassSchema());
try {
while (true){
ConsumerRecords<String,byte[]> records = consumer.poll(10);
for(ConsumerRecord<String,byte[]> record : records){
Decoder decoder = DecoderFactory.get().binaryDecoder(record.value(), null);
Customer customer = null;
try {
customer = reader.read(null,decoder);
System.out.println(record.key() + ":" + customer.get("id") + "t" + customer.get("name") + "t" + customer.get("email"));
} catch (IOException e) {
e.printStackTrace();
}
}
}
} finally {
consumer.close();
}
}
相关pom依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-tools</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>bijection-avro_2.11</artifactId>
<version>0.9.6</version>
</dependency>
- 通过企业分布式缓存共享运行时数据
- 移植SlidingMenu Android library,和安装example出现的问题解决
- 无限级分类(非递归算法/存储过程版/GUID主键)完整数据库示例_(2)插入记录
- Centos中yum方式安装java
- 微信小程序新革命催生新物种新物种带来大红利!玩转行业新玩法
- 无限级分类(非递归算法/存储过程版/GUID主键)完整数据库示例_(3)删除记录
- 部署Zipkin分布式性能追踪日志系统的操作记录
- 无限级分类(非递归算法/存储过程版/GUID主键)完整数据库示例_(4)显示记录
- Android 2.x中使用actionbar - Actionbarsherlock (2)
- python读txt和xml
- 让Jexus支持高并发请求的优化技巧
- 数据压缩算法LZO (C#)
- Html之初体验
- 基于Wolfpack开发业务监控系统
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法