kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-5856; AdminClient.createPartitions() follow up
Date Wed, 04 Oct 2017 18:00:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 2a1b39ef1 -> c6e5a32d0


KAFKA-5856; AdminClient.createPartitions() follow up

- Improve tests and javadoc (including expected exceptions)
- Return correct authorization error if no describe topic
permission

Author: Tom Bentley <tbentley@redhat.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>

Closes #3937 from tombentley/KAFKA-5856-AdminClient.createPartitions-follow-up


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c6e5a32d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c6e5a32d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c6e5a32d

Branch: refs/heads/trunk
Commit: c6e5a32d0464622ced4b7887923ca37c717b9e74
Parents: 2a1b39e
Author: Tom Bentley <tbentley@redhat.com>
Authored: Wed Oct 4 18:57:26 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Oct 4 18:57:31 2017 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/admin/AdminClient.java |  42 ++-
 .../common/errors/InvalidTopicException.java    |   4 +
 .../UnknownTopicOrPartitionException.java       |   6 +-
 .../src/main/scala/kafka/admin/AdminUtils.scala |   3 +-
 .../main/scala/kafka/server/AdminManager.scala  |  16 +-
 .../kafka/server/DelayedCreatePartitions.scala  |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  13 +-
 .../kafka/api/AdminClientIntegrationTest.scala  | 332 ++++++++++++-------
 .../kafka/api/AuthorizerIntegrationTest.scala   |  39 ++-
 9 files changed, 302 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 636317c..fd695f9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -453,11 +453,12 @@ public abstract class AdminClient implements AutoCloseable {
     public abstract DescribeReplicaLogDirsResult describeReplicaLogDirs(Collection<TopicPartitionReplica>
replicas, DescribeReplicaLogDirsOptions options);
 
     /**
-     * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
-     * according to the corresponding values.
+     * <p>Increase the number of partitions of the topics given as the keys of {@code
newPartitions}
+     * according to the corresponding values. <strong>If partitions are increased for
a topic that has a key,
+     * the partition logic or ordering of the messages will be affected.</strong></p>
      *
-     * This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)}
with default options.
-     * See the overload for more details.
+     * <p>This is a convenience method for {@link #createPartitions(Map, CreatePartitionsOptions)}
with default options.
+     * See the overload for more details.</p>
      *
      * @param newPartitions The topics which should have new partitions created, and corresponding
parameters
      *                      for the created partitions.
@@ -468,17 +469,36 @@ public abstract class AdminClient implements AutoCloseable {
     }
 
     /**
-     * Increase the number of partitions of the topics given as the keys of {@code newPartitions}
-     * according to the corresponding values.
+     * <p>Increase the number of partitions of the topics given as the keys of {@code
newPartitions}
+     * according to the corresponding values. <strong>If partitions are increased for
a topic that has a key,
+     * the partition logic or ordering of the messages will be affected.</strong></p>
      *
-     * This operation is not transactional so it may succeed for some topics while fail for
others.
+     * <p>This operation is not transactional so it may succeed for some topics while
failing for others.</p>
      *
-     * It may take several seconds after this method returns
+     * <p>It may take several seconds after this method returns
      * success for all the brokers to become aware that the partitions have been created.
      * During this time, {@link AdminClient#describeTopics(Collection)}
-     * may not return information about the new partitions.
-     *
-     * This operation is supported by brokers with version 1.0.0 or higher.
+     * may not return information about the new partitions.</p>
+     *
+     * <p>This operation is supported by brokers with version 1.0.0 or higher.</p>
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on
the futures obtained from the
+     * {@link CreatePartitionsResult#values() values()} method of the returned {@code CreatePartitionsResult}</p>
+     * <ul>
+     *     <li>{@link org.apache.kafka.common.errors.AuthorizationException}
+     *     if the authenticated user is not authorized to alter the topic</li>
+     *     <li>{@link org.apache.kafka.common.errors.TimeoutException}
+     *     if the request was not completed within the given {@link CreatePartitionsOptions#timeoutMs()}.</li>
+     *     <li>{@link org.apache.kafka.common.errors.ReassignmentInProgressException}
+     *     if a partition reassignment is currently in progress</li>
+     *     <li>{@link org.apache.kafka.common.errors.BrokerNotAvailableException}
+     *     if the requested {@link NewPartitions#assignments()} contain a broker that is
currently unavailable.</li>
+     *     <li>{@link org.apache.kafka.common.errors.InvalidReplicationFactorException}
+     *     if no {@link NewPartitions#assignments()} are given and it is impossible for the
broker to assign
+     *     replicas with the topic's replication factor.</li>
+     *     <li>Subclasses of {@link org.apache.kafka.common.KafkaException}
+     *     if the request is invalid in some way.</li>
+     * </ul>
      *
      * @param newPartitions The topics which should have new partitions created, and corresponding
parameters
      *                      for the created partitions.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
index 5c7b2be..f79e9a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidTopicException.java
@@ -18,6 +18,10 @@ package org.apache.kafka.common.errors;
 
 /**
  * The client has attempted to perform an operation on an invalid topic.
+ * For example the topic name is too long, contains invalid characters etc.
+ * This exception is not retriable because the operation won't suddenly become valid.
+ *
+ * @see UnknownTopicOrPartitionException
  */
 public class InvalidTopicException extends ApiException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
index 0f2a562..6d10945 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/UnknownTopicOrPartitionException.java
@@ -17,7 +17,11 @@
 package org.apache.kafka.common.errors;
 
 /**
- * This topic/partition doesn't exist
+ * This topic/partition doesn't exist.
+ * This exception is used in contexts where a topic doesn't seem to exist based on possibly
stale metadata.
+ * This exception is retriable because the topic or partition might subsequently be created.
+ *
+ * @see InvalidTopicException
  */
 public class UnknownTopicOrPartitionException extends InvalidMetadataException {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index ee987f1..32cab2a 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -304,6 +304,7 @@ object AdminUtils extends Logging with AdminUtilities {
       AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment,
update = true)
     }
     proposedAssignment
+
   }
 
   /**
@@ -354,7 +355,7 @@ object AdminUtils extends Logging with AdminUtilities {
       val repFactors = sortedBadRepFactors.map { case (_, rf) => rf }
       throw new InvalidReplicaAssignmentException(s"Inconsistent replication factor between
partitions, " +
         s"partition 0 has ${existingAssignmentPartition0.size} while partitions [${partitions.mkString(",
")}] have " +
-        s"replication factors [${repFactors.mkString(", ")}], respectively")
+        s"replication factors [${repFactors.mkString(", ")}], respectively.")
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 2c83c1b..aefdefd 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -122,15 +122,15 @@ class AdminManager(val config: KafkaConfig,
             else
               AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments,
configs, update = false)
         }
-        CreateTopicMetadata(topic, assignments, ApiError.NONE)
+        CreatePartitionsMetadata(topic, assignments, ApiError.NONE)
       } catch {
         // Log client errors at a lower level than unexpected exceptions
         case e@ (_: PolicyViolationException | _: ApiException) =>
           info(s"Error processing create topic request for topic $topic with arguments $arguments",
e)
-          CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
         case e: Throwable =>
           error(s"Error processing create topic request for topic $topic with arguments $arguments",
e)
-          CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map(), ApiError.fromThrowable(e))
       }
     }
 
@@ -219,7 +219,7 @@ class AdminManager(val config: KafkaConfig,
           case (topicPartition, replicas) => topicPartition.partition -> replicas
         }
         if (existingAssignment.isEmpty)
-          throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist")
+          throw new UnknownTopicOrPartitionException(s"The topic '$topic' does not exist.")
 
         val oldNumPartitions = existingAssignment.size
         val newNumPartitions = newPartition.totalCount
@@ -238,7 +238,7 @@ class AdminManager(val config: KafkaConfig,
               s"Unknown broker(s) in replica assignment: ${unknownBrokers.mkString(", ")}.")
 
           if (assignments.size != numPartitionsIncrement)
-            throw new InvalidRequestException(
+            throw new InvalidReplicaAssignmentException(
               s"Increasing the number of partitions by $numPartitionsIncrement " +
                 s"but ${assignments.size} assignments provided.")
 
@@ -249,12 +249,12 @@ class AdminManager(val config: KafkaConfig,
 
         val updatedReplicaAssignment = AdminUtils.addPartitions(zkUtils, topic, existingAssignment,
allBrokers,
           newPartition.totalCount, reassignment, validateOnly = validateOnly)
-        CreateTopicMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
+        CreatePartitionsMetadata(topic, updatedReplicaAssignment, ApiError.NONE)
       } catch {
         case e: AdminOperationException =>
-          CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
         case e: ApiException =>
-          CreateTopicMetadata(topic, Map.empty, ApiError.fromThrowable(e))
+          CreatePartitionsMetadata(topic, Map.empty, ApiError.fromThrowable(e))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
index 81a096e..0a99483 100644
--- a/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreatePartitions.scala
@@ -26,13 +26,13 @@ import scala.collection._
 /**
   * The create metadata maintained by the delayed create topic or create partitions operations.
   */
