kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5856; AdminClient.createPartitions() follow-up (KIP-195)
Date Thu, 21 Sep 2017 14:21:38 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 96ba21e0d -> a553764c8


KAFKA-5856; AdminClient.createPartitions() follow-up (KIP-195)

- Remove DelayedCreateTopics to reduce code duplication
(DelayedCreatePartitions now handles both operations)
- Avoid unnecessary ZK calls (make them once per request instead
of once per topic, where possible)
- Simplify code
- A few minor clean-ups

Author: Ismael Juma <ismael@juma.me.uk>

Reviewers: Tom Bentley <tbentley@redhat.com>, Rajini Sivaram <rajinisivaram@googlemail.com>

Closes #3930 from ijuma/kafka-5856-admin-client-creation-partitions
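
A minimal usage sketch of the createPartitions() API this commit refines
(KIP-195), assuming a broker at localhost:9092 and an existing topic
"my-topic" (both hypothetical):

    import java.util.Properties
    import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewPartitions}
    import scala.collection.JavaConverters._

    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical address
    val client = AdminClient.create(props)
    try {
      // Grow "my-topic" to a total of 3 partitions; the broker decides the new replica placement
      client.createPartitions(Map("my-topic" -> NewPartitions.increaseTo(3)).asJava).all().get()
    } finally {
      client.close()
    }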


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a553764c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a553764c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a553764c

Branch: refs/heads/trunk
Commit: a553764c8b1611cafd318022d0fc4a34c861f6ba
Parents: 96ba21e
Author: Ismael Juma <ismael@juma.me.uk>
Authored: Thu Sep 21 15:21:09 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu Sep 21 15:21:09 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |   3 +-
 .../clients/admin/CreatePartitionsResult.java   |   2 +-
 .../kafka/clients/admin/KafkaAdminClient.java   |   5 +-
 .../kafka/clients/admin/NewPartitions.java      |   9 +-
 .../requests/CreatePartitionsRequest.java       |  17 +--
 .../common/requests/RequestResponseTest.java    |   4 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala | 116 +++++++++----------
 .../main/scala/kafka/admin/TopicCommand.scala   |  14 +--
 .../main/scala/kafka/server/AdminManager.scala  |  62 +++++-----
 .../kafka/server/DelayedCreatePartitions.scala  |  69 +++++------
 .../kafka/server/DelayedCreateTopics.scala      |  92 ---------------
 .../src/main/scala/kafka/server/KafkaApis.scala |  51 ++++----
 .../kafka/api/AdminClientIntegrationTest.scala  |  14 +--
 .../kafka/api/BaseProducerSendTest.scala        |   2 +-
 .../unit/kafka/admin/AddPartitionsTest.scala    |  32 ++---
 .../unit/kafka/admin/DeleteTopicTest.scala      |  21 ++--
 16 files changed, 212 insertions(+), 301 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 02c2d0a..0276fc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -87,7 +87,8 @@ public abstract class AdminClient implements AutoCloseable {
     /**
      * Create a batch of new topics with the default options.
      *
-     * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options. See the overload for more details.
+     * This is a convenience method for #{@link #createTopics(Collection, CreateTopicsOptions)} with default options.
+     * See the overload for more details.
      *
      * @param newTopics         The new topics to create.
      * @return                  The CreateTopicsResult.
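
The two overloads described in this Javadoc can be exercised as below; a
sketch reusing the hypothetical `client` from the snippet above, with topic
name, partition count and replication factor chosen arbitrarily:

    import java.util.Collections
    import org.apache.kafka.clients.admin.{CreateTopicsOptions, NewTopic}

    // Convenience overload: default options
    client.createTopics(Collections.singleton(new NewTopic("my-topic", 3, 1.toShort))).all().get()
    // Overload with explicit options, here a dry run that validates without creating anything
    client.createTopics(Collections.singleton(new NewTopic("my-topic", 3, 1.toShort)),
      new CreateTopicsOptions().validateOnly(true)).all().get()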

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
index b27abd6..c3a504b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreatePartitionsResult.java
@@ -50,4 +50,4 @@ public class CreatePartitionsResult {
     public KafkaFuture<Void> all() {
         return KafkaFuture.allOf(values.values().toArray(new KafkaFuture[0]));
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fe92b15..27c4b18 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1835,7 +1835,8 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeReplicaLogDirResult(new HashMap<TopicPartitionReplica, KafkaFuture<ReplicaLogDirInfo>>(futures));
     }
 
-    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions, final CreatePartitionsOptions options) {
+    public CreatePartitionsResult createPartitions(Map<String, NewPartitions> newPartitions,
+                                                   final CreatePartitionsOptions options) {
         final Map<String, KafkaFutureImpl<Void>> futures = new HashMap<>(newPartitions.size());
         for (String topic : newPartitions.keySet()) {
             futures.put(topic, new KafkaFutureImpl<Void>());
@@ -1869,7 +1870,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new CreatePartitionsResult((Map) futures);
+        return new CreatePartitionsResult(new HashMap<String, KafkaFuture<Void>>(futures));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
index 5aaaeac..66a4d92 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewPartitions.java
@@ -43,7 +43,7 @@ public class NewPartitions {
      * Increase the partition count for a topic to the given {@code totalCount}.
      * The assignment of new replicas to brokers will be decided by the broker.
      *
-     * @param totalCount The total partitions count after the operation succeeds.
+     * @param totalCount The total number of partitions after the operation succeeds.
      */
     public static NewPartitions increaseTo(int totalCount) {
         return new NewPartitions(totalCount, null);
@@ -70,7 +70,7 @@ public class NewPartitions {
      * <p>In this example partition 3's preferred leader will be broker 1, partition 4's preferred leader will be
      * broker 2 and partition 5's preferred leader will be broker 3.</p>
      *
-     * @param totalCount The total partitions count after the operation succeeds.
+     * @param totalCount The total number of partitions after the operation succeeds.
      * @param newAssignments The replica assignments for the new partitions.
      */
     public static NewPartitions increaseTo(int totalCount, List<List<Integer>> newAssignments) {
@@ -78,15 +78,14 @@ public class NewPartitions {
     }
 
     /**
-     * The new total partition count (not the number of new partitions).
+     * The total number of partitions after the operation succeeds.
      */
     public int totalCount() {
         return totalCount;
     }
 
     /**
-     * The replica assignments for the new partitions, or null if the assignment of
-     * replicas to brokers will be done by the controller.
+     * The replica assignments for the new partitions, or null if the assignment will be done by the controller.
      */
     public List<List<Integer>> assignments() {
         return newAssignments;
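
Tying the Javadoc example above together, a sketch reusing the hypothetical
`client`: a topic with 3 existing partitions is grown to 6 with explicit
assignments, so partitions 3, 4 and 5 get preferred leaders 1, 2 and 3
respectively:

    import java.util.Arrays.asList

    // One replica list per new partition; the first broker in each list is the preferred leader
    val growth = NewPartitions.increaseTo(6, asList(asList(1, 2), asList(2, 3), asList(3, 1)))
    client.createPartitions(Map("my-topic" -> growth).asJava).all().get()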

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
index 81d26a0..3e3cc30 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java
@@ -39,7 +39,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT32;
 
 public class CreatePartitionsRequest extends AbstractRequest {
 
-    private static final String TOPIC_PARTITION_COUNT_KEY_NAME = "topic_partitions";
+    private static final String TOPIC_PARTITIONS_KEY_NAME = "topic_partitions";
     private static final String NEW_PARTITIONS_KEY_NAME = "new_partitions";
     private static final String COUNT_KEY_NAME = "count";
     private static final String ASSIGNMENT_KEY_NAME = "assignment";
@@ -47,24 +47,25 @@ public class CreatePartitionsRequest extends AbstractRequest {
     private static final String VALIDATE_ONLY_KEY_NAME = "validate_only";
 
     private static final Schema CREATE_PARTITIONS_REQUEST_V0 = new Schema(
-            new Field(TOPIC_PARTITION_COUNT_KEY_NAME, new ArrayOf(
+            new Field(TOPIC_PARTITIONS_KEY_NAME, new ArrayOf(
                     new Schema(
                             TOPIC_NAME,
                             new Field(NEW_PARTITIONS_KEY_NAME, new Schema(
                                     new Field(COUNT_KEY_NAME, INT32, "The new partition count."),
-                                    new Field(ASSIGNMENT_KEY_NAME, ArrayOf.nullable(new ArrayOf(INT32)), "The assigned brokers.")
+                                    new Field(ASSIGNMENT_KEY_NAME, ArrayOf.nullable(new ArrayOf(INT32)),
+                                            "The assigned brokers.")
                             )))),
                     "List of topic and the corresponding new partitions."),
             new Field(TIMEOUT_KEY_NAME, INT32, "The time in ms to wait for the partitions to be created."),
             new Field(VALIDATE_ONLY_KEY_NAME, BOOLEAN,
-                    "If true then validate the request, but don't actually increase the partition count."));
+                    "If true then validate the request, but don't actually increase the number of partitions."));
 
     public static Schema[] schemaVersions() {
         return new Schema[]{CREATE_PARTITIONS_REQUEST_V0};
     }
 
     // It is an error for duplicate topics to be present in the request,
-    // so track duplicates here to allow detailed KafkaApis to report per-topic errors.
+    // so track duplicates here to allow KafkaApis to report per-topic errors.
     private final Set<String> duplicates;
     private final Map<String, NewPartitions> newPartitions;
     private final int timeout;
@@ -110,7 +111,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
 
     public CreatePartitionsRequest(Struct struct, short apiVersion) {
         super(apiVersion);
-        Object[] topicCountArray = struct.getArray(TOPIC_PARTITION_COUNT_KEY_NAME);
+        Object[] topicCountArray = struct.getArray(TOPIC_PARTITIONS_KEY_NAME);
         Map<String, NewPartitions> counts = new HashMap<>(topicCountArray.length);
         Set<String> dupes = new HashSet<>();
         for (Object topicPartitionCountObj : topicCountArray) {
@@ -166,7 +167,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
         Struct struct = new Struct(ApiKeys.CREATE_PARTITIONS.requestSchema(version()));
         List<Struct> topicPartitionsList = new ArrayList<>();
         for (Map.Entry<String, NewPartitions> topicPartitionCount : this.newPartitions.entrySet()) {
-            Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITION_COUNT_KEY_NAME);
+            Struct topicPartitionCountStruct = struct.instance(TOPIC_PARTITIONS_KEY_NAME);
             topicPartitionCountStruct.set(TOPIC_NAME, topicPartitionCount.getKey());
             NewPartitions count = topicPartitionCount.getValue();
             Struct partitionCountStruct = topicPartitionCountStruct.instance(NEW_PARTITIONS_KEY_NAME);
@@ -184,7 +185,7 @@ public class CreatePartitionsRequest extends AbstractRequest {
             topicPartitionCountStruct.set(NEW_PARTITIONS_KEY_NAME, partitionCountStruct);
             topicPartitionsList.add(topicPartitionCountStruct);
         }
-        struct.set(TOPIC_PARTITION_COUNT_KEY_NAME, topicPartitionsList.toArray(new Object[0]));
+        struct.set(TOPIC_PARTITIONS_KEY_NAME, topicPartitionsList.toArray(new Object[0]));
         struct.set(TIMEOUT_KEY_NAME, this.timeout);
         struct.set(VALIDATE_ONLY_KEY_NAME, this.validateOnly);
         return struct;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 0ef0acd..a76cc84 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -247,7 +247,7 @@ public class RequestResponseTest {
         checkRequest(createCreatePartitionsRequest());
         checkRequest(createCreatePartitionsRequestWithAssignments());
         checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
-        checkResponse(createCreatePartitionsErrorResponse(), 0);
+        checkResponse(createCreatePartitionsResponse(), 0);
     }
 
     @Test
@@ -1126,7 +1126,7 @@ public class RequestResponseTest {
         return new CreatePartitionsRequest(assignments, 0, false, (short) 0);
     }
 
-    private CreatePartitionsResponse createCreatePartitionsErrorResponse() {
+    private CreatePartitionsResponse createCreatePartitionsResponse() {
         Map<String, ApiError> results = new HashMap<>();
         results.put("my_topic", ApiError.fromThrowable(
                 new InvalidReplicaAssignmentException("The assigned brokers included an unknown broker")));

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 2e0fddf..b6c9afe 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -24,14 +24,11 @@ import kafka.utils.ZkUtils._
 import java.util.Random
 import java.util.Properties
 
-import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
-import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, ReassignmentInProgressException, TopicExistsException, UnknownTopicOrPartitionException}
+import kafka.common.TopicAlreadyMarkedForDeletionException
+import org.apache.kafka.common.errors.{BrokerNotAvailableException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidReplicationFactorException, InvalidTopicException, TopicExistsException, UnknownTopicOrPartitionException}
 
-import scala.collection._
+import collection.{Map, Set, mutable, _}
 import scala.collection.JavaConverters._
-import scala.collection.mutable
-import collection.Map
-import collection.Set
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.kafka.common.internals.Topic
 
@@ -263,27 +260,24 @@ object AdminUtils extends Logging with AdminUtilities {
   *
   * @param zkUtils Zookeeper utilities
   * @param topic Topic for adding partitions to
+  * @param existingAssignment A map from partition id to its assigned replicas
+  * @param allBrokers All brokers in the cluster
   * @param numPartitions Number of partitions to be set
   * @param replicaAssignment Manual replica assignment, or none
-  * @param checkBrokerAvailable Ignore checking if assigned replica broker is available. Only used for testing
+  * @param validateOnly If true, validate the parameters without actually adding the partitions
+  * @return the updated replica assignment
   */
   def addPartitions(zkUtils: ZkUtils,
                     topic: String,
                     existingAssignment: Map[Int, Seq[Int]],
+                    allBrokers: Seq[BrokerMetadata],
                     numPartitions: Int = 1,
                     replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
-                    checkBrokerAvailable: Boolean = true,
-                    rackAwareMode: RackAwareMode = RackAwareMode.Enforced,
                     validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
-    if (zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)) {
-      // We prevent addition partitions while a reassignment is in progress, since
-      // during reassignment there is no meaningful notion of replication factor
-      throw new ReassignmentInProgressException("A partition reassignment is in progress.")
-    }
-
     val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
       throw new AdminOperationException(
-        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. Assignment: $existingAssignment"))
+        s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
+          s"Assignment: $existingAssignment"))
 
     val partitionsToAdd = numPartitions - existingAssignment.size
     if (partitionsToAdd <= 0)
@@ -292,76 +286,76 @@ object AdminUtils extends Logging with AdminUtilities {
           s"Topic $topic currently has ${existingAssignment.size} partitions, " +
           s"$numPartitions would not be an increase.")
 
-    // create the new partition replication list
-    val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
-    val newPartitionReplicaList = replicaAssignment.getOrElse{
-      val startIndex = math.max(0, brokerMetadatas.indexWhere(_.id >= existingAssignmentPartition0.head))
-      AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitionsToAdd, existingAssignmentPartition0.size,
-        startIndex, existingAssignment.size)
+    replicaAssignment.foreach { proposedReplicaAssignment =>
+      validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0,
+        allBrokers.map(_.id).toSet)
     }
-    validateReplicaAssignment(newPartitionReplicaList, existingAssignmentPartition0, brokerMetadatas.map(_.id).toSet, checkBrokerAvailable)
 
-    // check if manual assignment has the right replication factor
-    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => p.size != existingAssignmentPartition0.size)
-    if (unmatchedRepFactorList.nonEmpty)
-      throw new InvalidReplicaAssignmentException(s"Existing topic replication factor of ${existingAssignmentPartition0.size}, but manual replication assignment would imply replication factor(s) of ${unmatchedRepFactorList.map(_.size).mkString(",")}.")
+    val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
+      val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
+      AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
+        startIndex, existingAssignment.size)
+    }
+    val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
     if (!validateOnly) {
-      info(s"Add partition list for $topic is $newPartitionReplicaList.")
-      val partitionReplicaList = existingAssignment.map{ case (partitiondId, replicas) => (partitiondId, replicas) }
+      info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
+        s"$proposedAssignmentForNewPartitions.")
       // add the combined new list
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList ++ newPartitionReplicaList, update = true)
+      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true)
     }
-    newPartitionReplicaList
+    proposedAssignment
   }
 
   /**
-    * Parse a replica assignment string of the form
-    * <pre><code>
+    * Parse a replica assignment string of the form:
+    * {{{
     * broker_id_for_part1_replica1:broker_id_for_part1_replica2,
     * broker_id_for_part2_replica1:broker_id_for_part2_replica2,
     * ...
-    * </code></pre>
-    * @param replicaAssignmentList The string to parse
-    * @param startPartitionId
-    * @return
+    * }}}
     */
-  def parseManualReplicaAssignment(replicaAssignmentList: String, startPartitionId: Int): Map[Int, List[Int]] = {
-    var partitionList = replicaAssignmentList.split(",")
-    val ret = new mutable.HashMap[Int, List[Int]]()
+  def parseReplicaAssignment(replicaAssignmentsString: String, startPartitionId: Int): Map[Int, Seq[Int]] = {
+    val assignmentStrings = replicaAssignmentsString.split(",")
+    val assignmentMap = mutable.Map[Int, Seq[Int]]()
     var partitionId = startPartitionId
-    //partitionList = partitionList.takeRight(partitionList.size - partitionId)
-    for (i <- partitionList.indices) {
-      val brokerList = partitionList(i).split(":").map(s => s.trim().toInt).toList
-      ret.put(partitionId, brokerList)
+    for (assignmentString <- assignmentStrings) {
+      val brokerIds = assignmentString.split(":").map(_.trim.toInt).toSeq
+      assignmentMap.put(partitionId, brokerIds)
       partitionId = partitionId + 1
     }
-    ret.toMap
+    assignmentMap
   }
 
-  def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
-                                existingAssignmentPartition0: Seq[Int],
-                                availableBrokerList: Set[Int], checkBrokerAvailable: Boolean = true): Unit = {
+  private def validateReplicaAssignment(replicaAssignment: Map[Int, Seq[Int]],
+                                        existingAssignmentPartition0: Seq[Int],
+                                        availableBrokerIds: Set[Int]): Unit = {
 
-    val badRepFactor = replicaAssignment.toSeq.sortBy(_._1).reverse.map { case (partitionId, brokerList) =>
-      if (brokerList.isEmpty)
+    replicaAssignment.foreach { case (partitionId, replicas) =>
+      if (replicas.isEmpty)
         throw new InvalidReplicaAssignmentException(
           s"Cannot have replication factor of 0 for partition id $partitionId.")
-      if (brokerList.size != brokerList.toSet.size)
+      if (replicas.size != replicas.toSet.size)
         throw new InvalidReplicaAssignmentException(
           s"Duplicate brokers not allowed in replica assignment: " +
-            s"${brokerList.mkString(", ")} for partition id $partitionId.")
-      if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
+            s"${replicas.mkString(", ")} for partition id $partitionId.")
+      if (!replicas.toSet.subsetOf(availableBrokerIds))
         throw new BrokerNotAvailableException(
           s"Some brokers specified for partition id $partitionId are not available. " +
-            s"Specified brokers: ${brokerList.mkString(", ")}, " +
-            s"available brokers: ${availableBrokerList.mkString(", ")}.")
-      partitionId -> brokerList.size
-
-    }.filter{ case (partitionId, repFactor) => repFactor != existingAssignmentPartition0.size}
-    if (badRepFactor.nonEmpty)
+            s"Specified brokers: ${replicas.mkString(", ")}, " +
+            s"available brokers: ${availableBrokerIds.mkString(", ")}.")
+      partitionId -> replicas.size
+    }
+    val badRepFactors = replicaAssignment.collect {
+      case (partition, replicas) if replicas.size != existingAssignmentPartition0.size => partition -> replicas.size
+    }
+    if (badRepFactors.nonEmpty) {
+      val sortedBadRepFactors = badRepFactors.toSeq.sortBy { case (partitionId, _) => partitionId }
+      val partitions = sortedBadRepFactors.map { case (partitionId, _) => partitionId }
+      val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
       throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between partitions, " +
-        s"partition 0 has ${existingAssignmentPartition0.size} " +
-        s"while partitions [${badRepFactor.map(_._1).mkString(", ")}] have replication factors [${badRepFactor.map(_._2).mkString(", ")}], respectively")
+        s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(", ")}] have " +
+        s"replication factors [${repFactors.mkString(", ")}], respectively")
+    }
   }
 
   def deleteTopic(zkUtils: ZkUtils, topic: String) {
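
With the signature above, callers now fetch the broker metadata once and pass
it in. A sketch mirroring the updated tests, assuming `zkUtils` and an
existing `topic` are in scope:

    import kafka.admin.AdminUtils

    val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
      case (topicPartition, replicas) => topicPartition.partition -> replicas
    }
    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
    // Grow the topic to 2 partitions in total; replica placement is computed automatically
    AdminUtils.addPartitions(zkUtils, topic, existingAssignment, brokers, numPartitions = 2)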

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f22c5d4..f2a74a0 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -150,15 +150,13 @@ object TopicCommand extends Logging {
         if (existingAssignment.isEmpty)
           throw new InvalidTopicException(s"The topic $topic does not exist")
         val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
-        val newAssignment = if (replicaAssignmentStr == null || replicaAssignmentStr.isEmpty)
-          None
-        else {
-          var partitionList = replicaAssignmentStr.split(",")
-          val startPartitionId = existingAssignment.size;
-          partitionList = partitionList.takeRight(partitionList.size - startPartitionId)
-          Some(AdminUtils.parseManualReplicaAssignment(partitionList.mkString(","), startPartitionId))
+        val newAssignment = Option(replicaAssignmentStr).filter(_.nonEmpty).map { replicaAssignmentString =>
+          val startPartitionId = existingAssignment.size
+          val partitionList = replicaAssignmentString.split(",").drop(startPartitionId)
+          AdminUtils.parseReplicaAssignment(partitionList.mkString(","), startPartitionId)
         }
-        AdminUtils.addPartitions(zkUtils, topic, existingAssignment, nPartitions, newAssignment)
+        val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
+        AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers, nPartitions, newAssignment)
         println("Adding partitions succeeded!")
       }
     }
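
TopicCommand drops the assignment entries for existing partitions before
handing the remainder to parseReplicaAssignment. A worked example with assumed
inputs, starting at partition id 5:

    // "0:1,1:2" describes two new partitions: replicas (0, 1) and (1, 2)
    val parsed = AdminUtils.parseReplicaAssignment("0:1,1:2", startPartitionId = 5)
    // parsed == Map(5 -> Seq(0, 1), 6 -> Seq(1, 2))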

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 58bb094..fd31fc4 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -19,13 +19,13 @@ package kafka.server
 import java.util.{Collections, Properties}
 
 import kafka.admin.{AdminOperationException, AdminUtils}
-import kafka.common.{TopicAlreadyMarkedForDeletionException}
+import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
 import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, ConfigResource}
-import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException}
+import org.apache.kafka.common.errors.{ApiException, InvalidPartitionsException, InvalidReplicaAssignmentException, InvalidRequestException, InvalidTopicException, PolicyViolationException, ReassignmentInProgressException, UnknownTopicOrPartitionException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
@@ -147,7 +147,7 @@ class AdminManager(val config: KafkaConfig,
       responseCallback(results)
     } else {
       // 3. else pass the assignments and errors to the delayed operation and set the keys
-      val delayedCreate = new DelayedCreateTopics(timeout, metadata.toSeq, this, responseCallback)
+      val delayedCreate = new DelayedCreatePartitions(timeout, metadata.toSeq, this, responseCallback)
       val delayedCreateKeys = createInfo.keys.map(new TopicKey(_)).toSeq
       // try to complete the request immediately, otherwise put it into the purgatory
       topicPurgatory.tryCompleteElseWatch(delayedCreate, delayedCreateKeys)
@@ -203,10 +203,25 @@ class AdminManager(val config: KafkaConfig,
                        listenerName: ListenerName,
                        callback: Map[String, ApiError] => Unit): Unit = {
 
+    val reassignPartitionsInProgress = zkUtils.pathExists(ZkUtils.ReassignPartitionsPath)
+    val allBrokers = AdminUtils.getBrokerMetadatas(zkUtils)
+    val allBrokerIds = allBrokers.map(_.id)
+
     // 1. map over topics creating assignment and calling AdminUtils
-    val metadata = newPartitions.map{ case (topic, newPartition) =>
+    val metadata = newPartitions.map { case (topic, newPartition) =>
       try {
-        val oldNumPartitions = zkUtils.getTopicPartitionCount(topic).getOrElse(throw new InvalidTopicException())
+        // We prevent adding partitions while a reassignment is in progress, since
+        // during reassignment there is no meaningful notion of replication factor
+        if (reassignPartitionsInProgress)
+          throw new ReassignmentInProgressException("A partition reassignment is in progress.")
+
+        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
+          case (topicPartition, replicas) => topicPartition.partition -> replicas
+        }
+        if (existingAssignment.isEmpty)
+          throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist")
+
+        val oldNumPartitions = existingAssignment.size
         val newNumPartitions = newPartition.totalCount
         val numPartitionsIncrement = newNumPartitions - oldNumPartitions
         if (numPartitionsIncrement < 0) {
@@ -215,38 +230,31 @@ class AdminManager(val config: KafkaConfig,
         } else if (numPartitionsIncrement == 0) {
           throw new InvalidPartitionsException(s"Topic already has $oldNumPartitions partitions.")
         }
-        val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
-          case (topicPartition, replicas) => topicPartition.partition -> replicas
-        }
-        if (existingAssignment.isEmpty)
-          throw new InvalidTopicException(s"The topic $topic does not exist")
-        val reassignment = if (newPartition.assignments == null) {
-          None
-        } else {
-          val assignments = newPartition.assignments.asScala.map(inner => inner.asScala.toList)
-          // check each broker exists
-          val unknownBrokers = assignments.flatten.toSet -- zkUtils.getAllBrokersInCluster.map(broker => broker.id)
-          if (unknownBrokers.nonEmpty) {
+
+        val reassignment = Option(newPartition.assignments).map(_.asScala.map(_.asScala.map(_.toInt))).map { assignments =>
+          val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
+          if (unknownBrokers.nonEmpty)
             throw new InvalidReplicaAssignmentException(
               s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")
-          }
-          if (assignments.size != numPartitionsIncrement) {
+
+          if (assignments.size != numPartitionsIncrement)
             throw new InvalidRequestException(
               s"Increasing the number of partitions by $numPartitionsIncrement " +
                 s"but ${assignments.size} assignments provided.")
-          }
-          Some(newPartition.assignments.asScala.toList.zipWithIndex.map{ case (replicas, index) =>
-            existingAssignment.size + index -> replicas.asScala.map(_.toInt)
-          }.toMap)
+
+          assignments.zipWithIndex.map { case (replicas, index) =>
+            existingAssignment.size + index -> replicas
+          }.toMap
         }
 
-        val added = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, newPartition.totalCount, reassignment, validateOnly = validateOnly)
-        CreatePartitionMetadata(topic, added.keySet, ApiError.NONE)
+        val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment, allBrokers,
+          newPartition.totalCount, reassignment, validateOnly = validateOnly)
+        CreateTopicMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
       } catch {
         case e: AdminOperationException =>
-          CreatePartitionMetadata(topic, null, ApiError.fromThrowable(e))
+          CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e))
         case e: ApiException =>
-          CreatePartitionMetadata(topic, null, ApiError.fromThrowable(e))
+          CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e))
       }
     }
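
From a client's point of view, the per-topic errors built here surface as the
cause of an ExecutionException on the corresponding future. A hedged handling
sketch, reusing the hypothetical `client`:

    import java.util.concurrent.ExecutionException
    import org.apache.kafka.common.errors.{InvalidPartitionsException, ReassignmentInProgressException, UnknownTopicOrPartitionException}

    try {
      client.createPartitions(Map("my-topic" -> NewPartitions.increaseTo(2)).asJava).values.get("my-topic").get()
    } catch {
      case e: ExecutionException => e.getCause match {
        case _: InvalidPartitionsException       => // requested count is not an increase
        case _: UnknownTopicOrPartitionException => // the topic does not exist
        case _: ReassignmentInProgressException  => // a partition reassignment is running
        case other => throw other
      }
    }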
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
index f7bf32a..81a096e 100644
--- a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
@@ -1,53 +1,53 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
 
 package kafka.server
 
 import kafka.api.LeaderAndIsr
-import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
 
-import scala.collection.{Map, Seq, Set}
-import scala.collection.JavaConverters._
+import scala.collection._
 
-case class CreatePartitionMetadata(topic: String, addedPartitions: Set[Int], error: ApiError)
+/**
+  * The create metadata maintained by the delayed create topic or create partitions operations.
+  */
+case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: ApiError)
 
 /**
-  * A delayed create partitions operation that can be created by the admin manager and watched
-  * in the topic purgatory
+  * A delayed create topic or create partitions operation that is stored in the topic purgatory.
   */
 class DelayedCreatePartitions(delayMs: Long,
-                              createMetadata: Seq[CreatePartitionMetadata],
+                              createMetadata: Seq[CreateTopicMetadata],
                               adminManager: AdminManager,
                               responseCallback: Map[String, ApiError] => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
-    * The operation can be completed if all of the topics that do not have an error exist and every partition has a leader in the controller.
+    * The operation can be completed if all of the topics that do not have an error exist and every partition has a
+    * leader in the controller.
     * See KafkaController.onNewTopicCreation
     */
   override def tryComplete() : Boolean = {
     trace(s"Trying to complete operation for $createMetadata")
 
-    val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess)
-      .foldLeft(0) { case (topicCounter, metadata) =>
-        topicCounter + missingLeaderCount(metadata.topic, metadata.addedPartitions)
-      }
+    val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess).foldLeft(0) { case (topicCounter, metadata) =>
+      topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet)
+    }
 
     if (leaderlessPartitionCount == 0) {
       trace("All partitions have a leader, completing the delayed operation")
@@ -65,7 +65,7 @@ class DelayedCreatePartitions(delayMs: Long,
     trace(s"Completing operation for $createMetadata")
     val results = createMetadata.map { metadata =>
       // ignore topics that already have errors
-      if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.addedPartitions) > 0)
+      if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
         (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
       else
         (metadata.topic, metadata.error)
@@ -73,23 +73,16 @@ class DelayedCreatePartitions(delayMs: Long,
     responseCallback(results)
   }
 
-  override def onExpiration(): Unit = { }
+  override def onExpiration(): Unit = {}
 
   private def missingLeaderCount(topic: String, partitions: Set[Int]): Int = {
     partitions.foldLeft(0) { case (counter, partition) =>
-      if (isMissingLeader(topic, partition)) {
-        trace(s"topic $topic, partition $partition is missing its leader")
-        counter + 1
-      } else {
-        trace(s"topic $topic, partition $partition has its leader")
-        counter
-      }
+      if (isMissingLeader(topic, partition)) counter + 1 else counter
     }
   }
 
   private def isMissingLeader(topic: String, partition: Int): Boolean = {
     val partitionInfo = adminManager.metadataCache.getPartitionInfo(topic, partition)
-    trace(s"PartitionState topic $topic, partition $partition: $partitionInfo")
     partitionInfo.isEmpty || partitionInfo.get.basePartitionState.leader == LeaderAndIsr.NoLeader
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
deleted file mode 100644
index 50b8143..0000000
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.server
-
-import kafka.api.LeaderAndIsr
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.ApiError
-
-import scala.collection._
-
-/**
-  * The create metadata maintained by the delayed create operation
-  *
-  * TODO: local state doesn't count, need to know state of all relevant brokers
-  *
-  */
-case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: ApiError)
-
-/**
-  * A delayed create topics operation that can be created by the admin manager and watched
-  * in the topic purgatory
-  */
-class DelayedCreateTopics(delayMs: Long,
-                          createMetadata: Seq[CreateTopicMetadata],
-                          adminManager: AdminManager,
-                          responseCallback: Map[String, ApiError] => Unit)
-  extends DelayedOperation(delayMs) {
-
-  /**
-    * The operation can be completed if all of the topics that do not have an error exist and every partition has a leader in the controller.
-    * See KafkaController.onNewTopicCreation
-    */
-  override def tryComplete() : Boolean = {
-    trace(s"Trying to complete operation for $createMetadata")
-
-    val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess)
-      .foldLeft(0) { case (topicCounter, metadata) =>
-        topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet)
-      }
-
-    if (leaderlessPartitionCount == 0) {
-      trace("All partitions have a leader, completing the delayed operation")
-      forceComplete()
-    } else {
-      trace(s"$leaderlessPartitionCount partitions do not have a leader, not completing the delayed operation")
-      false
-    }
-  }
-
-  /**
-    * Check for partitions that are still missing a leader, update their error code and call the responseCallback
-    */
-  override def onComplete() {
-    trace(s"Completing operation for $createMetadata")
-    val results = createMetadata.map { metadata =>
-      // ignore topics that already have errors
-      if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0)
-        (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
-      else
-        (metadata.topic, metadata.error)
-    }.toMap
-    responseCallback(results)
-  }
-
-  override def onExpiration(): Unit = { }
-
-  private def missingLeaderCount(topic: String, partitions: Set[Int]): Int = {
-    partitions.foldLeft(0) { case (counter, partition) =>
-      if (isMissingLeader(topic, partition)) counter + 1 else counter
-    }
-  }
-
-  private def isMissingLeader(topic: String, partition: Int): Boolean = {
-    val partitionInfo = adminManager.metadataCache.getPartitionInfo(topic, partition)
-    partitionInfo.isEmpty || partitionInfo.get.basePartitionState.leader == LeaderAndIsr.NoLeader
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 6d82892..6434d23 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -55,7 +55,6 @@ import org.apache.kafka.common.requests.{SaslAuthenticateResponse, SaslHandshake
 import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 import DescribeLogDirsResponse.LogDirInfo
-import org.apache.kafka.clients.admin.NewPartitions
 
 import scala.collection.{mutable, _}
 import scala.collection.JavaConverters._
@@ -1336,39 +1335,41 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreatePartitionsRequest(request: RequestChannel.Request): Unit = {
-    val alterPartitionCountsRequest = request.body[CreatePartitionsRequest]
-    val (valid, errors) =
+    val createPartitionsRequest = request.body[CreatePartitionsRequest]
+
+    def sendResponseCallback(results: Map[String, ApiError]): Unit = {
+      def createResponse(requestThrottleMs: Int): AbstractResponse = {
+        val responseBody = new CreatePartitionsResponse(requestThrottleMs, results.asJava)
+        trace(s"Sending create partitions response $responseBody for correlation id ${request.header.correlationId} to " +
+          s"client ${request.header.clientId}.")
+        responseBody
+      }
+      sendResponseMaybeThrottle(request, createResponse)
+    }
+
     if (!controller.isActive) {
-      (Map.empty[String, NewPartitions],
-      alterPartitionCountsRequest.newPartitions.asScala.map { case (topic, _) =>
+      val result = createPartitionsRequest.newPartitions.asScala.map { case (topic, _) =>
         (topic, new ApiError(Errors.NOT_CONTROLLER, null))
-      })
+      }
+      sendResponseCallback(result)
     } else {
       // Special handling to add duplicate topics to the response
-      val dupes = alterPartitionCountsRequest.duplicates.asScala
-      val notDuped = alterPartitionCountsRequest.newPartitions.asScala.retain((topic, count) => !dupes.contains(topic))
+      val dupes = createPartitionsRequest.duplicates.asScala
+      val notDuped = createPartitionsRequest.newPartitions.asScala -- dupes
       val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
         authorize(request.session, Alter, new Resource(Topic, topic))
       }
-      val (queuedForDeletion, live) = authorized.partition{ case (topic, _) => controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic) }
-      (live,
-      dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request")).toMap ++
-        unauthorized.map{ case (topic, np) =>
-          topic -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, Errors.TOPIC_AUTHORIZATION_FAILED.message())
-        } ++ queuedForDeletion.map{ case (topic, np) =>
-          topic -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion.")
-        })
-    }
-
-    def sendResponseCallback(results: Map[String, ApiError]): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        val responseBody = new CreatePartitionsResponse(requestThrottleMs, (results ++ errors).asJava)
-        trace(s"Sending alter partition counts response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.")
-        responseBody
+      val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
+        controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic)
       }
-      sendResponseMaybeThrottle(request, createResponse)
+
+      val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic in request")) ++
+        unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)) ++
+        queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is queued for deletion."))
+
+      adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly,
+        request.context.listenerName, result => sendResponseCallback(result ++ errors))
     }
-    adminManager.createPartitions(alterPartitionCountsRequest.timeout(), valid, alterPartitionCountsRequest.validateOnly, request.context.listenerName, sendResponseCallback)
   }
 
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index 823e44d..da818d6 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -382,14 +382,14 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       NewPartitions.increaseTo(2)).asJava, validateOnly)
     var altered = alterResult.values.get(topic1).get
     // assert that the topics still has 1 partition
-    assertEquals(1, client.describeTopics(Set(topic1).asJavaCollection).values.get(topic1).get.partitions.size)
+    assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
 
     // try creating a new partition (no assignments), to bring the total to 3 partitions
     alterResult = client.createPartitions(Map(topic1 ->
       NewPartitions.increaseTo(3)).asJava)
     altered = alterResult.values.get(topic1).get
     // assert that the topics now has 2 partitions
-    var actualPartitions = client.describeTopics(Set(topic1).asJavaCollection).values.get(topic1).get.partitions
+    var actualPartitions = client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions
     assertEquals(3, actualPartitions.size)
 
     // now try creating a new partition (with assignments), to bring the total to 3 partitions
@@ -397,10 +397,10 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       NewPartitions.increaseTo(3, asList(asList(0, 1), asList(1, 2)))).asJava)
     altered = alterResult.values.get(topic2).get
     // assert that the topics now has 3 partitions
-    actualPartitions = client.describeTopics(Set(topic2).asJavaCollection).values.get(topic2).get.partitions
+    actualPartitions = client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions
     assertEquals(3, actualPartitions.size)
-    assertEquals(List[Integer](0, 1), actualPartitions.get(1).replicas.asScala.map(_.id).toList)
-    assertEquals(List[Integer](1, 2), actualPartitions.get(2).replicas.asScala.map(_.id).toList)
+    assertEquals(Seq(0, 1), actualPartitions.get(1).replicas.asScala.map(_.id))
+    assertEquals(Seq(1, 2), actualPartitions.get(2).replicas.asScala.map(_.id))
 
     // try a newCount which would be a decrease
     alterResult = client.createPartitions(Map(topic1 ->
@@ -435,8 +435,8 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
       fail("Expect InvalidTopicException when using an unknown topic")
     } catch {
       case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
-        assertEquals("The request attempted to perform an operation on an invalid topic.", e.getCause.getMessage)
+        assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
+        assertEquals("The topic 'an-unknown-topic' does not exist", e.getCause.getMessage)
     }
 
     // try an invalid newCount

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ef9d9c1..1369136 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -375,7 +375,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     val existingAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic)).map {
       case (topicPartition, replicas) => topicPartition.partition -> replicas
     }
-    AdminUtils.addPartitions(zkUtils, topic, existingAssignment, 2)
+    AdminUtils.addPartitions(zkUtils, topic, existingAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2)
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index 2df5305..44b2843 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -25,9 +25,8 @@ import kafka.utils.TestUtils._
 import kafka.utils.TestUtils
 import kafka.cluster.Broker
 import kafka.client.ClientUtils
-import kafka.common.TopicAndPartition
 import kafka.server.{KafkaConfig, KafkaServer}
-import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException}
+import org.apache.kafka.common.errors.InvalidReplicaAssignmentException
 import org.apache.kafka.common.network.ListenerName
 import org.junit.{After, Before, Test}
 
@@ -74,7 +73,8 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testWrongReplicaCount(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, 2, Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
+      AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+        Some(Map(0 -> Seq(0, 1), 1 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
       case _: InvalidReplicaAssignmentException => //this is good
@@ -84,7 +84,8 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
   @Test
   def testMissingPartition0(): Unit = {
     try {
-      AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, 2, Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
+      AdminUtils.addPartitions(zkUtils, topic5, topic5Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+        Some(Map(1 -> Seq(0, 1), 2 -> Seq(0, 1, 2))))
       fail("Add partitions should fail")
     } catch {
       case e: AdminOperationException => //this is good
@@ -94,7 +95,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testIncrementPartitions(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, 3)
+    AdminUtils.addPartitions(zkUtils, topic1, topic1Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 1)
     val leader2 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic1, 2)
@@ -121,7 +122,8 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testManualAssignmentOfReplicas(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, 3,
+    // Add 2 partitions
+    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3,
       Some(Map(0 -> Seq(1, 2), 1 -> Seq(0, 1), 2 -> Seq(2, 3))))
     // wait until leader is elected
     val leader1 = waitUntilLeaderIsElectedOrChanged(zkUtils, topic2, 1)
@@ -137,20 +139,20 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
       brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
       "AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata
-    val metaDataForTopic2 = metadata.filter(p => p.topic.equals(topic2))
+    val metaDataForTopic2 = metadata.filter(_.topic == topic2)
     val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
-    assertEquals(partitionDataForTopic2.size, 3)
-    assertEquals(partitionDataForTopic2(1).partitionId, 1)
-    assertEquals(partitionDataForTopic2(2).partitionId, 2)
+    assertEquals(3, partitionDataForTopic2.size)
+    assertEquals(1, partitionDataForTopic2(1).partitionId)
+    assertEquals(2, partitionDataForTopic2(2).partitionId)
     val replicas = partitionDataForTopic2(1).replicas
-    assertEquals(replicas.size, 2)
-    assert(replicas.head.id == 0 || replicas.head.id == 1)
-    assert(replicas(1).id == 0 || replicas(1).id == 1)
+    assertEquals(2, replicas.size)
+    assertTrue(replicas.head.id == 0 || replicas.head.id == 1)
+    assertTrue(replicas(1).id == 0 || replicas(1).id == 1)
   }
 
   @Test
   def testReplicaPlacementAllServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, 7)
+    AdminUtils.addPartitions(zkUtils, topic3, topic3Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 7)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 1)
@@ -177,7 +179,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
 
   @Test
   def testReplicaPlacementPartialServers(): Unit = {
-    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, 3)
+    AdminUtils.addPartitions(zkUtils, topic2, topic2Assignment, AdminUtils.getBrokerMetadatas(zkUtils), 3)
 
     // read metadata from a broker and verify the new topic partitions exist
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a553764c/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 36f439f..50fa21e 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -152,11 +152,17 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     servers = createTestTopicAndCluster(topic)
     val leaderIdOpt = zkUtils.getLeaderForPartition(topic, 0)
     assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    val follower = servers.filter(_.config.brokerId != leaderIdOpt.get).last
     val newPartition = new TopicPartition(topic, 1)
+    // capture the brokers before we shut down so that we don't fail validation in `addPartitions`
+    val brokers = AdminUtils.getBrokerMetadatas(zkUtils)
     follower.shutdown()
+    // wait until the broker has been removed from ZK to reduce non-determinism
+    TestUtils.waitUntilTrue(() => zkUtils.getBrokerInfo(follower.config.brokerId).isEmpty,
+      s"Follower ${follower.config.brokerId} was not removed from ZK")
     // add partitions to topic
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, 2, Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))), false)
+    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, brokers, 2,
+      Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
     follower.startup()
@@ -176,7 +182,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     AdminUtils.deleteTopic(zkUtils, topic)
     // add partitions to topic
     val newPartition = new TopicPartition(topic, 1)
-    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, 2, Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
+    AdminUtils.addPartitions(zkUtils, topic, expectedReplicaAssignment, AdminUtils.getBrokerMetadatas(zkUtils), 2,
+      Some(Map(1 -> Seq(0, 1, 2), 2 -> Seq(0, 1, 2))))
     TestUtils.verifyTopicDeletion(zkUtils, topic, 1, servers)
     // verify that new partition doesn't exist on any broker either
     assertTrue("Replica logs not deleted after delete topic is complete",
@@ -275,11 +282,9 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
   }
 
   private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true): Seq[KafkaServer] = {
-
-    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
-    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", deleteTopicEnabled.toString)
-    )
-    createTestTopicAndCluster(topic,brokerConfigs)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown = false)
+    brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
+    createTestTopicAndCluster(topic, brokerConfigs)
   }
 
   private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer] = {

