Kafka源码系列之topic创建分区分配及leader选举

时间:2022-04-25
本文章向大家介绍Kafka源码系列之topic创建分区分配及leader选举,主要内容包括其使用实例、应用技巧、基本知识点总结和需要注意事项,具有一定的参考价值,需要的朋友可以参考一下。

一,基本介绍

本文讲解依然是基于kafka源码0.8.2.2。假如阅读过前面的文章应该知道,用户的admin指令都是通过Zookeeper发布给kafka的Controller,然后由Controller发布给具体的Broker。

Topic的创建过程亦是如此。本文主要是关注一下几点:

1,分区和副本是在何处,以怎样的方式分配给Broker。

2,kafka的Controller接收到Zookeeper的通知后做了哪些处理。

3,分区的leader和follower是如何选举的。

二,重要类介绍

1,TopicCommand

Topic相关操作的入口类,职责:创建,修改,更新配置,删除,查看都是经由它来向Zookeeper发布相关策略的。

2,KafkaApis

业务处理线程要使用的对象,其handle方法相当于将各种请求,交由相应的处理函数进行处理。

3,KafkaController

KafkaController作为kafka集群的控制者,有且存在一个leader,若干个follower。Leader能够发送具体的指令给follower,具体指令如:RequestKeys.LeaderAndIsrKey,RequestKeys.StopReplicaKey,RequestKeys.UpdateMetadataKey。

4,PartitionStateMachine

分区的状态机,决定者分区的当前状态及状态转移过程。

NonExistentPartition:不存在。该状态的前状态假如有的话,只能是OfflinePartition

NewPartition:分区创建后的状态,前状态是NonExistentPartition。改状态说明分区已经有副本且不存在leader/isr。

OnlinePartition:选举过leader后,处于该状态,前状态可是:OfflinePartition/NewPartition。

OfflinePartition:选举过leader以后,leader挂掉,分区就会处于当前状态,前状态可能是NewPartition/OnlinePartition

三,源码实现介绍

主要是分三个步骤:

A),command创建时Partition均匀分布于Broker的策略

副本分配有两个目标:

1,尽可能将副本均匀分配到Broker上

2,每个分区的副本都分配到不同的Broker上

为了实现这个目标kafka采取下面两个策略:

1,随机选取一个Broker位置作为分配Partition的起始位置,将Partition的第一个副本进行轮询分配

2,将其它副本以一个递增的位移分配到不同的Broker上去

源码执行的具体过程

TopicCommand.main

if(opts.options.has(opts.createOpt))
 createTopic(zkClient, opts)

AdminUtils.createTopic(zkClient, topic, partitions, replicas, configs)

进行partition和Replicas的均匀分配

val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, partitions, replicationFactor)

具体内容是如下:

val ret = new mutable.HashMap[Int, List[Int]]()
//随机选取一个Broker位置作为startIndex
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
//当前分区Id赋值为0
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0

//随机选取Broker数目范围内的位移
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
for (i <- 0 until nPartitions) {
 //只有在所有遍历过Broker数目个分区后才将位移加一
 if (currentPartitionId > 0 && (currentPartitionId % brokerList.size == 0))
    nextReplicaShift += 1
 //当前分区id加上起始位置,对Brokersize取余得到第一个副本的位置
 val firstReplicaIndex = (currentPartitionId + startIndex) % brokerList.size
 var replicaList = List(brokerList(firstReplicaIndex))
 for (j <- 0 until replicationFactor - 1)
 //计算出每个副本的位置 计算方法是replicaIndex:
    //val shift = 1 + (nextReplicaShift + j) % ( brokerList.size - 1)
    //(firstReplicaIndex + shift) %  brokerList.size
 replicaList ::= brokerList(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerList.size))
  ret.put(currentPartitionId, replicaList.reverse)
 //分区id加一
 currentPartitionId = currentPartitionId + 1
}
ret.toMap

将配置和分配策略写到Zookeeper上去

AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, replicaAssignment, topicConfig)

该方法的具体内容如下:

写配置,Zookeeper的目录是:/config/topics/TopicName

writeTopicConfig(zkClient, topic, config)

写分配策略,Zookeeper的目录是:/brokers/topics/TopicName

writeTopicPartitionAssignment(zkClient, topic, partitionReplicaAssignment, update)

B),kafka Controller监听到topic创建事件后的处理