-case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error:
ApiError)
+case class CreatePartitionsMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]],
error: ApiError)
 
 /**
   * A delayed create topic or create partitions operation that is stored in the topic purgatory.
   */
 class DelayedCreatePartitions(delayMs: Long,
-                              createMetadata: Seq[CreateTopicMetadata],
+                              createMetadata: Seq[CreatePartitionsMetadata],
                               adminManager: AdminManager,
                               responseCallback: Map[String, ApiError] => Unit)
   extends DelayedOperation(delayMs) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 13959b8..c171aaa 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1367,12 +1367,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       val (authorized, unauthorized) = notDuped.partition { case (topic, _) =>
         authorize(request.session, Alter, new Resource(Topic, topic))
       }
+
       val (queuedForDeletion, valid) = authorized.partition { case (topic, _) =>
         controller.topicDeletionManager.isTopicQueuedUpForDeletion(topic)
+
       }
 
-      val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic
in request")) ++
-        unauthorized.keySet.map(_ -> new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null))
++
+      val errors = dupes.map(_ -> new ApiError(Errors.INVALID_REQUEST, "Duplicate topic
in request.")) ++
+        unauthorized.keySet.map( topic => topic -> createPartitionsAuthorizationApiError(request.session,
topic) ) ++
         queuedForDeletion.keySet.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION,
"The topic is queued for deletion."))
 
       adminManager.createPartitions(createPartitionsRequest.timeout, valid, createPartitionsRequest.validateOnly,
@@ -1380,6 +1382,13 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  private def createPartitionsAuthorizationApiError(session: RequestChannel.Session, topic:
String): ApiError = {
+    if (authorize(session, Describe, new Resource(Topic, topic)))
+      new ApiError(Errors.TOPIC_AUTHORIZATION_FAILED, null)
+    else
+      new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, null)
+  }
+
   def handleDeleteTopicsRequest(request: RequestChannel.Request) {
     val deleteTopicRequest = request.body[DeleteTopicsRequest]
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index e916efa..a18b217 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -376,158 +376,235 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with
Logging {
     assertEquals(1, client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions.size)
 
     val validateOnly = new CreatePartitionsOptions().validateOnly(true)
+    val actuallyDoIt = new CreatePartitionsOptions().validateOnly(false)
 
-    // assert that a validateOnly request doesn't increase the number of partitions
+    def partitions(topic: String) =
+      client.describeTopics(Set(topic).asJava).values.get(topic).get.partitions
+
+    def numPartitions(topic: String) =
+      partitions(topic).size
+
+    // validateOnly: try creating a new partition (no assignments), to bring the total to
3 partitions
     var alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(2)).asJava, validateOnly)
