Kafka源码系列之0.10版本的Producer源码解析及性能点讲解
一,基础讲解
本文是基于kafka 0.10讲的,kafkaProducer模型和0.8的客户端模型大致是一样的,区别是0.8版本的会为每个Broker(有给定topic分区leader的Broker)创建一个SyncProducer,而0.10的Producer是用一个NioSelector实现实现了多链接的维护的。也是一个后台线程进行发送。基本步骤,也是定期获取元数据,将消息按照key进行分区后归类,每一类发送到正确的Broker上去。
再写kafka文章的原因是0.10版本后跟spark结合有了大的变动,后面会讲解多版本的sparkStreaming和StructuredStreaming 与kafka的各种结合。所以在这里会更新两篇kafka文章:一篇关于kafka 0.10版本的Producer,另一篇当然是kafka 0.10版本的Consumer了。为后面的文章打下基础。
二,重要类讲解
Cluster
代表一个当前kafka集群的nodes,topics和partitions子集
Selector
org.apache.kafka.common.network.Selector。一个nioSelector的接口,负责非阻塞多链接网络I/O操作。该类于NetworkSend和NetworkReceive协同工作,传输大小限制的网络请求和应答。一个新的链接可以被加入到该nioSelector,当然需要配上一个id,通过调用
connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize)
内部维护了一个java NIOSelector
java.nio.channels.Selector nioSelector;
NetworkClient
一个针对异步请求/应答的网络IO 的网络客户端。这是一个内部类,用来实现用户层面的生产消费者客户端。非线程安全的。
Sender
一个后台线程,主要负责发送生产请求到kafka集群。该线程会更新kafka集群的metadata,将produce Request发送到正确的节点。
RecordAccumulator
该类扮演的是一个队列的角色,将records追加到MemoryRecords实例中,用于发送到server端。
RecordBatch
一批准备发送的消息。该类是线程不安全的,需要加入外部同步加入需要修改的话。
MemoryRecords
用一个byteBuffer支撑的Records的实现。
RecordAccumulator维护了一个ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
RecordBatch维护了一个MemoryRecords。
三,源码过程
1,构建必要对象的过程
用户代码里会构建一个KafkaProducer对象。
producer = new KafkaProducer<>(props);
在构造函数里活做三个重要的的事情
A),new Selector传递给NetworkClient
B),new NetworkClient
C),new Sender
D),new KafkaThread并将构建的send对象,当做该线程的runnable。并启动该线程。
E),构建了分区器和一个Metatdata。
F),构建了一个RecordAccumulator。此时需要关注的两个配置是
batch.size:批量发送的大小。
linger.ms:超时发送的时间。
合理配置两个值,有利于我们提升kafkaProducer的性能。
2,消息加入发送队列的过程
1),用户程序里调用KafkaProducer.send发送消息
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
2),对消息按照partition策略进行分区。
//获取分区号
int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
3),将消息追加到RecordAccumulator。
//将消息追加到RecordAccumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
具体追加的细节,先根据topic和partition信息获取一个recordBatch,然后在获取MemoryRecords,将消息加入其中
//根据topic和partition信息获取该partition的队列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {//RecordBatch类非安全,需要加外部同步
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
if (appendResult != null)
return appendResult;
}
tryAppend内部
// 获取最后一个RecordBatch
RecordBatch last = deque.peekLast();
if (last != null) {
// 将消息追加到该RecordBatch里面
FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
Last.TryAppend
// 首先会判断是否有充足的空间
if (!this.records.hasRoomFor(key, value)) {
return null;
} else {
// 将消息加入memoryRecords
long checksum = this.records.append(offsetCounter++, timestamp, key, value);
this.maxRecordSize = Math.max(this.maxRecordSize, Record.recordSize(key, value));
this.lastAppendTime = now;
FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
timestamp, checksum,
key == null ? -1 : key.length,
value == null ? -1 : value.length);
if (callback != null)
thunks.add(new Thunk(callback, future));
this.recordCount++;
return future;
}
3,消息发送的过程
1),获取Cluster
//获取当前cluster信息,
Cluster cluster = metadata.fetch();
2),获取那些有数据待发送的分区,依据是batch.size和linger.ms
//获取当前准备好发送的有数据的分区
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
3),更新那些leader未知的分区信息
if (result.unknownLeadersExist)
this.metadata.requestUpdate();
4),移除不能发送Request的node
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
//判断连接是否能发消息
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
5),转化为list格式,以node为基准,清空那些给定的node数据
清空所有给定node的数据,然后将它们放到给定适合大小的list,以单个node为基准
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
6),以node为基准,将batches转化为ProducerRequests
//以单个node为基准,将batches数据转化为ProducerRequests
List<ClientRequest> requests = createProduceRequests(batches, now);
7),发送数据
for (ClientRequest request : requests)
client.send(request, now);
8),做真正的网络读写的动作,之前会更新元数据 this.client.poll(pollTimeout, now);
四,总结
写本文的原因是为StructuredStreaming的系列文章之kafkaSink做准备。
1,具体调优请参考kafka系列文章。
2,性能调优的参数重要的就两个
batch.size:批量发送的大小。
linger.ms:超时发送的时间。
3,具体跟0.8.2.2区别,请参考:Kafka源码系列之通过源码分析Producer性能瓶颈
- 分享微信小程序推送消息步骤
- 实例分享微信小程序项目搭建(下)
- 实例分享微信小程序项目搭建(上)
- Android6.0源码分析之蓝牙显示接收到的文件
- Android中应用调用系统权限
- Android5.0以后隐式启动ServiceBug
- Android6.0源码分析之录音功能(一)
- Android6.0源码开发之修改默认音量default及max和min
- Android源码开发之添加/删除系统应用
- 按键事件处理
- Android6.0锁屏源码分析之界面布局分析
- Android6.0源码分析之menu键弹出popupwindow菜单流程分析
- Android中初步自定义view
- Android中View研究自学之路 Android6.0源码分析之View(一)Android6.0源码分析之View(二)
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- JavaEE中的el 表达式以及JSTL详解
- 基于Jsp和Servlet的简单项目
- 基于Servlet实现的简单登陆练习
- JavaScript初识
- 聊聊dubbo-go的DubboInvoker
- 如何在云开发Cloudbase中使用MySQL数据库
- Kubernetes笔记之基本概念
- 腾讯云语音识别之一句话识别
- 创建线程都有哪些方式?— Callable篇
- UiAutomator源码学习(3)-- UiObject
- RTSP协议视频流媒体播放器EasyPlayer-RTSP-OCX接口文档API接口函数定义
- Python逐行写入
- C++ this 指针
- C++ push方法与push_back方法
- 别人变强靠天赋,而我,靠思维导图