kafka-jira mailing list archives

From "Lucas Wang (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (KAFKA-6481) Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
Date Thu, 08 Feb 2018 08:33:00 GMT

     [ https://issues.apache.org/jira/browse/KAFKA-6481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Lucas Wang updated KAFKA-6481:
------------------------------
    Description: 
The function ControllerChannelManager.addUpdateMetadataRequestForBrokers should only process the partitions specified in its partitions parameter (the 2nd parameter), instead of also iterating through the entire set of partitions in TopicDeletionManager.partitionsToBeDeleted.

 

Here is why the current code can be a problem:

The number of partitions-to-be-deleted stored in the field TopicDeletionManager.partitionsToBeDeleted can become quite large under certain scenarios. For instance, if a topic a0 has dead replicas, the topic is marked as ineligible for deletion, and its partitions are retained in TopicDeletionManager.partitionsToBeDeleted for future retries.
With a large set of partitions in TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic a1 need to be transitioned to OfflineReplica state, for example because a broker goes offline, the call stack below occurs on the controller, causing an iteration over the whole partitions-to-be-deleted set for every single affected partition.

    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
    ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
    ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
        (inside a for-loop, once for each partition)
    ReplicaStateMachine.doHandleStateChanges
    ReplicaStateMachine.handleStateChanges
    KafkaController.onReplicasBecomeOffline
    KafkaController.onBrokerFailure
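
To make the intended change concrete, here is a minimal, self-contained Scala sketch contrasting the two shapes. This is an illustration only, not the actual Kafka code or the actual patch: the object name AddUpdateMetadataSketch, the variable name givenPartitions, and the simplified TopicAndPartition stand-in are made up for this example; only updateMetadataRequestPartitionInfo and partitionsToBeDeleted correspond to names mentioned above.

    // Illustrative sketch only: simplified stand-ins, not the real controller code.
    object AddUpdateMetadataSketch {
      case class TopicAndPartition(topic: String, partition: Int)

      // Stand-in for ControllerBrokerRequestBatch.updateMetadataRequestPartitionInfo:
      // here it just prints which partition info would be queued.
      def updateMetadataRequestPartitionInfo(p: TopicAndPartition, beingDeleted: Boolean): Unit =
        println(s"queue UpdateMetadata for $p, beingDeleted = $beingDeleted")

      def main(args: Array[String]): Unit = {
        // n partitions retained because topic a0 is ineligible for deletion
        val partitionsToBeDeleted = (0 until 10).map(TopicAndPartition("a0", _)).toSet
        // m partitions actually passed in via the 2nd parameter of addUpdateMetadataRequestForBrokers
        val givenPartitions = (0 until 10).map(TopicAndPartition("a1", _)).toSet

        // Current shape: the whole to-be-deleted set is iterated on every call,
        // in addition to the partitions the caller asked for.
        partitionsToBeDeleted.foreach(p => updateMetadataRequestPartitionInfo(p, beingDeleted = true))
        givenPartitions.foreach(p => updateMetadataRequestPartitionInfo(p, beingDeleted = false))

        // Shape implied by this ticket: process only the given partitions, and consult the
        // to-be-deleted set with an O(1) membership check instead of iterating it.
        givenPartitions.foreach { p =>
          updateMetadataRequestPartitionInfo(p, beingDeleted = partitionsToBeDeleted.contains(p))
        }
      }
    }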

How to reproduce the problem:
 1. Create a cluster with 2 brokers, with ids 1 and 2.
 2. Create a topic with 10 partitions and deliberately assign the replicas to non-existent brokers, e.g.:
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`
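 (The backtick expression expands to 3:4 repeated 10 times, one entry per partition, so every partition of a0 is assigned to the non-existent brokers 3 and 4.)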

3. Delete the topic so that all of its partitions are retained in the field TopicDeletionManager.partitionsToBeDeleted, since the topic has dead replicas and is ineligible for deletion.

./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0

4. Verify that the following log message shows up 10 times in the controller.log file, one line for each partition in topic a0: "Leader not yet assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."

5. Create another topic a1, also with 10 partitions, e.g.:
 ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
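 (Here the assignment expands to 1:2 repeated 10 times, so every partition of a1 has its replicas on the two live brokers 1 and 2.)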

6. Verify that the log message in step 4 appears *100 more* times. This is because of the following call stack:
    addUpdateMetadataRequestForBrokers
    addLeaderAndIsrRequestForBrokers
        (inside a for-loop, once for each create response)
    initializeLeaderAndIsrForPartitions

In general, if n partitions have already been accumulated in the partitionsToBeDeleted variable, and a new topic is created with m partitions, m * n of the log messages above will be generated.
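In the reproduce steps here, n = 10 (the retained a0 partitions) and m = 10 (the new a1 partitions), so 10 * 10 = 100 extra messages, which is exactly the count observed in step 6.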

 7. Kill broker 2, causing the replicas on broker 2 to be transitioned to OfflineReplica state on the controller.
 8. Verify that the log message in step 4 appears another *210* times. This is because:
    a. During controlled shutdown, the function KafkaController.doControlledShutdown calls replicaStateMachine.handleStateChanges to transition all the replicas on broker 2 to OfflineReplica state. That in turn generates 100 (10 x 10) entries of the log above.
    b. When the broker's zNode is removed from ZooKeeper, the function KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline to transition all the replicas on broker 2 to OfflineReplica state. This again generates 100 (10 x 10) of the log entries above.
    c. At the bottom of the function onReplicasBecomeOffline, it calls sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which generates 10 log entries, one for each partition in the a0 topic.

In general, when we have n partitions accumulated in the variable partitionsToBeDeleted, and
a broker with m partitions becomes offline, up to 2 * m * n + n logs could be generated.
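With n = 10 and m = 10 as in these steps, this gives up to 2 * 10 * 10 + 10 = 210, matching the count in step 8.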

Performance improvement benchmark: if we perform the steps above with topic a0 having 5000 partitions and topic a1 having 5000 partitions, then when broker 2 goes down it takes the controller ~4 minutes to go through controlled shutdown, detect the broker failure through ZooKeeper, and transition all replicas to OfflineReplica state. After applying the patch, the same process takes 23 seconds.
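Applying the 2 * m * n + n bound above to this benchmark (m = n = 5000) gives on the order of 2 * 5000 * 5000 + 5000, i.e. roughly 50 million such log lines, which is consistent with the multi-minute delay.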

After applying the patch in this RB and going through the steps above, I've verified that broker 2 going offline NO LONGER generates log entries for the a0 partitions.
 I've also verified that topic deletion for topic a1 still works fine.


> Improving performance of the function ControllerChannelManager.addUpdateMetadataRequestForBrokers
> -------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6481
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6481
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