+      NewPartitions.increaseTo(3)).asJava, validateOnly)
     var altered = alterResult.values.get(topic1).get
-    // assert that the topics still has 1 partition
-    assertEquals(1, client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions.size)
+    assertEquals(1, numPartitions(topic1))
 
     // try creating a new partition (no assignments), to bring the total to 3 partitions
     alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(3)).asJava)
+      NewPartitions.increaseTo(3)).asJava, actuallyDoIt)
     altered = alterResult.values.get(topic1).get
-    // assert that the topics now has 2 partitions
-    var actualPartitions = client.describeTopics(Set(topic1).asJava).values.get(topic1).get.partitions
-    assertEquals(3, actualPartitions.size)
+    assertEquals(3, numPartitions(topic1))
+
+    // validateOnly: now try creating a new partition (with assignments), to bring the total
to 3 partitions
+    val newPartition2Assignments = asList[util.List[Integer]](asList(0, 1), asList(1, 2))
+    alterResult = client.createPartitions(Map(topic2 ->
+      NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, validateOnly)
+    altered = alterResult.values.get(topic2).get
+    assertEquals(1, numPartitions(topic2))
 
     // now try creating a new partition (with assignments), to bring the total to 3 partitions
     alterResult = client.createPartitions(Map(topic2 ->
-      NewPartitions.increaseTo(3, asList(asList(0, 1), asList(1, 2)))).asJava)
+      NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, actuallyDoIt)
     altered = alterResult.values.get(topic2).get
