Kafka 学习（二）：发送消息

// Excerpt from the producer's send path (the enclosing method is not shown).
// Append the serialized record to the RecordAccumulator so it can be sent as part of a batch
// (key step: append).
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
// If the target batch is now full, or a new batch was just created, wake up the sender
// (key step: sender).
// NOTE(review): only the trace log is visible here; the wake-up call itself and the closing brace
// of this `if` appear to have been elided from the excerpt — confirm against the full source.
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// Hand the caller the future for this record's metadata.
return result.future;





/**
 * Appends a record to the accumulator and returns the append result.
 *
 * <p>Flow visible in this excerpt:
 * <ol>
 *   <li>Look up (or create) the deque for {@code tp}.</li>
 *   <li>Under the deque's lock, try to append to the deque's last batch.</li>
 *   <li>If that fails, allocate a buffer of {@code max(batchSize, estimated record size)} from
 *       {@code free}, passing {@code maxTimeToBlock}.</li>
 *   <li>Re-check under the lock, then build a new {@link ProducerBatch} in that buffer.</li>
 * </ol>
 *
 * <p>NOTE(review): closing braces and some statements appear to have been elided from this
 * excerpt, so the nesting shown below is not literal. In particular, the first
 * {@code synchronized (dq)} block presumably ends before {@code free.allocate} — confirm against
 * the full source.
 *
 * @param tp the topic partition the record is destined for
 * @param timestamp the record timestamp
 * @param key the serialized key
 * @param value the serialized value
 * @param headers the record headers; {@code null} is replaced with {@code Record.EMPTY_HEADERS}
 * @param callback the user callback, passed through to the batch
 * @param maxTimeToBlock passed to {@code free.allocate} as its blocking limit
 * @return the append result (future plus batch-state flags)
 * @throws IllegalStateException if the accumulator has been closed
 * @throws InterruptedException declared by this method (presumably from blocking allocation — confirm)
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    // NOTE(review): the counter update this comment refers to is not visible in this excerpt.
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            // If there is an in-progress batch, try to append the record to it.
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;

        // we don't have an in-progress record batch try to allocate a new batch

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // A new buffer is at least batchSize bytes, larger if this single record needs more.
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            // NOTE(review): nothing visible here adds `batch` to `dq`; the blank lines suggest
            // statements were elided (e.g. enqueueing the new batch) — verify against upstream.

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
    } finally {
        if (buffer != null)
            // NOTE(review): the statement guarded here (presumably returning `buffer` to `free`)
            // is not visible in this excerpt.


/**
 * Tries to append the record to the last batch in {@code deque}.
 *
 * <p>Returns {@code null} when the deque is empty. The caller ({@code append}) then allocates a
 * new batch. In this excerpt the callers invoke this while holding {@code synchronized (deque)}.
 *
 * <p>NOTE(review): as excerpted, a result is returned only when {@code future == null}. That is
 * the case where {@code last.tryAppend} reported no room, so the result would wrap a null future.
 * A line (likely an else branch) appears to have been elided. The intended logic is presumably
 * "return a result when future != null" — verify against upstream before relying on this.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    return null;


/**
 * Tries to append a record to this batch.
 *
 * <p>On success this method:
 * <ul>
 *   <li>appends to {@code recordsBuilder};</li>
 *   <li>raises {@code maxRecordSize} to this record's estimated upper-bound size if larger;</li>
 *   <li>sets {@code lastAppendTime} to {@code now};</li>
 *   <li>registers a {@code Thunk} pairing the callback with the returned future.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code this.recordCount} is passed to the future, but nothing visible here
 * updates it. Confirm whether an increment was elided from this excerpt.
 *
 * @return {@code null} if {@code recordsBuilder} has no room for the record; otherwise the
 *         future for the appended record. Key/value sizes are passed to it as -1 when the key or
 *         value is {@code null}.
 */
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
    if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
        return null;
    } else {
        Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
        this.maxRecordSize = Math.max(this.maxRecordSize, AbstractRecords.estimateSizeInBytesUpperBound(magic(),
                recordsBuilder.compressionType(), key, value, headers));
        this.lastAppendTime = now;
        FutureRecordMetadata future = new FutureRecordMetadata(this.produceFuture, this.recordCount,
                                                               timestamp, checksum,
                                                               key == null ? -1 : key.length,
                                                               value == null ? -1 : value.length);
        // we have to keep every future returned to the users in case the batch needs to be
        // split to several new batches and resent.
        thunks.add(new Thunk(callback, future));
        return future;


/**
 * Validates and appends one record at the given absolute offset.
 *
 * <p>Checks, in order:
 * <ul>
 *   <li>the record's control flag matches {@code isControlBatch};</li>
 *   <li>offsets strictly increase ({@code offset > lastOffset});</li>
 *   <li>the timestamp is non-negative or {@code RecordBatch.NO_TIMESTAMP};</li>
 *   <li>headers are only present for magic v2 and later.</li>
 * </ul>
 *
 * <p>Records {@code firstTimestamp} on the first append, then dispatches on {@code magic}.
 * Magic > v1 goes through {@code appendDefaultRecord} and returns {@code null}. Otherwise the
 * record goes through {@code appendLegacyRecord} and its CRC is returned.
 *
 * @return the legacy-format CRC, or {@code null} for magic > v1
 * @throws IllegalArgumentException if any of the checks above fails
 * @throws KafkaException wrapping an {@code IOException} from the underlying write
 */
private Long appendWithOffset(long offset, boolean isControlRecord, long timestamp, ByteBuffer key,
                              ByteBuffer value, Header[] headers) {
    try {
        if (isControlRecord != isControlBatch)
            throw new IllegalArgumentException("Control records can only be appended to control batches");

        if (lastOffset != null && offset <= lastOffset)
            throw new IllegalArgumentException(String.format("Illegal offset %s following previous offset %s " +
                    "(Offsets must increase monotonically).", offset, lastOffset));

        if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
            throw new IllegalArgumentException("Invalid negative timestamp " + timestamp);

        if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0)
            throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");

        if (firstTimestamp == null)
            firstTimestamp = timestamp;

        if (magic > RecordBatch.MAGIC_VALUE_V1) {
            appendDefaultRecord(offset, timestamp, key, value, headers);
            // The v2+ path produces no per-record CRC to return.
            return null;
        } else {
            return appendLegacyRecord(offset, timestamp, key, value);
    } catch (IOException e) {
        throw new KafkaException("I/O exception when writing to the append stream, closing", e);


/**
 * Writes one legacy-format record to {@code appendStream} and returns its CRC.
 *
 * <p>Writes the legacy per-record header (inner offset and size), then the record body via
 * {@code LegacyRecord.write} with {@code CompressionType.NONE}. Finally it updates batch
 * bookkeeping via {@code recordWritten}, using the record size plus {@code Records.LOG_OVERHEAD}.
 */
private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
    // NOTE(review): this override is subsumed by the unconditional LOG_APPEND_TIME check below,
    // which applies for any compression type. One of the two looks redundant — confirm before removing.
    if (compressionType == CompressionType.NONE && timestampType == TimestampType.LOG_APPEND_TIME)
        timestamp = logAppendTime;

    int size = LegacyRecord.recordSize(magic, key, value);
    AbstractLegacyRecordBatch.writeHeader(appendStream, toInnerOffset(offset), size);

    if (timestampType == TimestampType.LOG_APPEND_TIME)
        timestamp = logAppendTime;
    // Write the record itself and obtain its CRC.
    long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
    recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
    return crc;

LegacyRecord#write（public 重载）：计算属性和 CRC，然后写入消息

    /**
     * Writes one legacy record to {@code out} and returns its CRC.
     *
     * <p>Computes the attributes byte from magic, compression type and timestamp type. It then
     * computes the checksum over magic, attributes, timestamp, key and value, and delegates the
     * actual byte writing to the private {@code write} overload.
     *
     * @return the computed CRC
     * @throws IOException from the underlying stream
     */
    public static long write(DataOutputStream out,
                             byte magic,
                             long timestamp,
                             ByteBuffer key,
                             ByteBuffer value,
                             CompressionType compressionType,
                             TimestampType timestampType) throws IOException {
        byte attributes = computeAttributes(magic, compressionType, timestampType);
        long crc = computeChecksum(magic, attributes, timestamp, key, value);
        write(out, magic, crc, attributes, timestamp, key, value);
        return crc;

LegacyRecord#write（private 重载）：按字段依次执行实际的字节写入

/**
 * Serializes one legacy (magic v0/v1) record to {@code out}, field by field, starting with the CRC.
 *
 * <p>NOTE(review): this excerpt is missing the statements that should follow several of the
 * comments below: magic, attributes, timestamp, and (presumably) the null/length markers for key
 * and value. In particular, {@code if (magic > RecordBatch.MAGIC_VALUE_V0)} has no body of its own
 * here, so as written it would govern the following {@code if (key == null)} statement. Verify
 * against the full source.
 *
 * @throws IllegalArgumentException if {@code magic} is neither v0 nor v1, or if {@code timestamp}
 *         is negative and not {@code RecordBatch.NO_TIMESTAMP}
 * @throws IOException from the underlying stream
 */
private static void write(DataOutputStream out,
                          byte magic,
                          long crc,
                          byte attributes,
                          long timestamp,
                          ByteBuffer key,
                          ByteBuffer value) throws IOException {
    if (magic != RecordBatch.MAGIC_VALUE_V0 && magic != RecordBatch.MAGIC_VALUE_V1)
        throw new IllegalArgumentException("Invalid magic value " + magic);
    if (timestamp < 0 && timestamp != RecordBatch.NO_TIMESTAMP)
        throw new IllegalArgumentException("Invalid message timestamp " + timestamp);

    // write crc (low 32 bits of the long)
    out.writeInt((int) (crc & 0xffffffffL));
    // write magic value
    // write attributes

    // maybe write timestamp
    if (magic > RecordBatch.MAGIC_VALUE_V0)

    // write the key
    if (key == null) {
    } else {
        int size = key.remaining();
        Utils.writeTo(out, key, size);
    // write the value
    if (value == null) {
    } else {
        int size = value.remaining();
        Utils.writeTo(out, value, size);


/**
 * Updates per-batch bookkeeping after a record of {@code size} bytes was written at {@code offset}.
 *
 * <p>Increments {@code numRecords}, adds {@code size} to {@code uncompressedRecordsSizeInBytes}
 * and sets {@code lastOffset}. For magic > v0 it also tracks the maximum timestamp seen and the
 * offset at which it occurred.
 *
 * @throws IllegalArgumentException if the batch already holds {@code Integer.MAX_VALUE} records,
 *         or if the offset delta from {@code baseOffset} exceeds {@code Integer.MAX_VALUE}
 */
private void recordWritten(long offset, long timestamp, int size) {
    if (numRecords == Integer.MAX_VALUE)
        throw new IllegalArgumentException("Maximum number of records per batch exceeded, max records: " + Integer.MAX_VALUE);
    if (offset - baseOffset > Integer.MAX_VALUE)
        throw new IllegalArgumentException("Maximum offset delta exceeded, base offset: " + baseOffset +
                ", last offset: " + offset);

    numRecords += 1;
    uncompressedRecordsSizeInBytes += size;
    lastOffset = offset;

    if (magic > RecordBatch.MAGIC_VALUE_V0 && timestamp > maxTimestamp) {
        maxTimestamp = timestamp;
        offsetOfMaxTimestamp = offset;





/**
 * Accumulates producer records into per-partition batches (this excerpt shows only the fields).
 *
 * <p>Usages visible elsewhere in this post: {@code append} checks {@code closed}, sizes new buffers
 * from {@code batchSize} and {@code compression}, and allocates them from {@code free}.
 */
public final class RecordAccumulator {
    private final Logger log;
    // Volatile flag checked by append before (and again after) allocating a buffer.
    private volatile boolean closed;
    private final AtomicInteger flushesInProgress;
    private final AtomicInteger appendsInProgress;
    // Minimum size of a newly allocated batch buffer.
    private final int batchSize;
    private final CompressionType compression;
    private final long lingerMs;
    private final long retryBackoffMs;
    // Pool from which batch buffers are allocated.
    private final BufferPool free;
    private final Time time;
    private final ApiVersions apiVersions;
    // One deque of batches per topic partition.
    private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
    private final IncompleteBatches incomplete;
    // The following variables are only accessed by the sender thread, so we don't need to protect them.
    // NOTE(review): presumably this applies to `muted` and `drainIndex` — confirm the intended scope.
    private final Set<TopicPartition> muted;
    private int drainIndex;
    private final TransactionManager transactionManager;




/**
 * The producer's sender, a {@link Runnable} (this excerpt shows only its fields).
 */
public class Sender implements Runnable {

    private final Logger log;

    /* the state of each nodes connection */
    // Kafka client; tracks the connection state of each node.
    private final KafkaClient client;

    /* the record accumulator that batches records */
    // Record accumulator holding batched records.
    private final RecordAccumulator accumulator;

    /* the metadata for the client */
    private final Metadata metadata;

    /* the flag indicating whether the producer should guarantee the message order on the broker or not. */
    private final boolean guaranteeMessageOrder;

    /* the maximum request size to attempt to send to the server */
    private final int maxRequestSize;

    /* the number of acknowledgements to request from the server */
    private final short acks;

    /* the number of times to retry a failed request before giving up */
    private final int retries;

    /* the clock instance used for getting the time */
    private final Time time;

    /* true while the sender thread is still running */
    private volatile boolean running;

    /* true when the caller wants to ignore all unsent/inflight messages and force close.  */
    private volatile boolean forceClose;

    /* metrics */
    // Sender metrics (sensors for the send path).
    private final SenderMetrics sensors;

    /* the max time to wait for the server to respond to the request*/
    private final int requestTimeout;

    /* The max time to wait before retrying a request which has failed */
    private final long retryBackoffMs;

    /* current request API versions supported by the known brokers */
    private final ApiVersions apiVersions;

    /* all the state related to transactions, in particular the producer id, producer epoch, and sequence numbers */
    private final TransactionManager transactionManager;



     * Run a single iteration of sending
     * 运行一次发送
     * @param now The current POSIX time in milliseconds
    // The three lines above look like this method's javadoc with its /** */ delimiters lost; the
    // Chinese line repeats "Run a single iteration of sending".
    // NOTE(review): several statements and closing braces are elided in this excerpt. For example,
    // the first `if` below has only a comment as its body, and `if (lastError != null)` is
    // immediately followed by client.poll. Do not read control flow from it literally — verify
    // against the full source.
    void run(long now) {
        if (transactionManager != null) {
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                    client.poll(retryBackoffMs, now);
                } else if (transactionManager.hasAbortableError()) {
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);

        // Build and send produce requests from the accumulator (key step); returns the poll timeout.
        long pollTimeout = sendProducerData(now);
        // Perform network I/O via the client's poll (key step).
        client.poll(pollTimeout, now);

此时会依次执行两个方法：sendProducerData(now) 和 client.poll(pollTimeout, now)。


/**
 * Drains ready batches from the accumulator, fails expired batches, sends produce requests, and
 * returns the timeout the caller should use for the next poll.
 *
 * <p>NOTE(review): several loop bodies are not visible in this excerpt: the one over
 * {@code unknownLeaderTopics}, the muting loop, and the expired-batch {@code if}. Nor is the
 * removal of not-ready nodes that the "remove any nodes" comment describes. They appear to have
 * been elided; confirm against the full source.
 *
 * @param now the current time in milliseconds
 * @return the poll timeout: 0 if any node is ready, otherwise
 *         {@code min(result.nextReadyCheckDelayMs, notReadyTimeout)}
 */
private long sendProducerData(long now) {
    Cluster cluster = metadata.fetch();

    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    if (!result.unknownLeaderTopics.isEmpty()) {
        for (String topic : result.unknownLeaderTopics)

    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));

    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)

    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);

    // The three lines below look like a block comment whose /* */ delimiters were lost. Translation:
    // reset the producer id if an expired batch has previously been sent to the broker, and also
    // update the metrics for expired batches; see TransactionState.resetProducerId for why the
    // producer id needs to be reset here.
     * 如果先前已将过期的批次发送到代理,则重置生产者ID。
     * 同时更新过期批次的指标。 请参阅@ TransactionState.resetProducerId的文档,
     * 以了解为什么我们需要在此处重置生产者ID。
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
        if (transactionManager != null && expiredBatch.inRetry()) {
            // This ensures that no new batches are drained until the current in flight batches are fully resolved.


    // The three lines below look like a block comment whose /* */ delimiters were lost. Translation:
    // if we have any nodes that are ready to send and have sendable data, poll with 0 timeout so
    // this can immediately loop and try sending more data. Otherwise, the timeout is determined by
    // nodes that have partitions with data not yet sendable (e.g. lingering, backing off). Nodes
    // with sendable data that are not ready to send are deliberately excluded, since they would
    // cause busy looping.
     * 如果我们有任何准备发送的节点+具有可发送的数据,请以0超时进行轮询,这样可以立即循环并尝试发送更多数据。
     * 否则,超时将由节点进行分区,该分区具有尚未发送的数据(例如,徘徊,回退)。
     * 请注意,这特别不包括带有可发送数据且尚未准备好发送的节点,因为它们会导致繁忙的循环。
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);

        // The three lines below look like a block comment whose delimiters were lost. Translation:
        // if some partitions are already ready to be sent, the select time is 0; otherwise, if some
        // partition has accumulated data but is not ready yet, the select time is the difference
        // between now and its linger expiry time; otherwise it is the difference between now and
        // the metadata expiry time.
         * 如果已经准备好发送某些分区,则选择时间将为0;
         * 否则,如果某个分区已经积累了一些数据但尚未就绪,则选择时间将是现在与其有效期之间的时间差;
         * 否则,选择时间将是现在与元数据到期时间之间的时间差;
        pollTimeout = 0;
    // Send the produce requests for the drained batches (key step).
    sendProduceRequests(batches, now);

    return pollTimeout;


 * 从给定的消息批次创建生产请求
// The line above looks like this method's javadoc with its delimiters lost. Translation: create a
// produce request from the given record batches.
// NOTE(review): sendProducerData calls sendProduceRequests (plural), which is not shown; presumably
// it calls this method once per destination node — confirm. `if (batches.isEmpty())` below has
// no visible body (presumably an early return was elided).
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // Find the smallest magic among the batches; the whole request uses that format version.
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();

        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // The five lines below look like a block comment whose delimiters were lost. Translation:
            // down-convert if necessary to the minimum magic used. In general there can be a delay
            // between the time the producer starts building the batch and the time we send the
            // request, and the message format may have been chosen from out-dated metadata. In the
            // worst case we optimistically chose the new message format but found the broker does
            // not support it, so we must down-convert on the client before sending. This handles
            // edge cases around cluster upgrades where brokers may not all support the same message
            // format version, e.g. a partition migrating from a broker that supports the new magic
            // to one that does not.
             * 必要时向下转换为使用的最小魔法。 通常,在生产者开始构建批处理的时间与我们发送请求的时间之间可能会有延迟,
             * 并且我们可能已根据过时的元数据选择了消息格式。 在最坏的情况下,我们乐观地选择使用新的消息格式,
             * 但是发现代理不支持它,因此需要在客户端上进行下转换,然后再发送。 这旨在处理集群升级周围的极端情况,
             * 在这些情况下,代理可能并不都支持相同的消息格式版本。
             * 例如,如果分区从支持新魔术版本的代理迁移到不支持新魔术版本的代理,则我们将需要转换。
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        // On completion, the response is handled against the batches indexed by partition.
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());

        String nodeId = Integer.toString(destination);
        // The fourth argument is true unless acks == 0.
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0, callback);
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);


/**
 * Validates the destination (for external requests), chooses a request version, builds the
 * request, and hands it to the four-argument {@code doSend}.
 *
 * <p>If the broker's version info is unknown, {@code builder.latestAllowedVersion()} is assumed.
 * On {@code UnsupportedVersionException} a failed {@code ClientResponse} is constructed.
 *
 * <p>NOTE(review): the {@code latestUsableVersion(...)} call below is cut off mid-argument-list,
 * and the statement that hands off {@code clientResponse} is not visible. Both appear elided from
 * this excerpt — verify against the full source.
 *
 * @throws IllegalStateException if an external request targets a node that is not ready
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
    String nodeId = clientRequest.destination();
    if (!isInternalRequest) {
        // The three lines below look like a block comment whose delimiters were lost. Translation:
        // if this request came from outside the NetworkClient, validate that we can send data. If
        // the request is internal, we trust that internal code has done this validation. Validation
        // differs slightly for some internal requests (e.g. ApiVersionsRequests can be sent before
        // the connection is in the READY state).
         * 如果此请求来自NetworkClient之外,请确认我们可以发送数据。
         * 如果请求是内部请求,我们相信内部代码已完成此验证。
         * 对于某些内部请求,验证会稍有不同(例如,可以在处于READY状态之前发送ApiVersionsRequests。)
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
    try {
        NodeApiVersions versionInfo = apiVersions.get(nodeId);
        short version;
        if (versionInfo == null) {
            version = builder.latestAllowedVersion();
            if (discoverBrokerVersions && log.isTraceEnabled())
                log.trace("No version information found when sending {} with correlation id {} to node {}. " +
                        "Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
        } else {
            version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
        doSend(clientRequest, isInternalRequest, now, builder.build(version));
    } catch (UnsupportedVersionException e) {
        log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
                clientRequest.correlationId(), clientRequest.destination(), e);
        ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
                clientRequest.callback(), clientRequest.destination(), now, now,
                false, e, null);


/**
 * Builds the request header and the {@code Send} for an already-built request, logs the version
 * used, and creates the in-flight request record.
 *
 * <p>NOTE(review): this excerpt is truncated at the {@code new InFlightRequest(} call. What
 * happens to the in-flight request and the {@code Send} is not shown here.
 */
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String nodeId = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    if (log.isDebugEnabled()) {
        int latestClientVersion = clientRequest.apiKey().latestVersion();
        if (header.apiVersion() == latestClientVersion) {
            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,
                    clientRequest.correlationId(), nodeId);
        } else {
            log.debug("Using older server API v{} to send {} {} with correlation id {} to node {}",
                    header.apiVersion(), clientRequest.apiKey(), request, clientRequest.correlationId(), nodeId);
    Send send = request.toSend(nodeId, header);
    // Create the in-flight request record for this send.
    InFlightRequest inFlightRequest = new InFlightRequest(


 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 * 将给定的请求排队,以在随后的{@link #poll(long)}调用中发送
 * @param send The request to send
// The three lines above look like this method's javadoc with its delimiters lost; the Chinese line
// repeats the English summary.
// NOTE(review): the bodies of the closing-channel branch and of the `try` are not visible in this
// excerpt. Presumably the `try` hands `send` to the channel (cf. KafkaChannel#setSend) — verify.
// On an exception the channel is closed with DISCARD_NO_NOTIFY, and the exception is rethrown
// unless it is a CancelledKeyException.
public void send(Send send) {
    String connectionId = send.destination();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    if (closingChannels.containsKey(connectionId)) {
        // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
    } else {
        try {
        } catch (Exception e) {
            // update the state for consistency, the channel will be discarded after `close`
            // ensure notification via `disconnected` when `failedSends` are processed in the next poll
            close(channel, CloseMode.DISCARD_NO_NOTIFY);
            if (!(e instanceof CancelledKeyException)) {
                log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                        connectionId, e);
                throw e;


/**
 * Stages {@code send} as this channel's pending send.
 *
 * <p>Only one send may be pending at a time. {@code write()} clears it once it completes.
 *
 * <p>NOTE(review): any registration of write interest that normally accompanies this is not
 * visible in this excerpt — confirm against the full source.
 *
 * @throws IllegalStateException if a previous send is still in progress on this channel
 */
public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
    this.send = send;


/**
 * Performs network I/O via the selector and collects the resulting client responses.
 *
 * <p>The selector is polled for {@code min(timeout, metadataTimeout, requestTimeoutMs)}; an
 * {@code IOException} from it is logged, not propagated. Afterwards, completed sends, completed
 * receives, disconnections and timed-out requests are gathered into the returned list.
 *
 * <p>NOTE(review): in this excerpt the aborted-sends branch returns an empty list without any
 * visible handling. Likewise, nothing visible invokes callbacks for {@code responses} before
 * returning. Both look elided — verify against the full source.
 *
 * @param timeout the maximum time to wait in the selector
 * @param now the current time in milliseconds
 * @return the responses gathered in this poll
 */
public List<ClientResponse> poll(long timeout, long now) {

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        return responses;

    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleTimedOutRequests(responses, updatedNow);

    return responses;


/**
 * Runs one select cycle and processes ready, buffered and immediately-connected keys.
 *
 * <p>The timeout is forced to 0 in three cases: there are staged receives; there are
 * immediately-connected keys; or the last poll made read progress and there is buffered data.
 * Select time and I/O time are recorded in {@code sensors}.
 *
 * <p>NOTE(review): several statements are not visible in this excerpt: the unmute loop body, the
 * clearing of selected keys, and the work the trailing comments describe. They appear elided —
 * verify against the full source.
 *
 * @param timeout the select timeout; must be >= 0
 * @throws IllegalArgumentException if {@code timeout} is negative
 */
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    boolean madeReadProgressLastCall = madeReadProgressLastPoll;

    boolean dataInBuffers = !keysWithBufferedRead.isEmpty();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
        timeout = 0;

    if (!memoryPool.isOutOfMemory() && outOfMemory) {
        //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
        log.trace("Broker no longer low on memory - unmuting incoming sockets");
        for (KafkaChannel channel : channels.values()) {
            if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
        outOfMemory = false;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int numReadyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
        Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();

        // Poll from channels that have buffered data (but nothing more from the underlying socket)
        if (dataInBuffers) {
            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
            Set<SelectionKey> toPoll = keysWithBufferedRead;
            keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
            pollSelectionKeys(toPoll, false, endSelect);

        // Poll from channels where the underlying socket has more data
        pollSelectionKeys(readyKeys, false, endSelect);
        // Clear all selected keys so that they are included in the ready count for the next select

        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    } else {
        madeReadProgressLastPoll = true; //no work is also "progress"

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys

    // Add to completedReceives after closing expired connections to avoid removing
    // channels with completed receives until all staged receives are completed.


/**
 * Handles each selection key for one poll cycle, in {@code determineHandlingOrder} order.
 *
 * <p>For each key:
 * <ul>
 *   <li>updates idle expiry;</li>
 *   <li>finishes connects and prepares unready channels;</li>
 *   <li>attempts a read;</li>
 *   <li>writes any pending send if the channel is ready and writable;</li>
 *   <li>closes the channel if its key is no longer valid.</li>
 * </ul>
 * Any exception closes the channel. The close mode is NOTIFY_ONLY if the failure came from a
 * write, otherwise GRACEFUL. Per-connection time is recorded in the {@code finally}.
 *
 * <p>NOTE(review): several statements are elided in this excerpt: the rest of the
 * {@code log.debug} call, the {@code else} body, the {@code try} body, the buffered-bytes body,
 * and any bookkeeping after a completed send. Also, as excerpted, the {@code log.warn} in the
 * catch block has no {@code else} before it, so it would run for every exception; upstream
 * presumably has {@code else} there — verify.
 *
 * @param selectionKeys the keys to process
 * @param isImmediatelyConnected whether these keys connected immediately (treated as connectable)
 * @param currentTimeNanos the time used for idle-expiry bookkeeping
 */
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
        KafkaChannel channel = channel(key);
        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

        // register all per-connection metrics at once
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        // Set only when channel.write() throws; selects NOTIFY_ONLY on close below.
        boolean sendFailed = false;
        try {

            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                } else

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready()) {
                try {
                } catch (AuthenticationException e) {
                    throw e;
                if (channel.ready())

            attemptRead(key, channel);

            if (channel.hasBytesBuffered()) {
                //this channel has bytes enqueued in intermediary buffers that we could not read
                //(possibly because no memory). it may be the case that the underlying socket will
                //not come up in the next poll() and so we need to remember this channel for the
                //next poll call otherwise data may be stuck in said buffers forever. If we attempt
                //to process buffered data and no progress is made, the channel buffered status is
                //cleared to avoid the overhead of checking every time.

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = null;
                try {
                    send = channel.write();
                } catch (Exception e) {
                    sendFailed = true;
                    throw e;
                // A non-null result means a send fully completed in this write.
                if (send != null) {
                    this.sensors.recordBytesSent(channel.id(), send.size());

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, CloseMode.GRACEFUL);

        } catch (Exception e) {
            String desc = channel.socketDescription();
            if (e instanceof IOException)
                log.debug("Connection with {} disconnected", desc, e);
            else if (e instanceof AuthenticationException) // will be logged later as error by clients
                log.debug("Connection with {} disconnected due to authentication exception", desc, e);
                log.warn("Unexpected error from {}; closing connection", desc, e);
            close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
        } finally {
            maybeRecordTimePerConnection(channel, channelStartTimeNanos);


/**
 * Reads every complete receive currently available from {@code channel} and stages it.
 *
 * <p>It only reads if the channel is ready, readable (or has buffered bytes), has no staged
 * receive, and is not explicitly muted. After reading: if the channel muted itself,
 * {@code outOfMemory} is set; otherwise the poll is marked as having made read progress.
 */
private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
    //if channel is ready and has bytes to read from socket or buffer, and has no
    //previous receive(s) already staged or otherwise in progress then read from it
    // (The next line is a Chinese translation of the comment above with its comment delimiters lost.)
     * 如果通道已准备好,并且有要从套接字或缓冲区读取的字节,并且尚未暂存或没有进行任何先前的接收,则从该通道读取
    if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
        && !explicitlyMutedChannels.contains(channel)) {
        NetworkReceive networkReceive;
        while ((networkReceive = channel.read()) != null) {
            madeReadProgressLastPoll = true;
            addToStagedReceives(channel, networkReceive);
        if (channel.isMute()) {
            outOfMemory = true; //channel has muted itself due to memory pressure.
        } else {
            madeReadProgressLastPoll = true;


/**
 * Network receive: read operation.
 *
 * <p>Lazily creates a {@code NetworkReceive} bounded by {@code maxReceiveSize}. If that receive is
 * complete, it is returned and cleared; otherwise {@code null} is returned.
 *
 * <p>NOTE(review): nothing visible here actually reads bytes into {@code receive}, and the
 * out-of-memory branch has only a comment. Both look elided from this excerpt — verify.
 */
public NetworkReceive read() throws IOException {
    NetworkReceive result = null;

    if (receive == null) {
        receive = new NetworkReceive(maxReceiveSize, id, memoryPool);

    if (receive.complete()) {
        result = receive;
        receive = null;
    } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
        //pool must be out of memory, mute ourselves.
    return result;


 * adds a receive to staged receives
 * 向分段接收添加接收
// The two lines above look like this method's javadoc with its delimiters lost; the Chinese line
// repeats the English summary. The channel's deque is created on first use.
// NOTE(review): the statement that actually adds `receive` to `deque` is not visible in this
// excerpt — verify against the full source.
private void addToStagedReceives(KafkaChannel channel, NetworkReceive receive) {
    if (!stagedReceives.containsKey(channel))
        stagedReceives.put(channel, new ArrayDeque<NetworkReceive>());

    Deque<NetworkReceive> deque = stagedReceives.get(channel);


/**
 * Writes the pending send, if any, to the transport.
 *
 * @return the send if it completed in this call (the pending slot is then cleared), otherwise
 *         {@code null}
 */
public Send write() throws IOException {
    Send result = null;
    // Key step: send(send) writes the pending send and reports whether it completed.
    if (send != null && send(send)) {
        result = send;
        send = null;
    return result;


/**
 * Reports whether {@code send} has completed.
 *
 * <p>NOTE(review): the write of {@code send} to the transport, and the statement guarded by
 * {@code if (send.completed())}, are not visible in this excerpt. As written, that {@code if}
 * would govern the {@code return} — verify against the full source.
 */
private boolean send(Send send) throws IOException {
    if (send.completed())

    return send.completed();

    /**
     * Writes as much of {@code buffers} as the channel accepts in a single gathering write.
     *
     * <p>Subtracts the bytes written from {@code remaining} and refreshes {@code pending} from the
     * channel's pending-write state.
     *
     * @param channel the destination channel
     * @return the number of bytes written in this call
     * @throws EOFException if the channel reports a negative byte count
     */
    public long writeTo(GatheringByteChannel channel) throws IOException {
        long written = channel.write(buffers);
        if (written < 0)
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        remaining -= written;
        pending = TransportLayers.hasPendingWrites(channel);
        return written;