Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 7DC3F17495 for ; Wed, 22 Oct 2014 17:04:52 +0000 (UTC) Received: (qmail 90495 invoked by uid 500); 22 Oct 2014 17:04:52 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 90474 invoked by uid 500); 22 Oct 2014 17:04:52 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 90465 invoked by uid 99); 22 Oct 2014 17:04:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Oct 2014 17:04:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 18AE59B2AC1; Wed, 22 Oct 2014 17:04:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: nehanarkhede@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: KAFKA-1653 Duplicate broker ids allowed in replica assignment; reviewed by Neha Narkhede Date: Wed, 22 Oct 2014 17:04:52 +0000 (UTC) Repository: kafka Updated Branches: refs/heads/trunk 4b095760c -> e8923ae33 KAFKA-1653 Duplicate broker ids allowed in replica assignment; reviewed by Neha Narkhede Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e8923ae3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e8923ae3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e8923ae3 Branch: refs/heads/trunk Commit: e8923ae33ac90170381d8c4a59a5eb810ee9f36d Parents: 4b09576 Author: Ewen Cheslack-Postava Authored: Wed Oct 22 09:59:17 2014 -0700 Committer: Neha Narkhede Committed: Wed Oct 22 10:04:39 2014 -0700 ---------------------------------------------------------------------- .../PreferredReplicaLeaderElectionCommand.scala | 11 ++++++--- .../kafka/admin/ReassignPartitionsCommand.scala | 24 +++++++++++++++++--- .../main/scala/kafka/admin/TopicCommand.scala | 4 ++++ .../kafka/tools/StateChangeLogMerger.scala | 7 +++++- core/src/main/scala/kafka/utils/Utils.scala | 10 ++++++++ core/src/main/scala/kafka/utils/ZkUtils.scala | 15 ++++++++---- 6 files changed, 59 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index c791848..79b5e0a 100644 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -78,12 +78,17 @@ object PreferredReplicaLeaderElectionCommand extends Logging { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { case Some(partitionsList) => - val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]] - partitions.map { p => + val partitionsRaw = partitionsList.asInstanceOf[List[Map[String, Any]]] + val partitions = partitionsRaw.map { p => val topic = p.get("topic").get.asInstanceOf[String] val partition = p.get("partition").get.asInstanceOf[Int] TopicAndPartition(topic, partition) - }.toSet + } + val duplicatePartitions = Utils.duplicates(partitions) + val partitionsSet = partitions.toSet + if (duplicatePartitions.nonEmpty) + throw new AdminOperationException("Preferred replica election data contains duplicate partitions: %s".format(duplicatePartitions.mkString(","))) + partitionsSet case None => throw new AdminOperationException("Preferred replica election data is empty") } case None => throw new AdminOperationException("Preferred replica election data is empty") http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 691d69a..979992b 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -81,8 +81,14 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) + val duplicateReassignments = Utils.duplicates(brokerListToReassign) + if (duplicateReassignments.nonEmpty) + throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(","))) val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) + val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign) + if (duplicateTopicsToReassign.nonEmpty) + throw new AdminCommandFailedException("List of topics to reassign contains duplicate entries: %s".format(duplicateTopicsToReassign.mkString(","))) val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() @@ -103,17 +109,29 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile) - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString) + val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp}) + if (duplicateReassignedPartitions.nonEmpty) + throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(","))) + val duplicateEntries= partitionsToBeReassigned + .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))} + .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty } + if (duplicateEntries.nonEmpty) { + val duplicatesMsg = duplicateEntries + .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp, duplicateReplicas.mkString(",")) } + .mkString(". ") + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicatesMsg)) + } + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap) // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, partitionsToBeReassigned.map(_._1.topic).toSeq) println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback" .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) // start the reassignment if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned))) + println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap))) else println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 7672c5a..0b2735e 100644 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -19,6 +19,7 @@ package kafka.admin import joptsimple._ import java.util.Properties +import kafka.common.AdminCommandFailedException import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException @@ -225,6 +226,9 @@ object TopicCommand { val ret = new mutable.HashMap[Int, List[Int]]() for (i <- 0 until partitionList.size) { val brokerList = partitionList(i).split(":").map(s => s.trim().toInt) + val duplicateBrokers = Utils.duplicates(brokerList) + if (duplicateBrokers.nonEmpty) + throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(","))) ret.put(i, brokerList.toList) if (ret(i).size != ret(0).size) throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList) http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala index d298e7e..b34b8c7 100644 --- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala +++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala @@ -22,7 +22,7 @@ import scala.util.matching.Regex import collection.mutable import java.util.Date import java.text.SimpleDateFormat -import kafka.utils.{Logging, CommandLineUtils} +import kafka.utils.{Utils, Logging, CommandLineUtils} import kafka.common.Topic import java.io.{BufferedOutputStream, OutputStream} @@ -115,6 +115,11 @@ object StateChangeLogMerger extends Logging { } if (options.has(partitionsOpt)) { partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt) + val duplicatePartitions = Utils.duplicates(partitions) + if (duplicatePartitions.nonEmpty) { + System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(","))) + System.exit(1) + } } startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim) endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim) http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/utils/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala index 29d5a17..23aefb4 100644 --- a/core/src/main/scala/kafka/utils/Utils.scala +++ b/core/src/main/scala/kafka/utils/Utils.scala @@ -566,4 +566,14 @@ object Utils extends Logging { case c => c }.mkString } + + /** + * Returns a list of duplicated items + */ + def duplicates[T](s: Traversable[T]): Iterable[T] = { + s.groupBy(identity) + .map{ case (k,l) => (k,l.size)} + .filter{ case (k,l) => (l > 1) } + .keys + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index a7b1fdc..56e3e88 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -575,23 +575,28 @@ object ZkUtils extends Logging { } } - def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { - val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map() + // Parses without deduplicating keys so the the data can be checked before allowing reassignment to proceed + def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition, Seq[Int])] = { Json.parseFull(jsonData) match { case Some(m) => m.asInstanceOf[Map[String, Any]].get("partitions") match { case Some(partitionsSeq) => - partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => { + partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => { val topic = p.get("topic").get.asInstanceOf[String] val partition = p.get("partition").get.asInstanceOf[Int] val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]] - reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas + TopicAndPartition(topic, partition) -> newReplicas }) case None => + Seq.empty } case None => + Seq.empty } - reassignedPartitions + } + + def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]] = { + parsePartitionReassignmentDataWithoutDedup(jsonData).toMap } def parseTopicsData(jsonData: String): Seq[String] = {