kafka-commits mailing list archives

From jun...@apache.org
Subject [10/43] git commit: KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao
Date Mon, 08 Jul 2013 23:14:35 GMT
KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d5e95f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d5e95f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d5e95f6

Branch: refs/heads/trunk
Commit: 1d5e95f6c4067884a374deddd88ec3f471664658
Parents: 66b1038
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Thu Mar 28 16:29:37 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu Mar 28 16:29:37 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/controller/KafkaController.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d5e95f6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9d32901..47d4d7b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -273,9 +273,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
 
     val newBrokersSet = newBrokers.toSet
-    // update partition state machine
-    partitionStateMachine.triggerOnlinePartitionStateChange()
+    // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
+    // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
     replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+    // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
+    // to see if these brokers can become leaders for some/all of those
+    partitionStateMachine.triggerOnlinePartitionStateChange()
 
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
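
The reordering in this hunk can be illustrated with a small stand-alone sketch. It is a toy model under stated assumptions, not the controller's actual code: `BrokerStartupOrderSketch`, `sendReplicaList`, `electLeaders`, `onBrokerStartup`, and the `actions` log are all hypothetical names, and the two helpers only stand in for `replicaStateMachine.handleStateChanges(..., OnlineReplica)` and `partitionStateMachine.triggerOnlinePartitionStateChange()`. It shows only the order described in the new comments: the new broker first receives the full list of partitions it hosts, and leader election is triggered afterwards.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: none of these names are Kafka's real classes or methods.
object BrokerStartupOrderSketch {
  // Records the order in which the hypothetical controller actions run.
  val actions = ArrayBuffer.empty[String]

  // Stand-in for replicaStateMachine.handleStateChanges(..., OnlineReplica):
  // tell each new broker about every partition it is supposed to host.
  def sendReplicaList(newBrokers: Seq[Int], assignment: Map[String, Seq[Int]]): Unit =
    for (broker <- newBrokers) {
      val hosted = assignment.collect {
        case (partition, replicas) if replicas.contains(broker) => partition
      }.toSeq.sorted
      actions += s"replicas->broker $broker: ${hosted.mkString(",")}"
    }

  // Stand-in for partitionStateMachine.triggerOnlinePartitionStateChange():
  // try to elect leaders for partitions that currently have none.
  def electLeaders(offlinePartitions: Seq[String]): Unit =
    actions += s"elect leaders for: ${offlinePartitions.sorted.mkString(",")}"

  // Order after this commit: replica list first, leader election second.
  def onBrokerStartup(newBrokers: Seq[Int],
                      assignment: Map[String, Seq[Int]],
                      offlinePartitions: Seq[String]): Unit = {
    sendReplicaList(newBrokers, assignment)
    electLeaders(offlinePartitions)
  }
}
```

In this sketch, calling `onBrokerStartup(Seq(2), assignment, Seq("t-1"))` logs the replica-list action for broker 2 before the election action, which mirrors the order the patch establishes.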

