Kafka Learning, Part 3: The Broker's Entry Point

Date: 2022-07-28

From our earlier Scala study we know that a runnable Scala program needs an entry point, and Kafka's startup script kafka-server-start.sh shows that the class it runs is kafka.Kafka, i.e. Kafka.scala. As with RocketMQ, Kafka's broker sits on the server side, while producers and consumers live in the client. The open questions at this point: how does Kafka set up its networking, and how do the broker, producers, and consumers get connected to each other?
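For reference, the hand-off in kafka-server-start.sh is a single line (quoted from a typical distribution; the wrapper flags may vary by version):

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"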

Starting from this entry point, Kafka essentially has to do two things. One is to bring up the network layer by starting the SocketServer, which is independent of any business logic. The other is the business side, KafkaApis, which pattern-matches every incoming request and dispatches it to that request's concrete handler. Let's walk through the source, focusing on KafkaServer.

Kafka.scala

 def main(args: Array[String]): Unit = {
    try {
      val serverProps = getPropsFromArgs(args)
      val kafkaServerStartable = KafkaServerStartable.fromProps(serverProps)

      try {
        if (!OperatingSystem.IS_WINDOWS && !Java.isIbmJdk)
          new LoggingSignalHandler().register()
      } catch {
        case e: ReflectiveOperationException =>
          warn("Failed to register optional signal handler that logs a message when the process is terminated " +
            s"by a signal. Reason for registration failure is: $e", e)
      }

      // attach shutdown handler to catch terminating signals as well as normal termination
      Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
        override def run(): Unit = kafkaServerStartable.shutdown()
      })

      // start the Kafka server
      kafkaServerStartable.startup()
      kafkaServerStartable.awaitShutdown()
    }
    catch {
      case e: Throwable =>
        fatal("Exiting Kafka due to fatal exception", e)
        Exit.exit(1)
    }
    Exit.exit(0)
  }

kafkaServerStartable.startup()

// start the Kafka server side
def startup() {
  try server.startup()
  catch {
    case _: Throwable =>
      // KafkaServer.startup() calls shutdown() in case of exceptions, so we invoke `exit` to set the status code
      fatal("Exiting Kafka.")
      Exit.exit(1)
  }
}
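Here server is the actual KafkaServer, built from the parsed Properties. A minimal sketch of the wrapper, assuming the 1.x/2.0-era sources (the real class also wires in KafkaMetricsReporter instances):

import java.util.Properties
import kafka.server.{KafkaConfig, KafkaServer}

// Sketch only: KafkaServerStartable is a thin lifecycle wrapper around KafkaServer.
object KafkaServerStartable {
  def fromProps(serverProps: Properties): KafkaServerStartable =
    new KafkaServerStartable(KafkaConfig.fromProps(serverProps))
}

class KafkaServerStartable(val staticServerConfig: KafkaConfig) {
  private val server = new KafkaServer(staticServerConfig)

  def startup(): Unit = server.startup()             // delegates to KafkaServer.startup()
  def shutdown(): Unit = server.shutdown()           // called from the shutdown hook in main
  def awaitShutdown(): Unit = server.awaitShutdown()
}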

KafkaServer: the Kafka server side (important)

Its startup method brings up many important components, and these are what later connect the broker to Kafka's producers and consumers.

/**
 * Start up API for bringing up a single instance of the Kafka server.
 * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
 */
def startup() {
  try {
    info("starting")

    if (isShuttingDown.get)
      throw new IllegalStateException("Kafka server is still shutting down, cannot re-start!")

    if (startupComplete.get)
      return

    val canStartup = isStartingUp.compareAndSet(false, true)
    if (canStartup) {
      brokerState.newState(Starting)

      /* setup zookeeper */
      // initialize the ZooKeeper client
      initZkClient(time)

      /* Get or create cluster_id */
      _clusterId = getOrGenerateClusterId(zkClient)
      info(s"Cluster ID = $clusterId")

      /* generate brokerId */
      val (brokerId, initialOfflineDirs) = getBrokerIdAndOfflineDirs
      config.brokerId = brokerId
      logContext = new LogContext(s"[KafkaServer id=${config.brokerId}] ")
      this.logIdent = logContext.logPrefix

      // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
      // applied after DynamicConfigManager starts.
      config.dynamicConfig.initialize(zkClient)

      /* start scheduler */
      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
      kafkaScheduler.startup()

      /* create and configure metrics */
      val reporters = new util.ArrayList[MetricsReporter]
      reporters.add(new JmxReporter(jmxPrefix))
      val metricConfig = KafkaServer.metricConfig(config)
      metrics = new Metrics(metricConfig, reporters, time, true)

      /* register broker metrics */
      _brokerTopicStats = new BrokerTopicStats

      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
      notifyClusterListeners(kafkaMetricsReporters ++ metrics.reporters.asScala)

      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)

      /* start log manager */
      logManager = LogManager(config, initialOfflineDirs, zkClient, brokerState, kafkaScheduler, time, brokerTopicStats, logDirFailureChannel)
      logManager.startup()

      metadataCache = new MetadataCache(config.brokerId)
      // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
      // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)

      // Create and start the socket server acceptor threads so that the bound port is known.
      // Delay starting processors until the end of the initialization sequence to ensure
      // that credentials have been loaded before processing authentications.
      // start the SocketServer (important)
      socketServer = new SocketServer(config, metrics, time, credentialProvider)
      socketServer.startup(startupProcessors = false)

      /* start replica manager */
      replicaManager = createReplicaManager(isShuttingDown)
      replicaManager.startup()

      val brokerInfo = createBrokerInfo
      zkClient.registerBrokerInZk(brokerInfo)

      // Now that the broker id is successfully registered, checkpoint it
      checkpointBrokerId(config.brokerId)

      /* start token manager */
      tokenManager = new DelegationTokenManager(config, tokenCache, time , zkClient)
      tokenManager.startup()

      /* start kafka controller */
      kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, tokenManager, threadNamePrefix)
      kafkaController.startup()

      adminManager = new AdminManager(config, metrics, metadataCache, zkClient)

      /* start group coordinator */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
      groupCoordinator.startup()

      /* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
      transactionCoordinator = TransactionCoordinator(config, replicaManager, new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"), zkClient, metrics, metadataCache, Time.SYSTEM)
      transactionCoordinator.startup()

      /* Get the authorizer and initialize it if one is specified.*/
      authorizer = Option(config.authorizerClassName).filter(_.nonEmpty).map { authorizerClassName =>
        val authZ = CoreUtils.createObject[Authorizer](authorizerClassName)
        authZ.configure(config.originals())
        authZ
      }

      val fetchManager = new FetchManager(Time.SYSTEM,
        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

      /* start processing requests */
      // create the KafkaApis instance
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
        kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
        fetchManager, brokerTopicStats, clusterId, time, tokenManager)

      // create the KafkaRequestHandler pool
      requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
        config.numIoThreads)

      Mx4jLoader.maybeLoad()

      /* Add all reconfigurables for config change notification before starting config handlers */
      config.dynamicConfig.addReconfigurables(this)

      /* start dynamic config manager */
      dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
                                                         ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),
                                                         ConfigType.User -> new UserConfigHandler(quotaManagers, credentialProvider),
                                                         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))

      // Create the config manager. start listening to notifications
      dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
      dynamicConfigManager.startup()

      // start the network processors
      socketServer.startProcessors()
      brokerState.newState(RunningAsBroker)
      shutdownLatch = new CountDownLatch(1)
      // mark startup as complete
      startupComplete.set(true)
      isStartingUp.set(false)
      AppInfoParser.registerAppInfo(jmxPrefix, config.brokerId.toString, metrics)
      info("started")
    }
  }
  catch {
    case e: Throwable =>
      fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
      isStartingUp.set(false)
      shutdown()
      throw e
  }
}

SocketServer

def startup(startupProcessors: Boolean = true) {
  this.synchronized {
    connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
    createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
    if (startupProcessors) {
      // important
      startProcessors()
    }
  }

  newGauge("NetworkProcessorAvgIdlePercent",
    new Gauge[Double] {

      def value = SocketServer.this.synchronized {
        val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
          metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
        }
        ioWaitRatioMetricNames.map { metricName =>
          Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
        }.sum / processors.size
      }
    }
  )
  newGauge("MemoryPoolAvailable",
    new Gauge[Long] {
      def value = memoryPool.availableMemory()
    }
  )
  newGauge("MemoryPoolUsed",
    new Gauge[Long] {
      def value = memoryPool.size() - memoryPool.availableMemory()
    }
  )
  info("Started " + acceptors.size + " acceptor threads")
}

SocketServer.startProcessors: starting the processors

def startProcessors(): Unit = synchronized {
  acceptors.values.asScala.foreach { _.startProcessors() }
  info(s"Started processors for ${acceptors.size} acceptors")
}

Each Acceptor then starts its own processors:

private[network] def startProcessors(): Unit = synchronized {
  if (!processorsStarted.getAndSet(true)) {
    startProcessors(processors)
  }
}

...which in turn launches every processor as a non-daemon thread:

private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
  processors.foreach { processor =>
    KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
      processor).start()
  }
}
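The excerpts above start the processors but do not show how a connection reaches a particular Processor in the first place. Below is a simplified model of the Acceptor's accept loop (illustrative only, not the exact Kafka source): the acceptor thread accepts each new socket and assigns it round-robin, and the chosen processor merely enqueues the channel, as accept() and configureNewConnections() in the Processor class below show.

import java.nio.channels.{ServerSocketChannel, SocketChannel}

// Simplified model: one acceptor thread per listener distributes new sockets
// across its processors; `handoff` stands in for Processor.accept.
class AcceptorSketch(serverChannel: ServerSocketChannel,
                     handoff: IndexedSeq[SocketChannel => Unit]) extends Runnable {
  @volatile var running = true
  private var current = 0

  override def run(): Unit = {
    while (running) {
      val channel = serverChannel.accept()     // wait for the next client connection
      channel.configureBlocking(false)         // processors drive IO through a selector
      handoff(current)(channel)                // hand the channel to one processor
      current = (current + 1) % handoff.length // round-robin across processors
    }
  }
}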

Processor

private[kafka] class Processor(val id: Int,
                               time: Time,
                               maxRequestSize: Int,
                               requestChannel: RequestChannel,
                               connectionQuotas: ConnectionQuotas,
                               connectionsMaxIdleMs: Long,
                               listenerName: ListenerName,
                               securityProtocol: SecurityProtocol,
                               config: KafkaConfig,
                               metrics: Metrics,
                               credentialProvider: CredentialProvider,
                               memoryPool: MemoryPool,
                               logContext: LogContext) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {

  import Processor._
  private object ConnectionId {
    def fromString(s: String): Option[ConnectionId] = s.split("-") match {
      case Array(local, remote, index) => BrokerEndPoint.parseHostPort(local).flatMap { case (localHost, localPort) =>
        BrokerEndPoint.parseHostPort(remote).map { case (remoteHost, remotePort) =>
          ConnectionId(localHost, localPort, remoteHost, remotePort, Integer.parseInt(index))
        }
      }
      case _ => None
    }
  }

  private[network] case class ConnectionId(localHost: String, localPort: Int, remoteHost: String, remotePort: Int, index: Int) {
    override def toString: String = s"$localHost:$localPort-$remoteHost:$remotePort-$index"
  }

  private val newConnections = new ConcurrentLinkedQueue[SocketChannel]()
  private val inflightResponses = mutable.Map[String, RequestChannel.Response]()
  private val responseQueue = new LinkedBlockingDeque[RequestChannel.Response]()

  private[kafka] val metricTags = mutable.LinkedHashMap(
    ListenerMetricTag -> listenerName.value,
    NetworkProcessorMetricTag -> id.toString
  ).asJava

  newGauge(IdlePercentMetricName,
    new Gauge[Double] {
      def value = {
        Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)))
          .fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
      }
    },
    // for compatibility, only add a networkProcessor tag to the Yammer Metrics alias (the equivalent Selector metric
    // also includes the listener name)
    Map(NetworkProcessorMetricTag -> id.toString)
  )

  private val selector = createSelector(
    ChannelBuilders.serverChannelBuilder(listenerName,
      listenerName == config.interBrokerListenerName,
      securityProtocol,
      config,
      credentialProvider.credentialCache,
      credentialProvider.tokenCache))
  // Visible to override for testing
  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
    channelBuilder match {
      case reconfigurable: Reconfigurable => config.addReconfigurable(reconfigurable)
      case _ =>
    }
    new KSelector(
      maxRequestSize,
      connectionsMaxIdleMs,
      metrics,
      time,
      "socket-server",
      metricTags,
      false,
      true,
      channelBuilder,
      memoryPool,
      logContext)
  }

  // Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The index is a
  // non-negative incrementing value that ensures that even if remotePort is reused after a connection is
  // closed, connection ids are not reused while requests from the closed connection are being processed.
  private var nextConnectionIndex = 0

  // the overridden run method: the processor's main event loop
  override def run() {
    // signal that this processor has fully started
    startupComplete()
    try {
      while (isRunning) {
        try {
          // setup any new connections that have been queued up
          configureNewConnections()
          // register any new responses for writing
          processNewResponses()
          // poll the selector for IO events
          poll()
          // process completed receives (newly arrived requests)
          processCompletedReceives()
          // process completed sends (finished responses)
          processCompletedSends()
          processDisconnected()
        } catch {
          // We catch all the throwables here to prevent the processor thread from exiting. We do this because
          // letting a processor exit might cause a bigger impact on the broker. This behavior might need to be
          // reviewed if we see an exception that needs the entire broker to stop. Usually the exceptions thrown would
          // be either associated with a specific socket channel or a bad request. These exceptions are caught and
          // processed by the individual methods above which close the failing channel and continue processing other
          // channels. So this catch block should only ever see ControlThrowables.
          case e: Throwable => processException("Processor got uncaught exception.", e)
        }
      }
    } finally {
      debug("Closing selector - processor " + id)
      CoreUtils.swallow(closeAll(), this, Level.ERROR)
      shutdownComplete()
    }
  }

  private def processException(errorMessage: String, throwable: Throwable) {
    throwable match {
      case e: ControlThrowable => throw e
      case e => error(errorMessage, e)
    }
  }

  private def processChannelException(channelId: String, errorMessage: String, throwable: Throwable) {
    if (openOrClosingChannel(channelId).isDefined) {
      error(s"Closing socket for $channelId because of error", throwable)
      close(channelId)
    }
    processException(errorMessage, throwable)
  }

  // process new responses handed over by the request handler threads
  private def processNewResponses() {
    var curr: RequestChannel.Response = null
    while ({curr = dequeueResponse(); curr != null}) {
      val channelId = curr.request.context.connectionId
      try {
        curr.responseAction match {
          case RequestChannel.NoOpAction =>
            // There is no response to send to the client, we need to read more pipelined requests
            // that are sitting in the server's socket buffer
            updateRequestMetrics(curr)
            trace("Socket server received empty response to send, registering for read: " + curr)
            openOrClosingChannel(channelId).foreach(c => selector.unmute(c.id))
          case RequestChannel.SendAction =>
            val responseSend = curr.responseSend.getOrElse(
              throw new IllegalStateException(s"responseSend must be defined for SendAction, response: $curr"))
            sendResponse(curr, responseSend)
          case RequestChannel.CloseConnectionAction =>
            updateRequestMetrics(curr)
            trace("Closing socket connection actively according to the response code.")
            close(channelId)
        }
      } catch {
        case e: Throwable =>
          processChannelException(channelId, s"Exception while processing response for $channelId", e)
      }
    }
  }

  /* `protected` for test usage */
  // send a response back to the client
  protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
    val connectionId = response.request.context.connectionId
    trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
    // `channel` can be None if the connection was closed remotely or if selector closed it for being idle for too long
    if (channel(connectionId).isEmpty) {
      warn(s"Attempting to send response via channel for which there is no open connection, connection id $connectionId")
      response.request.updateRequestMetrics(0L, response)
    }
    // Invoke send for closingChannel as well so that the send is failed and the channel closed properly and
    // removed from the Selector after discarding any pending staged receives.
    // `openOrClosingChannel` can be None if the selector closed the connection because it was idle for too long
    if (openOrClosingChannel(connectionId).isDefined) {
      selector.send(responseSend)
      inflightResponses += (connectionId -> response)
    }
  }

  // poll the selector with a 300 ms timeout
  private def poll() {
    try selector.poll(300)
    catch {
      case e @ (_: IllegalStateException | _: IOException) =>
        // The exception is not re-thrown and any completed sends/receives/connections/disconnections
        // from this poll will be processed.
        error(s"Processor $id poll failed due to illegal state or IO exception")
    }
  }

  // process completed receives: parse each request and hand it to the RequestChannel
  private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            val header = RequestHeader.parse(receive.payload)
            val context = new RequestContext(header, receive.source, channel.socketAddress,
              channel.principal, listenerName, securityProtocol)
            val req = new RequestChannel.Request(processor = id, context = context,
              startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
            requestChannel.sendRequest(req)
            selector.mute(receive.source)
          case None =>
            // This should never happen since completed receives are processed immediately after `poll()`
            throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
        }
      } catch {
        // note that even though we got an exception, we can assume that receive.source is valid.
        // Issues with constructing a valid receive object were handled earlier
        case e: Throwable =>
          processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
      }
    }
  }

  // process completed sends: unmute the channel so the next request can be read
  private def processCompletedSends() {
    selector.completedSends.asScala.foreach { send =>
      try {
        val resp = inflightResponses.remove(send.destination).getOrElse {
          throw new IllegalStateException(s"Send for ${send.destination} completed, but not in `inflightResponses`")
        }
        updateRequestMetrics(resp)
        selector.unmute(send.destination)
      } catch {
        case e: Throwable => processChannelException(send.destination,
            s"Exception while processing completed send to ${send.destination}", e)
      }
    }
  }

  private def updateRequestMetrics(response: RequestChannel.Response) {
    val request = response.request
    val networkThreadTimeNanos = openOrClosingChannel(request.context.connectionId).fold(0L)(_.getAndResetNetworkThreadTimeNanos())
    request.updateRequestMetrics(networkThreadTimeNanos, response)
  }

  private def processDisconnected() {
    selector.disconnected.keySet.asScala.foreach { connectionId =>
      try {
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
        inflightResponses.remove(connectionId).foreach(updateRequestMetrics)
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(InetAddress.getByName(remoteHost))
      } catch {
        case e: Throwable => processException(s"Exception while processing disconnection of $connectionId", e)
      }
    }
  }

  /**
   * Close the connection identified by `connectionId` and decrement the connection count.
   * The channel will be immediately removed from the selector's `channels` or `closingChannels`
   * and no further disconnect notifications will be sent for this channel by the selector.
   * If responses are pending for the channel, they are dropped and metrics is updated.
   * If the channel has already been removed from selector, no action is taken.
   */
  private def close(connectionId: String): Unit = {
    openOrClosingChannel(connectionId).foreach { channel =>
      debug(s"Closing selector connection $connectionId")
      val address = channel.socketAddress
      if (address != null)
        connectionQuotas.dec(address)
      selector.close(connectionId)

      inflightResponses.remove(connectionId).foreach(response => updateRequestMetrics(response))
    }
  }

  /**
   * Queue up a new connection for reading
   */
  def accept(socketChannel: SocketChannel) {
    newConnections.add(socketChannel)
    wakeup()
  }

  /**
   * Register any new connections that have been queued up
   */
  private def configureNewConnections() {
    while (!newConnections.isEmpty) {
      val channel = newConnections.poll()
      try {
        debug(s"Processor $id listening to new connection from ${channel.socket.getRemoteSocketAddress}")
        selector.register(connectionId(channel.socket), channel)
      } catch {
        // We explicitly catch all exceptions and close the socket to avoid a socket leak.
        case e: Throwable =>
          val remoteAddress = channel.socket.getRemoteSocketAddress
          // need to close the channel here to avoid a socket leak.
          close(channel)
          processException(s"Processor $id closed connection from $remoteAddress", e)
      }
    }
  }

  /**
   * Close the selector and all open connections
   */
  private def closeAll() {
    selector.channels.asScala.foreach { channel =>
      close(channel.id)
    }
    selector.close()
    removeMetric(IdlePercentMetricName, Map(NetworkProcessorMetricTag -> id.toString))
  }

  // 'protected` to allow override for testing
  protected[network] def connectionId(socket: Socket): String = {
    val localHost = socket.getLocalAddress.getHostAddress
    val localPort = socket.getLocalPort
    val remoteHost = socket.getInetAddress.getHostAddress
    val remotePort = socket.getPort
    val connId = ConnectionId(localHost, localPort, remoteHost, remotePort, nextConnectionIndex).toString
    nextConnectionIndex = if (nextConnectionIndex == Int.MaxValue) 0 else nextConnectionIndex + 1
    connId
  }

  private[network] def enqueueResponse(response: RequestChannel.Response): Unit = {
    responseQueue.put(response)
    wakeup()
  }

  private def dequeueResponse(): RequestChannel.Response = {
    val response = responseQueue.poll()
    if (response != null)
      response.request.responseDequeueTimeNanos = Time.SYSTEM.nanoseconds
    response
  }

  private[network] def responseQueueSize = responseQueue.size

  // Only for testing
  private[network] def inflightResponseCount: Int = inflightResponses.size

  // Visible for testing
  // Only methods that are safe to call on a disconnected channel should be invoked on 'openOrClosingChannel'.
  private[network] def openOrClosingChannel(connectionId: String): Option[KafkaChannel] =
     Option(selector.channel(connectionId)).orElse(Option(selector.closingChannel(connectionId)))

  /* For test usage */
  private[network] def channel(connectionId: String): Option[KafkaChannel] =
    Option(selector.channel(connectionId))

  // Visible for testing
  private[network] def numStagedReceives(connectionId: String): Int =
    openOrClosingChannel(connectionId).map(c => selector.numStagedReceives(c)).getOrElse(0)

  /**
   * Wakeup the thread for selection.
   */
  override def wakeup() = selector.wakeup()

  override def shutdown(): Unit = {
    super.shutdown()
    removeMetric("IdlePercent", Map("networkProcessor" -> id.toString))
  }

}
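To make the connection-id format described above concrete, here is a tiny standalone example (the addresses and index are made up):

// Demo of the "localAddr:localPort-remoteAddr:remotePort-index" format parsed
// by ConnectionId.fromString above; all values are hypothetical.
object ConnectionIdFormatDemo {
  def main(args: Array[String]): Unit = {
    val id = "10.0.0.1:9092-10.0.0.7:53210-3"
    val Array(local, remote, index) = id.split("-")
    println(s"local=$local, remote=$remote, index=$index")
    // prints: local=10.0.0.1:9092, remote=10.0.0.7:53210, index=3
  }
}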

At this point Kafka has started; what remains is the important business-logic handling in KafkaApis.
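Before reading handle itself, it helps to see who calls it. The KafkaRequestHandlerPool created during startup spawns num.io.threads KafkaRequestHandler threads, each of which pulls requests off the RequestChannel that the Processors feed. A condensed sketch of one handler thread's loop, simplified from the 1.x/2.0-era sources (not the verbatim code):

// Condensed sketch of KafkaRequestHandler.run: each IO thread blocks on the
// shared RequestChannel and dispatches every request to KafkaApis.
def run() {
  while (true) {
    requestChannel.receiveRequest(300) match {
      case RequestChannel.ShutdownRequest =>
        shutdownComplete.countDown()
        return
      case request: RequestChannel.Request =>
        try {
          apis.handle(request)        // the big pattern match shown below
        } catch {
          case e: FatalExitError =>
            shutdownComplete.countDown()
            Exit.exit(e.statusCode)
          case e: Throwable => error("Exception when handling request", e)
        } finally {
          request.releaseBuffer()     // return the request buffer to the memory pool
        }
      case null => // poll timed out; loop again
    }
  }
}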

KafkaApis.handle

/**
 * Top-level method that handles all requests and multiplexes to the right api
 */
def handle(request: RequestChannel.Request) {
  try {
    // Scala pattern matching dispatches each request type to its handler
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
      s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    request.header.apiKey match {
      // produce requests
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      // fetch (consume) requests
      case ApiKeys.FETCH => handleFetchRequest(request)
      // list-offsets requests
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      // topic metadata requests
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      // leader and in-sync replicas; ISR: in-sync replicas, AR: all replicas
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      // stop-replica requests
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      // metadata update requests
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
      // controlled shutdown
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      // offset-commit requests
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      // offset-fetch requests
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      // find-coordinator requests
      case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
      // join-group requests
      case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request)
      // heartbeat requests
      case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
      // leave-group requests
      case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
      // sync-group requests
      case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request)
      // describe-groups requests
      case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
      // list-groups requests
      case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
      // SASL handshake requests
      case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
      case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
      // create-topics requests
      case ApiKeys.CREATE_TOPICS => handleCreateTopicsRequest(request)
      // delete-topics requests
      case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
      // delete-records requests
      case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
      // init-producer-id requests
      case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request)
      // offset-for-leader-epoch requests
      case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
      // add partitions to a transaction
      case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request)
      // add offsets to a transaction
      case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request)
      case ApiKeys.END_TXN => handleEndTxnRequest(request)
      case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request)
      case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request)
      case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
      case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
      case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
      case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
      case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
      // alter-replica-log-dirs requests
      case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
      // describe-log-dirs requests
      case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
      case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
      // create-partitions requests
      case ApiKeys.CREATE_PARTITIONS => handleCreatePartitionsRequest(request)
      case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
      case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
      case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
      case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
      // delete-groups requests
      case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request)
    }
  } catch {
    case e: FatalExitError => throw e
    case e: Throwable => handleError(request, e)
  } finally {
    request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}

Even now, we have not seen the link between the broker's storage and the producers and consumers. From what we already know, that link must run through the log files (LogFile) and the index files (IndexFile), yet neither has appeared so far.

To study how storage connects producers and consumers, we need to dig into the Log. Recall the hierarchy: a topic has multiple partitions; each partition corresponds to a log directory on disk (named topic-partition); that directory holds log files, index files, time-index files, recovery checkpoints, and so on; and one Log is made up of multiple LogSegments. A sketch of the segment naming scheme follows.
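Concretely, each segment's files are named after the base offset of the first message in the segment, zero-padded to 20 digits. A small sketch mirroring kafka.log.Log.filenamePrefixFromOffset:

import java.text.NumberFormat

// Sketch of Kafka's segment file naming: the base offset, zero-padded to
// 20 digits, prefixes each segment's .log / .index / .timeindex files.
object SegmentNamingSketch {
  def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

  def main(args: Array[String]): Unit = {
    println(filenamePrefixFromOffset(368769L) + ".log")   // 00000000000000368769.log
    println(filenamePrefixFromOffset(368769L) + ".index") // 00000000000000368769.index
  }
}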