Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 93B42200B6B for ; Fri, 26 Aug 2016 03:38:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 9218B160ABD; Fri, 26 Aug 2016 01:38:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6701E160AA5 for ; Fri, 26 Aug 2016 03:38:45 +0200 (CEST) Received: (qmail 66797 invoked by uid 500); 26 Aug 2016 01:38:44 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 66787 invoked by uid 99); 26 Aug 2016 01:38:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 26 Aug 2016 01:38:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 78563E098D; Fri, 26 Aug 2016 01:38:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ijuma@apache.org To: commits@kafka.apache.org Message-Id: <7220bcd27fd0483f8e982a17f244b63c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: Move a few methods from the `ZKUtils` class to the companion object Date: Fri, 26 Aug 2016 01:38:44 +0000 (UTC) archived-at: Fri, 26 Aug 2016 01:38:46 -0000 Repository: kafka Updated Branches: refs/heads/trunk 53651937f -> 32feed25a MINOR: Move a few methods from the `ZKUtils` class to the companion object They don't require access to `ZkClient`. Also include a few obvious clean-ups in `ZKUtils`: * Remove redundant rethrows and braces * Use named arguments for booleans Author: Ismael Juma Reviewers: Gwen Shapira Closes #1775 from ijuma/move-some-zk-utils-methods-to-companion-object Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32feed25 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32feed25 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32feed25 Branch: refs/heads/trunk Commit: 32feed25aefdda3a9f2b780d0709a3002777c9df Parents: 5365193 Author: Ismael Juma Authored: Fri Aug 26 00:41:23 2016 +0100 Committer: Ismael Juma Committed: Fri Aug 26 00:41:23 2016 +0100 ---------------------------------------------------------------------- .../kafka/admin/ReassignPartitionsCommand.scala | 16 +-- .../kafka/controller/KafkaController.scala | 2 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 132 +++++++++---------- .../admin/ReassignPartitionsClusterTest.scala | 9 +- 4 files changed, 77 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 18f741e..0b32d93 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -62,7 +62,7 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val jsonString = Utils.readFileAsString(jsonFile) - val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentData(jsonString) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) println("Status of partition reassignment:") val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned) @@ -89,12 +89,12 @@ object ReassignPartitionsCommand extends Logging { val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val disableRackAware = opts.options.has(opts.disableRackAware) val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware) - println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments))) - println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(proposedAssignments))) + println("Current partition replica assignment\n\n%s".format(ZkUtils.formatAsReassignmentJson(currentAssignments))) + println("Proposed partition reassignment configuration\n\n%s".format(ZkUtils.formatAsReassignmentJson(proposedAssignments))) } def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = { - val topicsToReassign = zkUtils.parseTopicsData(topicsToMoveJsonString) + val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) val duplicateTopicsToReassign = CoreUtils.duplicates(topicsToReassign) if (duplicateTopicsToReassign.nonEmpty) throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(","))) @@ -126,7 +126,7 @@ object ReassignPartitionsCommand extends Logging { def executeAssignment(zkUtils: ZkUtils, reassignmentJsonString: String) { - val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data file is empty") val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map { case (tp, _) => tp }) @@ -145,10 +145,10 @@ object ReassignPartitionsCommand extends Logging { // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic)) println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback" - .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment))) + .format(ZkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment))) // start the reassignment if (reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap))) + println("Successfully started reassignment of partitions %s".format(ZkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap))) else println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } @@ -226,7 +226,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top val validPartitions = partitions.filter(p => validatePartition(zkUtils, p._1.topic, p._1.partition)) if (validPartitions.isEmpty) false else { - val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions) + val jsonReassignmentData = ZkUtils.formatAsReassignmentJson(validPartitions) zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData) true } http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 0d6f048..04bd3f4 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -1259,7 +1259,7 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL def handleDataChange(dataPath: String, data: Object) { debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s" .format(dataPath, data)) - val partitionsReassignmentData = zkUtils.parsePartitionReassignmentData(data.toString) + val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString) val partitionsToBeReassigned = inLock(controllerContext.controllerLock) { partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 3788ef4..a137da8 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -120,6 +120,61 @@ object ZkUtils { def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic + + // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed + def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = { + Json.parseFull(jsonData) match { + case Some(m) => + m.asInstanceOf[Map[String, Any]].get("partitions") match { + case Some(partitionsSeq) => + partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => { + val topic = p.get("topic").get.asInstanceOf[String] + val partition = p.get("partition").get.asInstanceOf[Int] + val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]] + TopicAndPartition(topic, partition) -> newReplicas + }) + case None => + Seq.empty + } + case None => + Seq.empty + } + } + + def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = + parsePartitionReassignmentDataWithoutDedup(jsonData).toMap + + def parseTopicsData(jsonData: String): Seq[String] = { + var topics = List.empty[String] + Json.parseFull(jsonData) match { + case Some(m) => + m.asInstanceOf[Map[String, Any]].get("topics") match { + case Some(partitionsSeq) => + val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]] + mapPartitionSeq.foreach(p => { + val topic = p.get("topic").get.asInstanceOf[String] + topics ++= List(topic) + }) + case None => + } + case None => + } + topics + } + + def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { + Json.encode(Map( + "version" -> 1, + "partitions" -> partitionsToBeReassigned.map { case (TopicAndPartition(topic, partition), replicas) => + Map( + "topic" -> topic, + "partition" -> partition, + "replicas" -> replicas + ) + } + )) + } + } class ZkUtils(val zkClient: ZkClient, @@ -337,7 +392,7 @@ class ZkUtils(val zkClient: ZkClient, } else acls if (!zkClient.exists(path)) - ZkPath.createPersistent(zkClient, path, true, acl) //won't throw NoNodeException or NodeExistsException + ZkPath.createPersistent(zkClient, path, createParents = true, acl) //won't throw NoNodeException or NodeExistsException } /** @@ -346,7 +401,7 @@ class ZkUtils(val zkClient: ZkClient, private def createParentPath(path: String, acls: java.util.List[ACL] = DefaultAcls): Unit = { val parentDir = path.substring(0, path.lastIndexOf('/')) if (parentDir.length != 0) { - ZkPath.createPersistent(zkClient, parentDir, true, acls) + ZkPath.createPersistent(zkClient, parentDir, createParents = true, acls) } } @@ -357,10 +412,9 @@ class ZkUtils(val zkClient: ZkClient, try { ZkPath.createEphemeral(zkClient, path, data, acls) } catch { - case e: ZkNoNodeException => { + case e: ZkNoNodeException => createParentPath(path) ZkPath.createEphemeral(zkClient, path, data, acls) - } } } @@ -372,14 +426,13 @@ class ZkUtils(val zkClient: ZkClient, try { createEphemeralPath(path, data, acls) } catch { - case e: ZkNodeExistsException => { + case e: ZkNodeExistsException => // this can happen when there is connection loss; make sure the data is what we intend to write var storedData: String = null try { storedData = readData(path)._1 } catch { case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this - case e2: Throwable => throw e2 } if (storedData == null || storedData != data) { info("conflict in " + path + " data: " + data + " stored data: " + storedData) @@ -388,8 +441,6 @@ class ZkUtils(val zkClient: ZkClient, // otherwise, the creation succeeded, return normally info(path + " exists with value " + data + " during connection loss; this is ok") } - } - case e2: Throwable => throw e2 } } @@ -400,10 +451,9 @@ class ZkUtils(val zkClient: ZkClient, try { ZkPath.createPersistent(zkClient, path, data, acls) } catch { - case e: ZkNoNodeException => { + case e: ZkNoNodeException => createParentPath(path) ZkPath.createPersistent(zkClient, path, data, acls) - } } } @@ -420,17 +470,14 @@ class ZkUtils(val zkClient: ZkClient, try { zkClient.writeData(path, data) } catch { - case e: ZkNoNodeException => { + case e: ZkNoNodeException => createParentPath(path) try { ZkPath.createPersistent(zkClient, path, data, acls) } catch { case e: ZkNodeExistsException => zkClient.writeData(path, data) - case e2: Throwable => throw e2 } - } - case e2: Throwable => throw e2 } } @@ -493,11 +540,9 @@ class ZkUtils(val zkClient: ZkClient, try { zkClient.writeData(path, data) } catch { - case e: ZkNoNodeException => { + case e: ZkNoNodeException => createParentPath(path) ZkPath.createEphemeral(zkClient, path, data, acls) - } - case e2: Throwable => throw e2 } } @@ -509,7 +554,6 @@ class ZkUtils(val zkClient: ZkClient, // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") false - case e2: Throwable => throw e2 } } @@ -533,7 +577,6 @@ class ZkUtils(val zkClient: ZkClient, case e: ZkNoNodeException => // this can happen during a connection loss event, return normally info(path + " deleted during connection loss; this is ok") - case e2: Throwable => throw e2 } } @@ -544,13 +587,12 @@ class ZkUtils(val zkClient: ZkClient, } def readDataMaybeNull(path: String): (Option[String], Stat) = { - val stat: Stat = new Stat() + val stat = new Stat() val dataAndStat = try { (Some(zkClient.readData(path, stat)), stat) } catch { case e: ZkNoNodeException => (None, stat) - case e2: Throwable => throw e2 } dataAndStat } @@ -568,7 +610,6 @@ class ZkUtils(val zkClient: ZkClient, zkClient.getChildren(path) } catch { case e: ZkNoNodeException => Nil - case e2: Throwable => throw e2 } } @@ -668,53 +709,6 @@ class ZkUtils(val zkClient: ZkClient, } } - // Parses without deduplicating keys so the data can be checked before allowing reassignment to proceed - def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = { - Json.parseFull(jsonData) match { - case Some(m) => - m.asInstanceOf[Map[String, Any]].get("partitions") match { - case Some(partitionsSeq) => - partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => { - val topic = p.get("topic").get.asInstanceOf[String] - val partition = p.get("partition").get.asInstanceOf[Int] - val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]] - TopicAndPartition(topic, partition) -> newReplicas - }) - case None => - Seq.empty - } - case None => - Seq.empty - } - } - - def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { - parsePartitionReassignmentDataWithoutDedup(jsonData).toMap - } - - def parseTopicsData(jsonData: String): Seq[String] = { - var topics = List.empty[String] - Json.parseFull(jsonData) match { - case Some(m) => - m.asInstanceOf[Map[String, Any]].get("topics") match { - case Some(partitionsSeq) => - val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]] - mapPartitionSeq.foreach(p => { - val topic = p.get("topic").get.asInstanceOf[String] - topics ++= List(topic) - }) - case None => - } - case None => - } - topics - } - - def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { - Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition, - "replicas" -> e._2)))) - } - def updatePartitionReassignmentData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]) { val zkPath = ReassignPartitionsPath partitionsToBeReassigned.size match { http://git-wip-us.apache.org/repos/asf/kafka/blob/32feed25/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala index ac2c1ae..791c4d2 100644 --- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -16,10 +16,11 @@ import kafka.admin.ReassignPartitionsCommand import kafka.server.{KafkaConfig, KafkaServer} import kafka.utils.TestUtils._ import kafka.utils.ZkUtils._ -import kafka.utils.{CoreUtils, Logging} +import kafka.utils.{CoreUtils, Logging, ZkUtils} import kafka.zk.ZooKeeperTestHarness import org.junit.{After, Before, Test} import org.junit.Assert.assertEquals + import scala.collection.Seq @@ -73,7 +74,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //When rebalancing val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1 - ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment)) + ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment)) waitForReasignmentToComplete() //Then the replicas should span all three brokers @@ -94,7 +95,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { //When rebalancing val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1 - ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment)) + ReassignPartitionsCommand.executeAssignment(zkUtils, ZkUtils.formatAsReassignmentJson(newAssignment)) waitForReasignmentToComplete() //Then replicas should only span the first two brokers @@ -103,7 +104,7 @@ class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { } def waitForReasignmentToComplete() { - waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $zkUtils.ReassignPartitionsPath wasn't deleted") + waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode ${ZkUtils.ReassignPartitionsPath} wasn't deleted") } def json(topic: String): String = {