KafkaController的PartitionStateMachine对象内部有一个Zookeeper的listener专门监听新增topic事件。TopicChangeListener。

获取新增topic

val newTopics = currentChildren -- controllerContext.allTopics

获取分区副本分配策略HashMap[TopicAndPartition, Seq[Int]]

val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)

进入具体的操作

if(newTopics.size > 0)
 //进入具体的操作
 controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)

订阅新增topic的分区变动事件

// subscribe to partition changes 注册指定topic的分区变动事件监听器
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))

处理新增分区onNewPartitionCreation

该方法主要做两件事:

1,将新建分区的状态转化为NewPartition状态

partitionStateMachine.handleStateChanges(newPartitions, NewPartition)

进入处理函数得到

partitions.foreach { topicAndPartition =>
  handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
case NewPartition =>
 //指定TopicAndPartition 获取副本
 assignReplicasToPartitions(topic, partition)
 partitionState.put(topicAndPartition, NewPartition)
 val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")

AssgnReplicasToPartition方法的具体内容,主要是先获取分区所在的Brokerid序列,然后

val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
controllerContext.partitionReplicaAssignment += TopicAndPartition(topic, partition) -> assignedReplicas

2,将新建分区的状态从NewPartition到OnlinePartition状态

partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)

在handleStateChange,中具体处理是

case OnlinePartition =>
  assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
 partitionState(topicAndPartition) match {
 case NewPartition =>
 // initialize leader and isr path for new partition
 initializeLeaderAndIsrForPartition(topicAndPartition)

在initializeLeaderAndIsrForPartition.第一个seq中的Broker当做leader

val leader = liveAssignedReplicas.head //第一个副本作为leader
val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList),
 controller.epoch)

更新具体分区的状态信息

[zk: localhost:2181(CONNECTED) 0] get /brokers/topics/innerBashData/partitions/1/state
//        {"controller_epoch":6,"leader":6,"version":1,"leader_epoch":24,"isr":[7,6]}
 ZkUtils.createPersistentPath(controllerContext.zkClient,
 ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
 ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))

topic 分区 副本 放入leaderAndIsrRequestMap,以便我们可以通过Brokerid找到

brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic, topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)

发信息给需要的BrokerID

leaderAndIsrRequestMap.foreach { m =>
 val broker = m._1
 val partitionStateInfos = m._2.toMap
 val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
 val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id))
 val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId)
 for (p <- partitionStateInfos) {
 val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
 stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request %s with correlationId %d to broker %d " +
 "for partition [%s,%d]").format(controllerId, controllerEpoch, typeOfRequest,
 p._2.leaderIsrAndControllerEpoch, correlationId, broker,
 p._1._1, p._1._2))
      }
//      给具体的Broker发送LeaderAndIsrRequest
 controller.sendRequest(broker, leaderAndIsrRequest, null)
    }

C),Broker leader和follower的产生过程

在Broker接收到Controller的LeaderAndIsrRequest消息后,交由kafkaApis的handle处理

case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)

当前Broker成为副本的leader或者follower的入口函数

val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)

当前Broker能不能成为Broker,取决于Brokerid是否与leader分配的Brokerid一致,一致就会成为leader,否则follower

val partitionsTobeLeader = partitionState
  .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)

真正的进入leader或者follower的过程

if (!partitionsTobeLeader.isEmpty)
  makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
if (!partitionsToBeFollower.isEmpty)
  makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)

在接收到第一个leaderisrrequest后初始化 highwatermark 线程。这可以保证所有的分区都被填充,通过避免恶性竞争启动Checkpointing之前。

if (!hwThreadInitialized) {
  startHighWaterMarksCheckPointThread()
 hwThreadInitialized = true
}

下面具体讲解makeleaders和makeFollowers方法

使当前Broker成为给定分区的leader ,需要做以下几个处理:

* 1,停止掉这些分区的fetchers

* 2,更新缓存的当前分区的元数据

* 3,将分区加入leader 分区集合

// First stop fetchers for all the partitions replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))

// Update the partition information to be the leader partitionState.foreach{ case (partition, partitionStateInfo) => partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}

Makeleader方法具体的操作了一个副本成为leader的过程:

主要做了以下几件事情:

* 记录LeaderShip 决议的时代。在更新isr并维护Zookeeperpath的中的Controller时代

* 增加新的副本

* 移除已经被Controller移除的已分配副本

* 为新的leader副本构建高水位元数据