-    // assert that the topics now has 3 partitions
-    actualPartitions = client.describeTopics(Set(topic2).asJava).values.get(topic2).get.partitions
-    assertEquals(3, actualPartitions.size)
-    assertEquals(Seq(0, 1), actualPartitions.get(1).replicas.asScala.map(_.id))
-    assertEquals(Seq(1, 2), actualPartitions.get(2).replicas.asScala.map(_.id))
+    var actualPartitions2 = partitions(topic2)
+    assertEquals(3, actualPartitions2.size)
+    assertEquals(Seq(0, 1), actualPartitions2.get(1).replicas.asScala.map(_.id).toList)
+    assertEquals(Seq(1, 2), actualPartitions2.get(2).replicas.asScala.map(_.id).toList)
+
+    // loop over error cases calling with+without validate-only
+    for (option <- Seq(validateOnly, actuallyDoIt)) {
+      val desc = if (option.validateOnly()) "validateOnly" else "validateOnly=false"
+
+      // try a newCount which would be a decrease
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(1)).asJava, option)
+      try {
+        alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidPartitionsException when newCount is a decrease")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic currently has 3 partitions, which is higher than the
requested 1.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try a newCount which would be a decrease
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(1)).asJava, validateOnly)
-    try {
-      alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when newCount is a decrease")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic currently has 3 partitions, which is higher than the requested
1.", e.getCause.getMessage)
-    }
+      // try a newCount which would be a noop (without assignment)
+      alterResult = client.createPartitions(Map(topic2 ->
+        NewPartitions.increaseTo(3)).asJava, option)
+      try {
+        alterResult.values.get(topic2).get
+        fail(s"$desc: Expect InvalidPartitionsException when requesting a noop")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic2))
+      }
 
-    // try a newCount which would be a noop
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(3)).asJava, validateOnly)
-    try {
-      alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when newCount == oldCount")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic already has 3 partitions.", e.getCause.getMessage)
-    }
+      // try a newCount which would be a noop (where the assignment matches current state)
+      alterResult = client.createPartitions(Map(topic2 ->
+        NewPartitions.increaseTo(3, newPartition2Assignments)).asJava, option)
+      try {
+        alterResult.values.get(topic2).get
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic2))
+      }
 
-    // try a bad topic name
-    val unknownTopic = "an-unknown-topic"
-    alterResult = client.createPartitions(Map(unknownTopic ->
-      NewPartitions.increaseTo(2)).asJava, validateOnly)
-    try {
-      alterResult.values.get(unknownTopic).get
-      fail("Expect InvalidTopicException when using an unknown topic")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
-        assertEquals("The topic 'an-unknown-topic' does not exist", e.getCause.getMessage)
-    }
+      // try a newCount which would be a noop (where the assignment doesn't match current
state)
+      alterResult = client.createPartitions(Map(topic2 ->
+        NewPartitions.increaseTo(3, newPartition2Assignments.asScala.reverse.toList.asJava)).asJava,
option)
+      try {
+        alterResult.values.get(topic2).get
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic already has 3 partitions.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic2))
+      }
 
