From: junrao@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Subject: [6/28] git commit: Revert "draft patch"
Message-Id: <20130126060436.C1B5D825CCC@tyr.zones.apache.org>
Date: Sat, 26 Jan 2013 06:04:36 +0000 (UTC)

Revert "draft patch"

This reverts commit aa1546b0907c959a4df90a7e3d48bad0890d1f2f.

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5490884e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5490884e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5490884e

Branch: refs/heads/trunk
Commit: 5490884e9cbd28ac57e4b73bec72bcab81ce6523
Parents: 426ef16
Author: Neha Narkhede
Authored: Mon Jan 21 09:53:41 2013 -0800
Committer: Neha Narkhede
Committed: Mon Jan 21 09:53:41 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/PartitionLeaderSelector.scala |    3 +--
 .../kafka/controller/PartitionStateMachine.scala   |    9 ---------
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   11 +++++------
 3 files changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 5f748b8..3eb23cd 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -177,8 +177,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
           (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
         case None =>
-          throw new StateChangeFailedException(("No other replicas in ISR %s for [%s,%d] besides current leader %d and" +
-            " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topic, partition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
+          throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
       }
   }
http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 94f27ae..372793b 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -151,15 +151,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       case OfflinePartition =>
         // pre: partition should be in Online state
         assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
-        // mark the partition offline by setting the leader to -1
-        // read the current leader and isr path
-        val leaderIsrAndControllerEpoch = controller.controllerContext.allLeaders(topicAndPartition)
-        leaderIsrAndControllerEpoch.leaderAndIsr.leader = -1
-        leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch += 1
-        leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion += 1
-        ZkUtils.updatePersistentPath(zkClient,
-          ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-          ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
         // should be called when the leader for a partition is no longer alive
         info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
         partitionState.put(topicAndPartition, OfflinePartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 113ad37..f594404 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,11 +95,10 @@ object ZkUtils extends Logging {
       : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
-        val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
-        val isrString = leaderIsrAndEpochInfo.get("ISR").get
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
+        val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+        val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+        val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -202,7 +201,7 @@ object ZkUtils extends Logging {
     val jsonDataMap = new HashMap[String, String]
     jsonDataMap.put("leader", leaderAndIsr.leader.toString)
     jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
-    jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
     jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
     Utils.stringMapToJson(jsonDataMap)
   }