kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao and Swapnil Ghike
Date Fri, 29 Mar 2013 15:27:01 GMT
Updated Branches:
  refs/heads/0.8 6e05d7da8 -> 9f6af315c


KAFKA-831 Controller does not send the complete list of partitions to a newly started broker;
reviewed by Jun Rao and Swapnil Ghike


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f6af315
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f6af315
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f6af315

Branch: refs/heads/0.8
Commit: 9f6af315ca101272748de54fd1347c9def7d80af
Parents: 6e05d7d
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Fri Mar 29 08:24:25 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Fri Mar 29 08:26:41 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/cluster/Partition.scala  |    2 +-
 .../kafka/controller/ReplicaStateMachine.scala     |   23 ++++++--------
 2 files changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6e73003..2ca7ee6 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -195,7 +195,7 @@ class Partition(val topic: String,
          replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
         case None => // leader went down
          stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
-            " controller %d epoch %d since leader %d for partition [%s,%d] became unavailable during the state change operation"
+            " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation"
                                      .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
                                               newLeaderBrokerId, topic, partitionId))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 199640b..3cf1da3 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -143,21 +143,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                                         .format(controllerId, controller.epoch, replicaId, topicAndPartition))
             case _ =>
-              // check if the leader for this partition is alive or even exists
-                controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
+              // check if the leader for this partition ever existed
+              controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
-                  controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
-                    case true => // leader is alive
-                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                          topic, partition, leaderIsrAndControllerEpoch,
-                                                                          replicaAssignment.size)
-                      replicaState.put((topic, partition, replicaId), OnlineReplica)
-                      stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
-                                                .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-                    case false => // ignore partitions whose leader is not alive
-                  }
-                case None => // ignore partitions who don't have a leader yet
+                  brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
+                    replicaAssignment.size)
+                  replicaState.put((topic, partition, replicaId), OnlineReplica)
+                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
+                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                case None => // that means the partition was never in OnlinePartition state, this means the broker never
+                  // started a log for that partition and does not have a high watermark value for this partition
               }
+
           }
           replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>


Mime
View raw message