kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1653 Duplicate broker ids allowed in replica assignment; reviewed by Neha Narkhede
Date Wed, 22 Oct 2014 17:04:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 4b095760c -> e8923ae33


KAFKA-1653 Duplicate broker ids allowed in replica assignment; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e8923ae3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e8923ae3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e8923ae3

Branch: refs/heads/trunk
Commit: e8923ae33ac90170381d8c4a59a5eb810ee9f36d
Parents: 4b09576
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Wed Oct 22 09:59:17 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Wed Oct 22 10:04:39 2014 -0700

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala | 11 ++++++---
 .../kafka/admin/ReassignPartitionsCommand.scala | 24 +++++++++++++++++---
 .../main/scala/kafka/admin/TopicCommand.scala   |  4 ++++
 .../kafka/tools/StateChangeLogMerger.scala      |  7 +++++-
 core/src/main/scala/kafka/utils/Utils.scala     | 10 ++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 15 ++++++++----
 6 files changed, 59 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index c791848..79b5e0a 100644
--- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -78,12 +78,17 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       case Some(m) =>
         m.asInstanceOf[Map[String, Any]].get("partitions") match {
           case Some(partitionsList) =>
-            val partitions = partitionsList.asInstanceOf[List[Map[String, Any]]]
-            partitions.map { p =>
+            val partitionsRaw = partitionsList.asInstanceOf[List[Map[String, Any]]]
+            val partitions = partitionsRaw.map { p =>
               val topic = p.get("topic").get.asInstanceOf[String]
               val partition = p.get("partition").get.asInstanceOf[Int]
               TopicAndPartition(topic, partition)
-            }.toSet
+            }
+            val duplicatePartitions = Utils.duplicates(partitions)
+            val partitionsSet = partitions.toSet
+            if (duplicatePartitions.nonEmpty)
+              throw new AdminOperationException("Preferred replica election data contains
duplicate partitions: %s".format(duplicatePartitions.mkString(",")))
+            partitionsSet
           case None => throw new AdminOperationException("Preferred replica election data
is empty")
         }
       case None => throw new AdminOperationException("Preferred replica election data
is empty")

http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 691d69a..979992b 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -81,8 +81,14 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command
must include both --topics-to-move-json-file and --broker-list options")
     val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt)
     val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt)
+    val duplicateReassignments = Utils.duplicates(brokerListToReassign)
+    if (duplicateReassignments.nonEmpty)
+      throw new AdminCommandFailedException("Broker list contains duplicate entries: %s".format(duplicateReassignments.mkString(",")))
     val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
     val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
+    val duplicateTopicsToReassign = Utils.duplicates(topicsToReassign)
+    if (duplicateTopicsToReassign.nonEmpty)
+      throw new AdminCommandFailedException("List of topics to reassign contains duplicate
entries: %s".format(duplicateTopicsToReassign.mkString(",")))
     val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
 
     var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition,
List[Int]]()
@@ -103,17 +109,29 @@ object ReassignPartitionsCommand extends Logging {
       CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command
must include --reassignment-json-file that was output " + "during the --generate option")
     val reassignmentJsonFile =  opts.options.valueOf(opts.reassignmentJsonFileOpt)
     val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile)
-    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(reassignmentJsonString)
+    val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString)
     if (partitionsToBeReassigned.isEmpty)
       throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile))
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned)
+    val duplicateReassignedPartitions = Utils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas)
=> tp})
+    if (duplicateReassignedPartitions.nonEmpty)
+      throw new AdminCommandFailedException("Partition reassignment contains duplicate topic
partitions: %s".format(duplicateReassignedPartitions.mkString(",")))
+    val duplicateEntries= partitionsToBeReassigned
+      .map{ case(tp,replicas) => (tp, Utils.duplicates(replicas))}
+      .filter{ case (tp,duplicatedReplicas) => duplicatedReplicas.nonEmpty }
+    if (duplicateEntries.nonEmpty) {
+      val duplicatesMsg = duplicateEntries
+        .map{ case (tp,duplicateReplicas) => "%s contains multiple entries for %s".format(tp,
duplicateReplicas.mkString(",")) }
+        .mkString(". ")
+      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate
entries: %s".format(duplicatesMsg))
+    }
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned.toMap)
     // before starting assignment, output the current replica assignment to facilitate rollback
     val currentPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
partitionsToBeReassigned.map(_._1.topic).toSeq)
     println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file