-    // try an invalid newCount
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(-22)).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when newCount is invalid")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
-        assertEquals("Topic currently has 3 partitions, which is higher than the requested
-22.",
-          e.getCause.getMessage)
-    }
+      // try a bad topic name
+      val unknownTopic = "an-unknown-topic"
+      alterResult = client.createPartitions(Map(unknownTopic ->
+        NewPartitions.increaseTo(2)).asJava, option)
+      try {
+        alterResult.values.get(unknownTopic).get
+        fail(s"$desc: Expect InvalidTopicException when using an unknown topic")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[UnknownTopicOrPartitionException])
+          assertEquals(desc, "The topic 'an-unknown-topic' does not exist.", e.getCause.getMessage)
+      }
 
-    // try assignments where the number of brokers != replication factor
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidPartitionsException when #brokers != replication factor")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Inconsistent replication factor between partitions, partition 0 has
1 " +
-          "while partitions [3] have replication factors [2], respectively",
-          e.getCause.getMessage)
-    }
+      // try an invalid newCount
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(-22)).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidPartitionsException when newCount is invalid")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidPartitionsException])
+          assertEquals(desc, "Topic currently has 3 partitions, which is higher than the
requested -22.",
+            e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try #assignments incompatible with the increase
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidRequestException when #assignments != newCount - oldCount")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidRequestException])
-        assertEquals("Increasing the number of partitions by 1 but 2 assignments provided.",
e.getCause.getMessage)
-    }
+      // try assignments where the number of brokers != replication factor
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(1, 2)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidPartitionsException when #brokers != replication factor")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Inconsistent replication factor between partitions, partition
0 has 1 " +
+            "while partitions [3] have replication factors [2], respectively.",
+            e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try with duplicate brokers in assignments
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
-    } catch {
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Duplicate brokers not allowed in replica assignment: 1, 1 for partition
id 3.",
-          e.getCause.getMessage)
-    }
+      // try fewer assignments than the requested increase
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(6, asList(asList(1)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount
- oldCount")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Increasing the number of partitions by 3 but 1 assignments
provided.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try assignments with differently sized inner lists
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidReplicaAssignmentException when assignments have differently sized
inner lists")
-    } catch {
-      case e: ExecutionException =>
-        e.printStackTrace()
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Inconsistent replication factor between partitions, partition 0 has
1 " +
-          "while partitions [4] have replication factors [2], respectively", e.getCause.getMessage)
-    }
+      // try more assignments than the requested increase
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(1), asList(2)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when #assignments != newCount
- oldCount")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Increasing the number of partitions by 1 but 2 assignments
provided.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
 
-    // try assignments with unknown brokers
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, asList(asList(12)))).asJava, validateOnly)
-    try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidReplicaAssignmentException when assignments contains an unknown
broker")
-    } catch {
-      case e: ExecutionException =>
-        e.printStackTrace()
-        assertTrue(e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
-        assertEquals("Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage)
+      // try with duplicate brokers in assignments
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(1, 1)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments has duplicate brokers")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Duplicate brokers not allowed in replica assignment: 1, 1 for partition id 3.",
+            e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
+
+      // try assignments with differently sized inner lists
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(5, asList(asList(1), asList(1, 0)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments have differently sized inner lists")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Inconsistent replication factor between partitions, partition 0 has 1 " +
+            "while partitions [4] have replication factors [2], respectively.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
+
+      // try assignments with unknown brokers
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, asList(asList(12)))).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments contains an unknown broker")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Unknown broker(s) in replica assignment: 12.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
+
+      // try with empty assignments
+      alterResult = client.createPartitions(Map(topic1 ->
+        NewPartitions.increaseTo(4, Collections.emptyList())).asJava, option)
+      try {
+        altered = alterResult.values.get(topic1).get
+        fail(s"$desc: Expect InvalidReplicaAssignmentException when assignments is empty")
+      } catch {
+        case e: ExecutionException =>
+          assertTrue(desc, e.getCause.isInstanceOf[InvalidReplicaAssignmentException])
+          assertEquals(desc, "Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage)
+          assertEquals(desc, 3, numPartitions(topic1))
+      }
     }
 
-    // try with empty assignments
-    alterResult = client.createPartitions(Map(topic1 ->
-      NewPartitions.increaseTo(4, Collections.emptyList())).asJava, validateOnly)
+    // a mixed success, failure response
+    alterResult = client.createPartitions(Map(
+      topic1 -> NewPartitions.increaseTo(4),
+      topic2 -> NewPartitions.increaseTo(2)).asJava, actuallyDoIt)
+    // assert that the topic1 now has 4 partitions
+    altered = alterResult.values.get(topic1).get
+    assertEquals(4, numPartitions(topic1))
     try {
-      altered = alterResult.values.get(topic1).get
-      fail("Expect InvalidRequestException when assignments is empty")
+      altered = alterResult.values.get(topic2).get
     } catch {
       case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidRequestException])
-        assertEquals("Increasing the number of partitions by 1 but 0 assignments provided.", e.getCause.getMessage)
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[InvalidPartitionsException])
+        assertEquals("Topic currently has 3 partitions, which is higher than the requested 2.", e.getCause.getMessage)
+        // assert that the topic2 still has 3 partitions
+        assertEquals(3, numPartitions(topic2))
     }
 
     // finally, try to add partitions to a topic queued for deletion
