kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4291; TopicCommand --describe should show topics marked for deletion
Date Sat, 03 Jun 2017 09:31:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 b9f7aa52d -> 45b326d93


KAFKA-4291; TopicCommand --describe should show topics marked for deletion

Developed with edoardocomar

Author: Mickael Maison <mickael.maison@gmail.com>

Reviewers: Vahid Hashemian <vahidhashemian@us.ibm.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2011 from mimaison/KAFKA-4291

(cherry picked from commit a598c4d26fb06acb455e14e84468306dfa6e1c8b)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45b326d9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45b326d9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45b326d9

Branch: refs/heads/0.11.0
Commit: 45b326d9365c767378018357ca5993edceffdace
Parents: b9f7aa5
Author: Mickael Maison <mickael.maison@gmail.com>
Authored: Sat Jun 3 10:07:56 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Sat Jun 3 10:31:15 2017 +0100

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 18 +++++++---
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  4 +++
 .../unit/kafka/admin/DeleteTopicTest.scala      |  6 ++--
 .../unit/kafka/admin/TopicCommandTest.scala     | 37 ++++++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala | 12 ++++++-
 5 files changed, 68 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index efb9237..9e516b0 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -153,7 +153,7 @@ object TopicCommand extends Logging {
   def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
     val topics = getTopics(zkUtils, opts)
     for(topic <- topics) {
-      if (zkUtils.pathExists(getDeleteTopicPath(topic))) {
+      if (zkUtils.isTopicMarkedForDeletion(topic)) {
         println("%s - marked for deletion".format(topic))
       } else {
         println(topic)
@@ -199,14 +199,17 @@ object TopicCommand extends Logging {
         case Some(topicPartitionAssignment) =>
           val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
           val describePartitions: Boolean = !reportOverriddenConfigs
-          val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) =>
m1._1 < m2._1)
+          val sortedPartitions = topicPartitionAssignment.toSeq.sortBy(_._1)
+          val markedForDeletion = zkUtils.isTopicMarkedForDeletion(topic)
           if (describeConfigs) {
             val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
             if (!reportOverriddenConfigs || configs.nonEmpty) {
               val numPartitions = topicPartitionAssignment.size
               val replicationFactor = topicPartitionAssignment.head._2.size
-              println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
-                .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1
+ "=" + kv._2).mkString(",")))
+              val configsAsString = configs.map { case (k, v) => s"$k=$v" }.mkString(",")
+              val markedForDeletionString = if (markedForDeletion) "\tMarkedForDeletion:true"
else ""
+              println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s%s"
+                .format(topic, numPartitions, replicationFactor, configsAsString, markedForDeletionString))
             }
           }
           if (describePartitions) {
@@ -216,11 +219,16 @@ object TopicCommand extends Logging {
               if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions)
||
                   (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size)
||
                   (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get))))
{
+
+                val markedForDeletionString =
+                  if (markedForDeletion && !describeConfigs) "\tMarkedForDeletion:
true" else ""
                 print("\tTopic: " + topic)
                 print("\tPartition: " + partitionId)
                 print("\tLeader: " + (if(leader.isDefined) leader.get else "none"))
                 print("\tReplicas: " + assignedReplicas.mkString(","))
-                println("\tIsr: " + inSyncReplicas.mkString(","))
+                print("\tIsr: " + inSyncReplicas.mkString(","))
+                print(markedForDeletionString)
+                println()
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 899b7c3..76b9569 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -692,6 +692,10 @@ class ZkUtils(val zkClient: ZkClient,
     zkClient.exists(path)
   }
 
+  def isTopicMarkedForDeletion(topic: String): Boolean = {
+    pathExists(getDeleteTopicPath(topic))
+  }
+
   def getCluster(): Cluster = {
     val cluster = new Cluster
     val nodes = getChildrenParentMayNotExist(BrokerIdsPath)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 15018f5..7df3693 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -66,7 +66,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
       servers.filter(s => s.config.brokerId != follower.config.brokerId)
         .forall(_.getLogManager().getLog(topicPartition).isEmpty), "Replicas 0,1 have not
deleted log.")
     // ensure topic deletion is halted
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => zkUtils.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
     // restart follower replica
     follower.startup()
@@ -90,7 +90,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     controller.shutdown()
 
     // ensure topic deletion is halted
-    TestUtils.waitUntilTrue(() => zkUtils.pathExists(getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => zkUtils.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topic/test path deleted even when a replica is down")
 
     controller.startup()
@@ -310,7 +310,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     servers = createTestTopicAndCluster(topic, deleteTopicEnabled = false)
     // mark the topic for deletion
     AdminUtils.deleteTopic(zkUtils, "test")
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topic/%s path not deleted even if deleteTopic is disabled".format(topic))
     // verify that topic test is untouched
     assertTrue(servers.forall(_.getLogManager().getLog(topicPartition).isDefined))

http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index ad6cfa5..e72a4e3 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -187,4 +187,41 @@ class TopicCommandTest extends ZooKeeperTestHarness with Logging with
RackAwareT
     }
     checkReplicaDistribution(assignment, rackInfo, rackInfo.size, alteredNumPartitions, replicationFactor)
   }
+
+  @Test
+  def testDescribeAndListTopicsMarkedForDeletion() {
+    val brokers = List(0)
+    val topic = "testtopic"
+    val markedForDeletionDescribe = "MarkedForDeletion"
+    val markedForDeletionList = "marked for deletion"
+    TestUtils.createBrokersInZk(zkUtils, brokers)
+
+    val createOpts = new TopicCommandOptions(Array("--partitions", "1", "--replication-factor",
"1", "--topic", topic))
+    TopicCommand.createTopic(zkUtils, createOpts)
+
+    // delete the broker first, so when we attempt to delete the topic it gets into "marked
for deletion"
+    TestUtils.deleteBrokersInZk(zkUtils, brokers)
+    TopicCommand.deleteTopic(zkUtils, new TopicCommandOptions(Array("--topic", topic)))
+
+    // Test describe topics
+    def describeTopicsWithConfig() {
+      TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe")))
+    }
+    val outputWithConfig = TestUtils.grabConsoleOutput(describeTopicsWithConfig)
+    assertTrue(outputWithConfig.contains(topic) && outputWithConfig.contains(markedForDeletionDescribe))
+
+    def describeTopicsNoConfig() {
+      TopicCommand.describeTopic(zkUtils, new TopicCommandOptions(Array("--describe", "--unavailable-partitions")))
+    }
+    val outputNoConfig = TestUtils.grabConsoleOutput(describeTopicsNoConfig)
+    assertTrue(outputNoConfig.contains(topic) && outputNoConfig.contains(markedForDeletionDescribe))
+
+    // Test list topics
+    def listTopics() {
+      TopicCommand.listTopics(zkUtils, new TopicCommandOptions(Array("--list")))
+    }
+    val output = TestUtils.grabConsoleOutput(listTopics)
+    assertTrue(output.contains(topic) && output.contains(markedForDeletionList))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45b326d9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 572de9b..a0f4762 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1120,7 +1120,7 @@ object TestUtils extends Logging {
   def verifyTopicDeletion(zkUtils: ZkUtils, topic: String, numPartitions: Int, servers: Seq[KafkaServer])
{
     val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _))
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getDeleteTopicPath(topic)),
+    TestUtils.waitUntilTrue(() => !zkUtils.isTopicMarkedForDeletion(topic),
       "Admin path /admin/delete_topic/%s path not deleted even after a replica is restarted".format(topic))
     TestUtils.waitUntilTrue(() => !zkUtils.pathExists(getTopicPath(topic)),
       "Topic path /brokers/topics/%s not deleted after /admin/delete_topic/%s path is deleted".format(topic,
topic))
@@ -1424,6 +1424,16 @@ object TestUtils extends Logging {
     }
   }
 
+  /**
+   * Capture the console output during the execution of the provided function.
+   */
+  def grabConsoleOutput(f: => Unit) : String = {
+    val out = new ByteArrayOutputStream
+    try scala.Console.withOut(out)(f)
+    finally scala.Console.out.flush
+    out.toString
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {


Mime
View raw message