Partitioned replica current limiting mechanism Trilogy (source code)

In this article, we mainly take a look at how partition replica reallocation flow restriction is implemented. This source code analysis is based on kafka2.5. Before we begin, let's review the process of partition reallocation, as shown in Figure 1.

  • Execute the partition reallocation script, which is equivalent to starting a kafka client. After executing the script, write the reallocation data to zk and return.
  • kafkaController listens to the data change of zk partition reallocation node, and will call the corresponding handler to process the data. Here, it is equivalent to the role of kafka server brain. It controls the main process of partition reallocation, and sends commands to other nodes for execution. The simple description is that it will process the data in the corresponding broker according to the reallocation script Create a new copy and delete the copy that needs to be removed.
  • Finally, kafkaController receives the command from each broker to complete the operation. It will perform some operations after the reallocation, including modifying zk Node data, update metadata information, etc.

You can also think about how you would design and develop the current limiting function if you gave it to you? The current limit we want to talk about today is implemented in creating a new replica. When a replica needs to be created, kafkaController will send a LeaderAndISRRequest request to the corresponding broker. Let's start here.

broker processing of LeaderAndIsr requests

The entrance is at kafka server. KafkaApis. Handleleaderandisrrequest, handle LeaderAndIsrRequest The requested code is not put away, and the processing is relatively simple. Because we are now doing partition reallocation, adding replicas first, and then deleting unwanted replicas offline, we will follow the logic of the makeFollowers method in makeFollowers The main task in is to create a corresponding folder, and then start the corresponding ReplicaFetcherThread to communicate with the leader to pull data. Let's mainly analyze the code of ReplicaFetcherThread.

