Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 40E6CF464 for ; Fri, 29 Mar 2013 15:27:03 +0000 (UTC) Received: (qmail 30109 invoked by uid 500); 29 Mar 2013 15:27:02 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 29465 invoked by uid 500); 29 Mar 2013 15:27:01 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 29244 invoked by uid 99); 29 Mar 2013 15:27:01 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 29 Mar 2013 15:27:01 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 4DF7D833EBB; Fri, 29 Mar 2013 15:27:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nehanarkhede@apache.org To: commits@kafka.apache.org Message-Id: <68a1fac7ac174ed68e7e7b5e8d26d5b4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: git commit: KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao and Swapnil Ghike Date: Fri, 29 Mar 2013 15:27:01 +0000 (UTC) Updated Branches: refs/heads/0.8 6e05d7da8 -> 9f6af315c KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao and Swapnil Ghike Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f6af315 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f6af315 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f6af315 Branch: refs/heads/0.8 Commit: 9f6af315ca101272748de54fd1347c9def7d80af Parents: 6e05d7d Author: Neha Narkhede Authored: Fri Mar 29 08:24:25 2013 -0700 Committer: Neha Narkhede Committed: Fri Mar 29 08:26:41 2013 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/cluster/Partition.scala | 2 +- .../kafka/controller/ReplicaStateMachine.scala | 23 ++++++-------- 2 files changed, 11 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 6e73003..2ca7ee6 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -195,7 +195,7 @@ class Partition(val topic: String, replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) case None => // leader went down stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " + - " controller %d epoch %d since leader %d for partition [%s,%d] became unavailable during the state change operation" + " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation" .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, newLeaderBrokerId, topic, partitionId)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 199640b..3cf1da3 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -143,21 +143,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" .format(controllerId, controller.epoch, replicaId, topicAndPartition)) case _ => - // check if the leader for this partition is alive or even exists - controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { + // check if the leader for this partition ever existed + controllerContext.partitionLeadershipInfo.get(topicAndPartition) match { case Some(leaderIsrAndControllerEpoch) => - controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match { - case true => // leader is alive - brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), - topic, partition, leaderIsrAndControllerEpoch, - replicaAssignment.size) - replicaState.put((topic, partition, replicaId), OnlineReplica) - stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" - .format(controllerId, controller.epoch, replicaId, topicAndPartition)) - case false => // ignore partitions whose leader is not alive - } - case None => // ignore partitions who don't have a leader yet + brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, + replicaAssignment.size) + replicaState.put((topic, partition, replicaId), OnlineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + case None => // that means the partition was never in OnlinePartition state, this means the broker never + // started a log for that partition and does not have a high watermark value for this partition } + } replicaState.put((topic, partition, replicaId), OnlineReplica) case OfflineReplica =>