@@ -543,7 +620,6 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging {
         assertTrue(e.getCause.isInstanceOf[InvalidTopicException])
         assertEquals("The topic is queued for deletion.", e.getCause.getMessage)
     }
-
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/c6e5a32d/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 728e958..d07d08e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -28,6 +28,7 @@ import kafka.network.SocketServer
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.NewPartitions
 import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
@@ -86,12 +87,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val topicDescribeAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
+  val topicAlterAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
   val topicDeleteAcl = Map(deleteTopicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Delete)))
   val topicDescribeConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, DescribeConfigs)))
   val topicAlterConfigsAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, AlterConfigs)))
   val transactionIdWriteAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
   val transactionalIdDescribeAcl = Map(transactionalIdResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
 
+
   val consumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
   val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
@@ -142,7 +145,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
       ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse],
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse],
-      ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse]
+      ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse],
+      ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -181,7 +185,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error),
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)),
     ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
-      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED)
+      if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED),
+    ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -217,7 +222,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
     ApiKeys.DELETE_ACLS -> clusterAlterAcl,
     ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl,
-    ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl
+    ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl,
+    ApiKeys.CREATE_PARTITIONS -> topicAlterAcl
+
   )
 
   @Before
@@ -321,6 +328,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
+  private def createPartitionsRequest = {
+    new CreatePartitionsRequest.Builder(
+      Map(topic -> NewPartitions.increaseTo(10)).asJava, 10000, true
+    ).build()
+  }
+
   private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build()
 
   private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
@@ -399,7 +412,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.DELETE_ACLS -> deleteAclsRequest,
       ApiKeys.DESCRIBE_ACLS -> describeAclsRequest,
       ApiKeys.ALTER_REPLICA_LOG_DIRS -> alterReplicaLogDirsRequest,
-      ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest
+      ApiKeys.DESCRIBE_LOG_DIRS -> describeLogDirsRequest,
+      ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {
@@ -971,6 +985,23 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     assertEquals(Errors.NONE, deleteRecordsResponse.responses.asScala.head._2.error)
   }
 
+  @Test
+  def testUnauthorizedCreatePartitions() {
+    val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
+    val version = ApiKeys.CREATE_PARTITIONS.latestVersion
+    val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
+    assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, createPartitionsResponse.errors.asScala.head._2.error)
+  }
+
+  @Test
+  def testCreatePartitionsWithWildCardAuth() {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)), new Resource(Topic, "*"))
+    val response = connectAndSend(createPartitionsRequest, ApiKeys.CREATE_PARTITIONS)
+    val version = ApiKeys.CREATE_PARTITIONS.latestVersion
+    val createPartitionsResponse = CreatePartitionsResponse.parse(response, version)
+    assertEquals(Errors.NONE, createPartitionsResponse.errors.asScala.head._2.error)
+  }
+
   @Test(expected = classOf[TransactionalIdAuthorizationException])
   def testTransactionalProducerInitTransactionsNoWriteTransactionalIdAcl(): Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)), transactionalIdResource)


Mime
View raw message