option during rollback"
       .format(ZkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment)))
     // start the reassignment
     if(reassignPartitionsCommand.reassignPartitions())
-      println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned)))
+      println("Successfully started reassignment of partitions %s".format(ZkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap)))
     else
       println("Failed to reassign partitions %s".format(partitionsToBeReassigned))
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 7672c5a..0b2735e 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -19,6 +19,7 @@ package kafka.admin
 
 import joptsimple._
 import java.util.Properties
+import kafka.common.AdminCommandFailedException
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -225,6 +226,9 @@ object TopicCommand {
     val ret = new mutable.HashMap[Int, List[Int]]()
     for (i <- 0 until partitionList.size) {
       val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
+      val duplicateBrokers = Utils.duplicates(brokerList)
+      if (duplicateBrokers.nonEmpty)
+        throw new AdminCommandFailedException("Partition replica lists may not contain duplicate
entries: %s".format(duplicateBrokers.mkString(",")))
       ret.put(i, brokerList.toList)
       if (ret(i).size != ret(0).size)
         throw new AdminOperationException("Partition " + i + " has different replication
factor: " + brokerList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
index d298e7e..b34b8c7 100644
--- a/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
+++ b/core/src/main/scala/kafka/tools/StateChangeLogMerger.scala
@@ -22,7 +22,7 @@ import scala.util.matching.Regex
 import collection.mutable
 import java.util.Date
 import java.text.SimpleDateFormat
-import kafka.utils.{Logging, CommandLineUtils}
+import kafka.utils.{Utils, Logging, CommandLineUtils}
 import kafka.common.Topic
 import java.io.{BufferedOutputStream, OutputStream}
 
@@ -115,6 +115,11 @@ object StateChangeLogMerger extends Logging {
     }
     if (options.has(partitionsOpt)) {
       partitions = options.valueOf(partitionsOpt).split(",").toList.map(_.toInt)
+      val duplicatePartitions = Utils.duplicates(partitions)
+      if (duplicatePartitions.nonEmpty) {
+        System.err.println("The list of partitions contains repeated entries: %s".format(duplicatePartitions.mkString(",")))
+        System.exit(1)
+      }
     }
     startDate = dateFormat.parse(options.valueOf(startTimeOpt).replace('\"', ' ').trim)
     endDate = dateFormat.parse(options.valueOf(endTimeOpt).replace('\"', ' ').trim)

http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 29d5a17..23aefb4 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -566,4 +566,14 @@ object Utils extends Logging {
       case c => c
     }.mkString 
   }
+
+  /**
+   * Returns a list of duplicated items
+   */
+  def duplicates[T](s: Traversable[T]): Iterable[T] = {
+    s.groupBy(identity)
+      .map{ case (k,l) => (k,l.size)}
+      .filter{ case (k,l) => (l > 1) }
+      .keys
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/e8923ae3/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index a7b1fdc..56e3e88 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -575,23 +575,28 @@ object ZkUtils extends Logging {
     }
   }
 
-  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]]
= {
-    val reassignedPartitions: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map()
+  // Parses without deduplicating keys so the data can be checked before allowing reassignment
to proceed
+  def parsePartitionReassignmentDataWithoutDedup(jsonData: String): Seq[(TopicAndPartition,
Seq[Int])] = {
     Json.parseFull(jsonData) match {
       case Some(m) =>
         m.asInstanceOf[Map[String, Any]].get("partitions") match {
           case Some(partitionsSeq) =>
-            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].foreach(p => {
+            partitionsSeq.asInstanceOf[Seq[Map[String, Any]]].map(p => {
               val topic = p.get("topic").get.asInstanceOf[String]
               val partition = p.get("partition").get.asInstanceOf[Int]
               val newReplicas = p.get("replicas").get.asInstanceOf[Seq[Int]]
-              reassignedPartitions += TopicAndPartition(topic, partition) -> newReplicas
+              TopicAndPartition(topic, partition) -> newReplicas
             })
           case None =>
+            Seq.empty
         }
       case None =>
+        Seq.empty
     }
-    reassignedPartitions
+  }
+
+  def parsePartitionReassignmentData(jsonData: String): Map[TopicAndPartition, Seq[Int]]
= {
+    parsePartitionReassignmentDataWithoutDedup(jsonData).toMap
   }
 
   def parseTopicsData(jsonData: String): Seq[String] = {


Mime
View raw message