The main task here is to build a FetchRequest and then process the FetchResponse. If fetchRequestOpt is empty, the thread will wait for one second,

  private def maybeFetch(): Unit = {
      //Synchronize data and acquire locks
      val fetchRequestOpt = inLock(partitionMapLock) {
        //2.1 build synchronous data request
        val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = buildFetch(partitionStates.partitionStateMap.asScala)
        handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
        if (fetchRequestOpt.isEmpty) {
          trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
          //If there is no data to be built, wait for 1 second
          partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      //2.2 in the case of current limiting, fetchRequestOpt is empty, so no request will be sent, the sample will be reset directly, or the current limiting standard will be reached
      fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
        processFetchRequest(sessionPartitions, fetchRequest)

Build synchronous data request code analysis

Here, we see the key code shouldFollowerThrottle. Next, we enter this method to see how to implement flow restriction.

  override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = {
    val partitionsWithError = mutable.Set[TopicPartition]()

    val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)
    partitionMap.foreach { case (topicPartition, fetchState) =>
      // 2.1.1 judge whether current limiting is required
      if (fetchState.isReadyForFetch && !shouldFollowerThrottle(quota, fetchState, topicPartition)) {
        try {
          val logStartOffset = this.logStartOffset(topicPartition)
          builder.add(topicPartition, new FetchRequest.PartitionData(
            fetchState.fetchOffset, logStartOffset, fetchSize, Optional.of(fetchState.currentLeaderEpoch)))
        } catch {
          case _: KafkaStorageException =>
            // The replica has already been marked offline due to log directory failure and the original failure should have already been logged.
            // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure()
            partitionsWithError += topicPartition

    val fetchData =
    //Judge that if fetchData is empty, return None directly
    val fetchRequestOpt = if (fetchData.sessionPartitions.isEmpty && fetchData.toForget.isEmpty) {
    } else {
      val requestBuilder = FetchRequest.Builder
        .forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend)
      Some(ReplicaFetch(fetchData.sessionPartitions(), requestBuilder))

    ResultWithPartitions(fetchRequestOpt, partitionsWithError)

shouldFollowerThrottle source code analysis

The code here is actually very simple. There are three judgments. The first one is to judge whether the replica is synchronized. If it is not synchronized, then! fetchState.isReplicaInSync is true. The second one is to judge whether the topicPartition is in the current limiting memory configuration. The memory configuration here is triggered after the configuration data is written to zk in the first step of partition reallocation. The third is the real current limit, to judge whether it reaches the threshold. If the threshold value set in zk is reached, false will be returned, fetchData will be empty, and the outer code will wait for one second before requesting again.

private def shouldFollowerThrottle(quota: ReplicaQuota, fetchState: PartitionFetchState, topicPartition: TopicPartition): Boolean = {
    !fetchState.isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded

Similarly, on the leader side, the broker where the leader is located will receive the Fetch request, and the entry is in kafka server. Kafkaapis\handlefetchrequest has a similar code, as follows

 val shouldLeaderThrottleResult = shouldLeaderThrottle(quota, tp, replicaId)
          val fetchDataInfo = if (shouldLeaderThrottleResult) {
            // If the partition is being throttled, simply return an empty set.
            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
          } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
            // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
            // progress in such cases and don't need to report a `RecordTooLargeException`
            FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
          } else {
   //Several codes are omitted here       
 def shouldLeaderThrottle(quota: ReplicaQuota, topicPartition: TopicPartition, replicaId: Int): Boolean = {
    val isReplicaInSync = nonOfflinePartition(topicPartition).exists(_.inSyncReplicaIds.contains(replicaId))
    !isReplicaInSync && quota.isThrottled(topicPartition) && quota.isQuotaExceeded

Realization of current limiting

From the above code analysis, we know in which code the current limit is controlled, but we don't know how to implement it. Here we will explain it separately. See Figure 2 for details

Description of Metrics and KafkaMetric classes

Here, I only draw a simple composition relationship. The current limiting starts with the Metrics class. The Metrics class is equivalent to the global current limiting factory class. There are two concurrentmaps loaded with KafkaMetric and Sensor respectively. Let's take a look at KafkaMetric first , which mainly initializes some parameters and provides methods to obtain the current value. KafkaMetric is used to encapsulate a current limit type.

    double measurableValue(long timeMs) {
        synchronized (this.lock) {
            if (this.metricValueProvider instanceof Measurable)
                return ((Measurable) metricValueProvider).measure(config, timeMs);
                return 0;

When the Kafka server is started, a global Metrics and QuotaManagers current limiting information management class will be initialized. You can see the quotafactory In instance, the flow restriction management classes of leader and follower are initialized respectively. QuotaManagers or ReplicationQuotaManager objects are passed to other methods layer by layer as input parameters. This is a singleton.

def startup(): Unit = {
    try {
      //Omit several codes
        metrics = new Metrics(metricConfig, reporters, time, true)

        /* register broker metrics */
        _brokerTopicStats = new BrokerTopicStats

        quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
        //Omit several codes
    catch {
     //Omit several codes
  def instantiate(cfg: KafkaConfig, metrics: Metrics, time: Time, threadNamePrefix: String): QuotaManagers = {
      val clientQuotaCallback = Option(cfg.getConfiguredInstance(KafkaConfig.ClientQuotaCallbackClassProp,
        new ClientQuotaManager(clientFetchConfig(cfg), metrics, Fetch, time, threadNamePrefix, clientQuotaCallback),
        new ClientQuotaManager(clientProduceConfig(cfg), metrics, Produce, time, threadNamePrefix, clientQuotaCallback),
        new ClientRequestQuotaManager(clientRequestConfig(cfg), metrics, time, threadNamePrefix, clientQuotaCallback),
        new ReplicationQuotaManager(replicationConfig(cfg), metrics, LeaderReplication, time),
        new ReplicationQuotaManager(replicationConfig(cfg), metrics, FollowerReplication, time),
        new ReplicationQuotaManager(alterLogDirsReplicationConfig(cfg), metrics, AlterLogDirsReplication, time),

Sensor class description

For Sensor, this class has more functions. The code is as follows. Due to the space, there is no complete code here. Two important methods are provided here. One is record, which will be called at the key code to record the traffic value, and the other is checkQuotas To judge whether the threshold value has been reached, some friends may wonder why KafkaMetric is loaded in the Metrics class and the Sensor has a Map Metrics object. It can be understood that all KafkaMetric objects are loaded in metrics, while the Sensor has a specific limit on a record value.

public final class Sensor {

    private final Metrics registry;
    private final String name;
    private final Sensor[] parents;
    private final List<Stat> stats;
    private final Map<MetricName, KafkaMetric> metrics;
    public final MetricConfig config;
    private final Time time;
    private volatile long lastRecordTime;
    private final long inactiveSensorExpirationTimeMs;
    private final Object metricLock;

    public void record(double value, long timeMs) {
        record(value, timeMs, true);
    public void checkQuotas(long timeMs) {
        for (KafkaMetric metric : this.metrics.values()) {
            MetricConfig config = metric.config();
            if (config != null) {
                Quota quota = config.quota();
                if (quota != null) {
                     //Get the current calculated flow value
                    double value = metric.measurableValue(timeMs);
                    //Compare with the configured flow value. If it is not satisfied, throw an exception
                    if (!quota.acceptable(value)) {
                        throw new QuotaViolationException(metric.metricName(), value,

Implementation of sensor\record

The code is in the record method of SampledStat. I have annotated the logic of the code, mainly to obtain the current sample object according to the incoming time. If it exceeds the config The value configured in, the sample is added, otherwise the sample data is updated. In particular, the number of samples and the time to add samples are all passed in during the initialization of MetricConfig. The number of samples is 11 by default. You can configure replication quota. window . num. the sample time is timeWindow, which is 1 second by default. You can use replication quota. window. size. Second configuration.

public void record(MetricConfig config, double value, long timeMs) {
        //Record the flow and obtain the current sample
        Sample sample = current(timeMs);
        //If 1 second has passed, increase the sample
        if (sample.isComplete(timeMs, config))
            sample = advance(config, timeMs);
        //Otherwise, update the current sample value
        update(sample, config, value, timeMs);
        sample.eventCount += 1;

Implementation of sensor\checkquotas

As we know earlier, in checkQuotas, we mainly obtain the comparison between the current traffic value and the configuration, and calculate the current traffic value is implemented in the rate\measure method. Sampledstat\measure will be called in the measure method, and purgeObsoleteSamples will be called in the sampledstat\measure method Method to reset the expired copy, and then call the specific combine method. What we call here is implemented in WindowedSum, that is, loop the samples to accumulate and return to rate\measure In, it is found that the logic is nothing more than adding the traffic in each sample, and then dividing it by time to calculate the average traffic.

    public double measure(MetricConfig config, long now) {
        double value = stat.measure(config, now);
        return value / convert(windowSize(config, now));
    public double measure(MetricConfig config, long now) {
        purgeObsoleteSamples(config, now);
        return combine(this.samples, config, now);
    protected void purgeObsoleteSamples(MetricConfig config, long now) {
            long expireAge = config.samples() * config.timeWindowMs();
            //Reset if sample data has passed 11 seconds
            for (Sample sample : samples) {
                if (now - sample.lastWindowMs >= expireAge) {
                    //I.e. reset every 11 seconds
                    System.out.println("reset====" + now + "----" + sample.lastWindowMs + "---" + expireAge);
    public double combine(List<Sample> samples, MetricConfig config, long now) {
         double total = 0.0;
         for (Sample sample : samples) {
             total += sample.value;
         return total;

Where is the traffic recorded?

I believe many people have this question after reading this. On the followers side, it is recorded when the FetchResponse returns. See Figure 3 for the code

On the leader side, the traffic value will be recorded after reading the log file

It can be seen that on the follower side, it is necessary to determine whether the traffic value is recorded in the current limiting replica, but on the leader side, there is no such restriction. I wonder if this is a kafka bug? I also tested this problem. It is really the topic that is not in the current limiting configuration when the leader side is synchronized with the replica The traffic value will also be recorded. It is difficult to guess the intention of doing so. Here we can also sort out the implementation principle of Kafka current limiting: 1. Initialize the flow restriction management class and global flow restriction configuration when the broker is started. 2. If the current limit configuration of the leader and follower is written in the ZK node, the method isQuotaExceeded will be called when synchronizing the replica to determine whether the current limit is reached. Here, I also want to explain why the leader is written in zk replication . throttled. Why should all the original replicas be included when replicatias data is synchronized? Because the leader may be disconnected and re elected when synchronizing the new replica, it is better to write all the replicas at once. 3. If the flow limit is not reached, the flow will be recorded separately in the corresponding places, and the flow values of leader and follower are recorded separately. 4. The way to judge whether the current limit is reached is to record a certain number of samples recently and calculate the average value.

Some thoughts on current limiting

After learning this, you may have a basic understanding of the logic of current limiting code. Here are a few questions to impress you.

Set the current limit to 1. Will the partition reassignment task never be completed?

Not exactly. It can be imagined that if the replica data is less than the value of a fetch, there is no traffic record between the leader and the follower, and the synchronization ends. The sample can only record the traffic this time. However, it is very critical that there is no traffic record at all, which only exists in the demo The version of kafka can be reached on the follower side. On the leader side, according to my test, a fetch without data usually returns 18k of traffic, which is about 36k of data per second. Therefore, if the data is greater than the upper limit of a fetch, it will be limited by the leader side in subsequent fetches. When setting the flow limit, we must consider the daily traffic of the replica. This is also mentioned in the official document. If it is less than the daily traffic, the migration cannot be completed.

Testing for leader s

  • Prepare three brokers, topics_ 1 and topic_2. The leader s are all on broker1. topic_1 to broker2, topic_2. Migrate to broker3. The replica data is 200M. The current limit is set to 300*1024b/s, that is, 300k/s
  • The results are as follows: 200M = 2001024k,2001024k/300k/60 ≈ 11.37min. Because two topic s need to be migrated, and the leader side limits the transmission rate, the final migration duration is about 19 minutes
[controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=1] reassignment :Map(topic_1-0 -> ReplicaAssignment(replicas=1,2, addingReplicas=2, removingReplicas=), topic_2-0 -> ReplicaAssignment(replicas=1,3, addingReplicas=3, removingReplicas=)) start at 2021-11-13 13:18:20

[controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=1] No more partitions need to be reassigned. Deleting zk path /admin/reassign_partitions at 2021-11-13 13:37:31

Testing for follower

  • Prepare three brokers, topics_ 1 and topic_2,topic_ The leader of 1 is on broker1, topic_ The leaders of 2 are all migrated to broker3 on broker2. The replica data is 200M and the current limit is set to 300*1024b/s
  • The results are as follows. As in the previous example, it only took about 11 minutes. Because two topic s need to be migrated, and the follower side limits the transmission rate, the final migration duration is 21 minutes
[controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=1] reassignment :Map(topic_1-0 -> ReplicaAssignment(replicas=1,3, addingReplicas=3, removingReplicas=), topic_2-0 -> ReplicaAssignment(replicas=2,3, addingReplicas=3, removingReplicas=)) start at 2021-11-13 14:22:42

[controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=1] No more partitions need to be reassigned. Deleting zk path /admin/reassign_partitions at 2021-11-13 14:43:17

Limit tests that do not affect each other

  • Prepare three brokers, topics_ 1 and topic_2,topic_ The leader of 1 is on broker1, topic_ The leader of 2 is on broker2, topic_1 migrate to broker2, topic_2 migrate to broker3 The copy data is 200M, and the current limit is set to 300*1024b/s
  • The results are as follows. It can be seen that the migration of leader s and follower s is completed in 11 minutes when different topic s do not affect each other.
[controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=1] reassignment :Map(topic_1-0 -> ReplicaAssignment(replicas=1,2, addingReplicas=2, removingReplicas=), topic_2-0 -> ReplicaAssignment(replicas=2,3, addingReplicas=3, removingReplicas=)) start at 2021-11-13 15:04:37
[controller-event-thread] INFO kafka.controller.KafkaController - [Controller id=1] No more partitions need to be reassigned. Deleting zk path /admin/reassign_partitions at 2021-11-13 15:15:40

How to estimate the total cluster migration traffic?

My opinion is to take the union of all leaders and followers respectively and recalculate the number, multiply it by the current limit value respectively, and then take the smaller value to estimate. Sum = math min(leader,follower)*throttle.

Is kafka's current limiting implementation mechanism the same as you imagined?

I think kafka current limiting not only gives us a way to implement current limiting, but also teaches us how to split functions. This set is completely separated from kafka's main functions. It only provides key methods externally, and then records traffic values at key codes. This is what we need to learn in normal development and design.

Finally, some suggestions on learning kafka source code

  • First of all, I suggest you to understand the basic syntax of scala. Although it is similar to java, it is easier to understand the meaning of the source code after you are familiar with it.
  • It is recommended that you learn the relevant codes of kafka service communication and replica machine. These are two relatively independent modules, which are useful in many places.
  • Finally, the kafka breakpoint is very difficult to hit, and it is easy to lose contact with zk. Therefore, you should learn to log the key points and patiently analyze the source code.

Posted by Vidya_tr on Thu, 02 Jun 2022 08:32:22 +0530