* 为远程副本重置logendoffset

* 由于isr可能将为1,我们需要增加高水位

具体源码如下:

def makeLeader(controllerId: Int,
 partitionStateInfo: PartitionStateInfo, correlationId: Int,
 offsetManager: OffsetManager): Boolean = {
 inWriteLock(leaderIsrUpdateLock) {
 val allReplicas = partitionStateInfo.allReplicas
 val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
 val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
 // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
 controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
 // add replicas that are new
 allReplicas.foreach(replica => getOrCreateReplica(replica))
 val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
 // remove assigned replicas that have been removed by the controller
 (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
 inSyncReplicas = newInSyncReplicas
 leaderEpoch = leaderAndIsr.leaderEpoch
 zkVersion = leaderAndIsr.zkVersion
 leaderReplicaIdOpt = Some(localBrokerId)
 // construct the high watermark metadata for the new leader replica
 val newLeaderReplica = getReplica().get
    newLeaderReplica.convertHWToLocalOffsetMetadata()
 // reset log end offset for remote replicas
 assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = LogOffsetMetadata.UnknownOffsetMetadata)
 // we may need to increment high watermark since ISR could be down to 1
 maybeIncrementLeaderHW(newLeaderReplica)
 if (topic == OffsetManager.OffsetsTopicName)
      offsetManager.loadOffsetsFromLog(partitionId)
 true
 }
}

当前Broker成为给定分区的follower要做要做以下几个处理:

* 1,将分区从leader partition 集合中移除

* 2,将副本标记为follower ,目的是不让生产者继续往该副本生产消息

* 3,停止掉该分区的所有fetcher,目的是不让副本fetcher线程往该副本写数据。

* 4,清空当前分区的log和Checkpoint offsets

* 5,假如Broker没有挂掉,增加从新leader获取数据的副本fetcher线程

具体代码如下:

将分区从leader partition 集合中移除

将副本标记为follower ,目的是不让生产者继续往该副本生产消息

partitionState.foreach{ case (partition, partitionStateInfo) =>
 val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
 val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
  leaders.find(_.id == newLeaderBrokerId) match {
 // Only change partition state when the leader is available
 case Some(leaderBroker) =>
 if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
        partitionsToMakeFollower += partition

当前分区的log和Checkpoint offsets

replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))

清空当前分区的log和Checkpoint offsets

logManager.truncateTo(partitionsToMakeFollower.map(partition => 
(new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)

假如Broker没有挂掉,增加从新leader获取数据的副本fetcher线程

val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
 new TopicAndPartition(partition) -> BrokerAndInitialOffset(
    leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
 partition.getReplica().get.logEndOffset.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

具体的makeFollower方法中

通过设置leader和ISR为空,使本地副本成为Follower

主要做了以下几件事情:

* 记录LeaderShip 决议的时代。在更新isr并维护Zookeeperpath的中的Controller时代

* 增加新的副本

* 移除已经被Controller移除的已分配副本

val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
val newLeaderBrokerId: Int = leaderAndIsr.leader
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
// add replicas that are new
allReplicas.foreach(r => getOrCreateReplica(r))
// remove assigned replicas that have been removed by the controller
(assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
inSyncReplicas = Set.empty[Replica]
leaderEpoch = leaderAndIsr.leaderEpoch
zkVersion = leaderAndIsr.zkVersion

leaderReplicaIdOpt.foreach { leaderReplica =>
 if (topic == OffsetManager.OffsetsTopicName &&
 /* if we are making a leader->follower transition */
 leaderReplica == localBrokerId)
    offsetManager.clearOffsetsInPartition(partitionId)
}

if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
 false
}
else {
 leaderReplicaIdOpt = Some(newLeaderBrokerId)
 true
}

四,总结

本文主要是以topic的创建过程,讲解分区及副本在集群Broker上的分布的实现,顺便讲解新建topic的话分区的leader的选举方法,及我们的副本成为leader和Follower的要素。

这个过程实际上也是基于Zookeeper实现了订阅发布系统,发布者是TopicCommand类,订阅者是kafka的Controller类。再由kafka的Controller进行分区leader选举(副本列表第一个),然后给TopicCommand已经指定的各个Broker Follower发送LeaderAndIsrRequest,由根据我们TopicCommand中分区的分配的具体Broker去启动副本为leader(leader的被分配的Brokerid和当前Broker的id相等)或者Follower。