Kafka Source Code Series: Replica Synchronization and ISR List Updates

Date: 2022-04-25

This article walks through Kafka's replica synchronization mechanism and how the ISR (in-sync replicas) list is updated, at the source-code level.

I. Basic Idea

Readers of <Kafka源码系列之Consumer高级API性能分析> will find this article familiar: replica synchronization is also built on SimpleConsumer, and a broker maintains one ReplicaFetcherThread (analogous to ConsumerFetcherThread) per broker it fetches from, up to the total number of brokers. Each ReplicaFetcherThread owns a SimpleConsumer and is responsible for pulling, from one fixed broker, the leader data for the follower replicas hosted locally, and appending that data to the local log.

II. Key Classes

1. KafkaServer

Represents the lifecycle of a single Kafka broker. It handles everything needed to start and stop a Kafka node.

2. ReplicaManager

Manages replica operations: making a replica the leader or a follower, stopping replicas, fetching data from the leader, and so on.

3. ReplicaFetcherManager

Extends AbstractFetcherManager. Responsible for creating and stopping ReplicaFetcherThread instances.

4. ReplicaFetcherThread

Extends AbstractFetcherThread. Fetches data from the leader and appends it to the local log. Owns a SimpleConsumer.

Its role is analogous to ConsumerFetcherThread on the consumer side.

III. Source Code Walkthrough

1. Creating and starting ReplicaManager

KafkaServer creates and starts the ReplicaManager:

replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

replicaManager.startup()

What startup() actually launches is a scheduled task that periodically checks whether any replica has fallen behind and, if so, shrinks the ISR list:

// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
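The scheduling call above can be modeled with a small Python helper. This is an illustrative sketch standing in for Kafka's scheduler, not its API; all names here are made up:

```python
import threading
import time

def schedule_periodic(task, period_s, stop_event):
    """Minimal stand-in for the scheduler used above: run task every
    period_s seconds on a background thread until stop_event is set."""
    def loop():
        # Event.wait doubles as an interruptible sleep: it returns True
        # (and ends the loop) as soon as stop_event is set
        while not stop_event.wait(period_s):
            task()
    t = threading.Thread(target=loop, daemon=True)
    t.start()
    return t

stop = threading.Event()
ticks = []
schedule_periodic(lambda: ticks.append(1), period_s=0.01, stop_event=stop)
time.sleep(0.1)
stop.set()  # shut the periodic task down
```

In the real code the scheduled task is maybeShrinkIsr and the period is replica.lag.time.max.ms.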

This check only has an effect on brokers that lead at least one partition.

maybeShrinkIsr iterates over all partitions, and each partition decides whether to shrink its own ISR:

allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))

Inside Partition.maybeShrinkIsr, the first step is to check whether the leader replica for this partition is local. If it is, the shrink logic runs; otherwise nothing happens:

leaderReplicaIfLocal() match {
  case Some(leaderReplica) =>
    val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs, replicaMaxLagMessages)
    if (outOfSyncReplicas.size > 0) {
      val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
      assert(newInSyncReplicas.size > 0)
      info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
        inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
      // update ISR in zk and in cache
      updateIsr(newInSyncReplicas)
      // we may need to increment high watermark since ISR could be down to 1
      maybeIncrementLeaderHW(leaderReplica)
      replicaManager.isrShrinkRate.mark()
    }
  case None => // do nothing if no longer leader
}

A replica is removed from the ISR for one of two reasons, both implemented in getOutOfSyncReplicas:

A) It is stuck

val leaderLogEndOffset = leaderReplica.logEndOffset
val candidateReplicas = inSyncReplicas - leaderReplica
// Case 1 above
val stuckReplicas = candidateReplicas.filter(r => (time.milliseconds - r.logEndOffsetUpdateTimeMs) > keepInSyncTimeMs)

B) It is too slow to keep up

val slowReplicas = candidateReplicas.filter(r =>
  r.logEndOffset.messageOffset >= 0 &&
  leaderLogEndOffset.messageOffset - r.logEndOffset.messageOffset > keepInSyncMessages)
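The two checks can be condensed into a few lines of Python. This is an illustrative sketch, not Kafka's API; the replica objects are plain dicts carrying only the fields the checks read:

```python
def get_out_of_sync_replicas(leader, candidates, max_lag_ms, max_lag_messages, now_ms):
    """Return replicas that are either stuck or too slow, mirroring the
    two ISR-shrink criteria described above (simplified sketch)."""
    # Case A: "stuck" -- the replica has not updated its log end offset recently
    stuck = [r for r in candidates
             if now_ms - r["leo_update_time_ms"] > max_lag_ms]
    # Case B: "slow" -- the replica's log end offset trails the leader's too far
    slow = [r for r in candidates
            if r["leo"] >= 0 and leader["leo"] - r["leo"] > max_lag_messages]
    # a replica can match both criteria; deduplicate while keeping order
    out = []
    for r in stuck + slow:
        if r not in out:
            out.append(r)
    return out

leader = {"id": 0, "leo": 5000, "leo_update_time_ms": 20000}
r1 = {"id": 1, "leo": 4990, "leo_update_time_ms": 0}      # stuck: stale update time
r2 = {"id": 2, "leo": 100,  "leo_update_time_ms": 20000}  # slow: 4900 messages behind
r3 = {"id": 3, "leo": 4999, "leo_update_time_ms": 20000}  # healthy

out = get_out_of_sync_replicas(leader, [r1, r2, r3],
                               max_lag_ms=10000, max_lag_messages=4000,
                               now_ms=20000)
# r1 and r2 are out of sync; r3 stays in the ISR
```

With the default settings, "stuck" means no log-end-offset update for 10 seconds, and "slow" means more than 4000 messages behind the leader.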

Once out-of-sync replicas are found, the block shown earlier removes them from the ISR via updateIsr (which writes the new ISR to ZooKeeper and the cache) and then calls maybeIncrementLeaderHW, because with fewer ISR members the high watermark may now be able to advance.
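The source comment "we may need to increment high watermark since ISR could be down to 1" is worth unpacking: the leader's high watermark is the smallest log end offset among ISR members, so dropping a laggard can let it move forward. A simplified Python illustration (not Kafka's API):

```python
def leader_high_watermark(isr_leos):
    """The leader HW is the minimum log end offset across ISR members,
    so removing a laggard from the ISR can allow the HW to advance."""
    return min(isr_leos)

# with a laggard at offset 100 still in the ISR, the HW is pinned at 100
before = leader_high_watermark([5000, 4990, 100])
# after shrinking the laggard out of the ISR, the HW can jump forward
after = leader_high_watermark([5000, 4990])
```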

2. How ReplicaFetcherThread is created

Whenever new follower partitions are assigned to a broker, makeFollowers is called; this path is covered in detail in my other article <Kafka源码系列之topic创建分区分配及leader选举>.

makeFollowers calls ReplicaFetcherManager's addFetcherForPartitions method:

replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)

addFetcherForPartitions first checks whether a ReplicaFetcherThread connected to the broker hosting the partition's leader already exists. If it does, the topic partition is simply added to that thread; if not, the thread is created and started first:

val partitionsPerFetcher = partitionAndOffsets.groupBy { case (topicAndPartition, brokerAndInitialOffset) =>
  BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition)) }
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
  var fetcherThread: AbstractFetcherThread = null
  fetcherThreadMap.get(brokerAndFetcherId) match {
    case Some(f) => fetcherThread = f
    case None =>
      fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
      fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
      fetcherThread.start
  }

  fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
    topicAndPartition -> brokerAndInitOffset.initOffset
  })
}
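The grouping above can be sketched in Python. Here get_fetcher_id models Kafka's getFetcherId as a hash of the topic partition modulo a fetcher count, which is how partitions are spread across multiple fetcher threads per target broker; the names and the exact hash are illustrative:

```python
from collections import defaultdict

NUM_FETCHERS = 2  # stand-in for the number of fetcher threads per broker

def get_fetcher_id(topic, partition):
    # simplified stand-in for getFetcherId: hash the topic partition so the
    # same partition always maps to the same fetcher thread
    return (31 * hash(topic) + partition) % NUM_FETCHERS

def group_by_fetcher(partition_leaders):
    """Map (leader broker, fetcher id) -> partitions, mirroring
    partitionsPerFetcher above. One fetcher thread exists per key."""
    groups = defaultdict(list)
    for (topic, partition), leader_broker in partition_leaders.items():
        key = (leader_broker, get_fetcher_id(topic, partition))
        groups[key].append((topic, partition))
    return groups

groups = group_by_fetcher({
    ("logs", 0): 1,     # leader on broker 1
    ("logs", 1): 1,     # leader on broker 1
    ("metrics", 0): 2,  # leader on broker 2
})
# every partition lands in exactly one group, keyed by its leader broker
```

Partitions whose leaders sit on the same broker (and that hash to the same fetcher id) share one fetcher thread, which is what keeps the thread count bounded.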

3. How ReplicaFetcherThread syncs data

In the doWork method of the parent class AbstractFetcherThread, a FetchRequest is built first; the data is then fetched and the local fetch offsets are updated, after which ReplicaFetcherThread's processPartitionData method appends the data to the local log.

Building the fetch request:

inLock(partitionMapLock) {
  if (partitionMap.isEmpty)
    partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
  partitionMap.foreach {
    case ((topicAndPartition, offset)) =>
      fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
        offset, fetchSize)
  }
}

val fetchRequest = fetchRequestBuilder.build()
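The builder pattern above can be sketched in a few lines of Python. This is an illustrative model, not Kafka's FetchRequest API:

```python
class FetchRequestBuilder:
    """Accumulates (topic, partition, offset, fetch_size) entries and
    produces one batched request, like fetchRequestBuilder above."""
    def __init__(self, fetch_size=1048576):
        self.fetch_size = fetch_size
        self.entries = []

    def add_fetch(self, topic, partition, offset):
        self.entries.append((topic, partition, offset, self.fetch_size))
        return self  # allow chained calls

    def build(self):
        # one request carries every partition this fetcher is responsible for
        return {"fetch_data": list(self.entries)}

builder = FetchRequestBuilder(fetch_size=4096)
builder.add_fetch("logs", 0, 120).add_fetch("logs", 1, 88)
request = builder.build()
# both partitions are batched into a single fetch round trip
```

Batching all partitions into one request is what makes a single SimpleConsumer per target broker sufficient.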

The actual network fetch happens in processFetchRequest:

response = simpleConsumer.fetch(fetchRequest)

The local fetch offset is then advanced past the last message received:

val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
val validBytes = messages.validBytes
val newOffset = messages.shallowIterator.toSeq.lastOption match {
  case Some(m: MessageAndOffset) => m.nextOffset
  case None => currentOffset.get
}
partitionMap.put(topicAndPartition, newOffset)
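The offset bookkeeping above reduces to: advance to one past the last fetched message, or stay put when the fetch returned nothing. A Python sketch (messages are simplified to (offset, payload) tuples; not Kafka's API):

```python
def next_fetch_offset(messages, current_offset):
    """messages: list of (offset, payload) in offset order.
    Mirrors the shallowIterator lastOption logic above: the next fetch
    starts one past the last message, or stays at current_offset if the
    fetch was empty."""
    if messages:
        last_offset, _payload = messages[-1]
        return last_offset + 1  # the nextOffset of the last message
    return current_offset

# fetched messages up to offset 41 -> next fetch starts at 42
advanced = next_fetch_offset([(40, b"a"), (41, b"b")], current_offset=40)
# empty fetch -> keep fetching from the current offset
unchanged = next_fetch_offset([], current_offset=40)
```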

processPartitionData then appends the fetched messages to the local log. Unlike a producer append on the leader, offset assignment is disabled here (assignOffsets = false), because data replicated from the leader already carries its offsets:

replica.log.get.append(messageSet, assignOffsets = false)

Finally, the follower replica's high watermark is updated:

replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
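The followerHighWatermark value is, as I read this version of the code, the smaller of the leader's high watermark carried in the fetch response and the follower's own log end offset: the follower cannot expose data it has not yet appended locally. A simplified sketch:

```python
def follower_high_watermark(leader_hw_from_response, local_log_end_offset):
    """The follower's HW is capped by its own log end offset: it must not
    advance past data it has not yet written locally (simplified sketch)."""
    return min(leader_hw_from_response, local_log_end_offset)

# follower has written past the leader's HW -> adopt the leader's HW
caught_up = follower_high_watermark(100, 150)
# follower is still behind the leader's HW -> HW capped at the local LEO
behind = follower_high_watermark(100, 80)
```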

IV. Summary

This article covered Kafka's replica synchronization process, whose design closely mirrors that of Kafka's Java high-level consumer API.

Two abstract classes are involved:

AbstractFetcherManager, with two subclasses:

1. ReplicaFetcherManager

2. ConsumerFetcherManager

AbstractFetcherThread, with two subclasses:

1. ReplicaFetcherThread

2. ConsumerFetcherThread

I encourage you to read the related code carefully; it is a good way to pick up these patterns and improve your own code.

The key concept in this article is ISR shrinking (ShrinkIsr). To understand it, you first need to understand what the ISR list is and how a replica is judged to be out of sync and therefore removed from it.

Two important configurations:

Name                       Default   Meaning
replica.lag.time.max.ms    10000     Maximum time (ms) a follower may go without updating its log end offset before it is considered stuck and removed from the ISR
replica.lag.max.messages   4000      Maximum number of messages a follower may trail the leader's log end offset before it is considered too slow and removed from the ISR
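For reference, both settings can be overridden in the broker's server.properties; the values below simply restate the defaults:

```properties
# how long a follower may lag in time before leaving the ISR
replica.lag.time.max.ms=10000
# how many messages a follower may lag before leaving the ISR
replica.lag.max.messages=4000
```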