kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: KAFKA-4457; Add BrokerApiVersionsCommand
Date Wed, 18 Jan 2017 23:49:04 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.2 dbca4a3b6 -> 866b33c16


KAFKA-4457; Add BrokerApiVersionsCommand

Author: Colin P. Mccabe <cmccabe@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>, Apurva Mehta <apurva.1618@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2184 from cmccabe/KAFKA-4457

(cherry picked from commit 4a6f2c6cc0647a08f016a2d712a01ec02630cf87)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/866b33c1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/866b33c1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/866b33c1

Branch: refs/heads/0.10.2
Commit: 866b33c16496ab654d59e69cb3e83587c52da673
Parents: dbca4a3
Author: Colin P. Mccabe <cmccabe@confluent.io>
Authored: Wed Jan 18 22:45:57 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Jan 18 23:49:00 2017 +0000

----------------------------------------------------------------------
 bin/kafka-broker-api-versions.sh                | 17 ++++
 .../apache/kafka/clients/NodeApiVersions.java   | 34 +++++++-
 .../kafka/clients/NodeApiVersionsTest.java      |  8 +-
 .../main/scala/kafka/admin/AdminClient.scala    | 49 +++++++-----
 .../kafka/admin/BrokerApiVersionsCommand.scala  | 81 ++++++++++++++++++++
 .../admin/BrokerApiVersionsCommandTest.scala    | 61 +++++++++++++++
 .../integration/kafka/api/AdminClientTest.scala | 19 +++++
 7 files changed, 244 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/bin/kafka-broker-api-versions.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-broker-api-versions.sh b/bin/kafka-broker-api-versions.sh
new file mode 100755
index 0000000..4f560a0
--- /dev/null
+++ b/bin/kafka-broker-api-versions.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+exec $(dirname $0)/kafka-run-class.sh kafka.admin.BrokerApiVersionsCommand "$@"

http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 6acbb63..b90009b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils;
 import java.util.Collection;
 import java.util.EnumMap;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.TreeMap;
 
 public class NodeApiVersions {
@@ -56,10 +57,21 @@ public class NodeApiVersions {
     }
 
     /**
+     * Convert the object to a string with no linebreaks.<p/>
+     *
      * This toString method is relatively expensive, so avoid calling it unless debug logging is turned on.
      */
     @Override
     public String toString() {
+        return toString(false);
+    }
+
+    /**
+     * Convert the object to a string.
+     *
+     * @param lineBreaks True if we should add a linebreak after each api.
+     */
+    public String toString(boolean lineBreaks) {
         // The apiVersion collection may not be in sorted order.  We put it into
         // a TreeMap before printing it out to ensure that we always print in
         // ascending order.
@@ -73,11 +85,20 @@ public class NodeApiVersions {
             if (!apiKeysText.containsKey(apiKey.id)) {
                 StringBuilder bld = new StringBuilder();
                 bld.append(apiKey.name).append("(").
-                    append(apiKey.id).append("): ").append("UNSUPPORTED");
+                        append(apiKey.id).append("): ").append("UNSUPPORTED");
                 apiKeysText.put(apiKey.id, bld.toString());
             }
         }
-        return "{" + Utils.join(apiKeysText.values(), ", ") + "}";
+        String separator = lineBreaks ? ",\n\t" : ", ";
+        StringBuilder bld = new StringBuilder();
+        bld.append("(");
+        if (lineBreaks)
+            bld.append("\n\t");
+        bld.append(Utils.join(apiKeysText.values(), separator));
+        if (lineBreaks)
+            bld.append("\n");
+        bld.append(")");
+        return bld.toString();
     }
 
     private String apiVersionToText(ApiVersion apiVersion) {
@@ -106,4 +127,13 @@ public class NodeApiVersions {
         }
         return bld.toString();
     }
+
+    public ApiVersion apiVersion(ApiKeys apiKey) {
+        for (ApiVersion apiVersion : apiVersions) {
+            if (apiVersion.apiKey == apiKey.id) {
+                return apiVersion;
+            }
+        }
+        throw new NoSuchElementException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index b39a0aa..861a28f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -36,13 +36,13 @@ public class NodeApiVersionsTest {
         NodeApiVersions versions = new NodeApiVersions(
                 Collections.<ApiVersion>emptyList());
         StringBuilder bld = new StringBuilder();
-        String prefix = "{";
+        String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.values()) {
             bld.append(prefix).append(apiKey.name).
                     append("(").append(apiKey.id).append("): UNSUPPORTED");
             prefix = ", ";
         }
-        bld.append("}");
+        bld.append(")");
         assertEquals(bld.toString(), versions.toString());
     }
 
@@ -59,7 +59,7 @@ public class NodeApiVersionsTest {
         }
         NodeApiVersions versions = new NodeApiVersions(versionList);
         StringBuilder bld = new StringBuilder();
-        String prefix = "{";
+        String prefix = "(";
         for (ApiKeys apiKey : ApiKeys.values()) {
             bld.append(prefix);
             if (apiKey == ApiKeys.CONTROLLED_SHUTDOWN_KEY) {
@@ -80,7 +80,7 @@ public class NodeApiVersionsTest {
             }
             prefix = ", ";
         }
-        bld.append("}");
+        bld.append(")");
         assertEquals(bld.toString(), versions.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 33089d1..680c5e1 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -16,6 +16,7 @@ import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import java.util.concurrent.atomic.AtomicInteger
 
+import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion
 import kafka.common.KafkaException
 import kafka.coordinator.GroupOverview
 import kafka.utils.Logging
@@ -28,11 +29,11 @@ import org.apache.kafka.common.network.Selector
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 class AdminClient(val time: Time,
                   val requestTimeoutMs: Int,
@@ -68,37 +69,42 @@ class AdminClient(val time: Time,
   def findCoordinator(groupId: String): Node = {
     val requestBuilder = new GroupCoordinatorRequest.Builder(groupId)
     val response = sendAnyNode(ApiKeys.GROUP_COORDINATOR, requestBuilder).asInstanceOf[GroupCoordinatorResponse]
-    Errors.forCode(response.errorCode()).maybeThrow()
-    response.node()
+    Errors.forCode(response.errorCode).maybeThrow()
+    response.node
   }
 
   def listGroups(node: Node): List[GroupOverview] = {
     val response = send(node, ApiKeys.LIST_GROUPS, new ListGroupsRequest.Builder()).asInstanceOf[ListGroupsResponse]
-    Errors.forCode(response.errorCode()).maybeThrow()
-    response.groups().asScala.map(group => GroupOverview(group.groupId(), group.protocolType())).toList
+    Errors.forCode(response.errorCode).maybeThrow()
+    response.groups.asScala.map(group => GroupOverview(group.groupId, group.protocolType)).toList
+  }
+
+  def getApiVersions(node: Node): List[ApiVersion] = {
+    val response = send(node, ApiKeys.API_VERSIONS, new ApiVersionsRequest.Builder()).asInstanceOf[ApiVersionsResponse]
+    Errors.forCode(response.errorCode).maybeThrow()
+    response.apiVersions.asScala.toList
   }
 
   private def findAllBrokers(): List[Node] = {
     val request = MetadataRequest.Builder.allTopics()
     val response = sendAnyNode(ApiKeys.METADATA, request).asInstanceOf[MetadataResponse]
-    val errors = response.errors()
+    val errors = response.errors
     if (!errors.isEmpty)
       debug(s"Metadata request contained errors: $errors")
-    response.cluster().nodes().asScala.toList
+    response.cluster.nodes.asScala.toList
   }
 
   def listAllGroups(): Map[Node, List[GroupOverview]] = {
-    findAllBrokers.map {
-      case broker =>
-        broker -> {
-          try {
-            listGroups(broker)
-          } catch {
-            case e: Exception =>
-              debug(s"Failed to find groups from broker $broker", e)
-              List[GroupOverview]()
-          }
+    findAllBrokers.map { broker =>
+      broker -> {
+        try {
+          listGroups(broker)
+        } catch {
+          case e: Exception =>
+            debug(s"Failed to find groups from broker $broker", e)
+            List[GroupOverview]()
         }
+      }
     }.toMap
   }
 
@@ -123,9 +129,14 @@ class AdminClient(val time: Time,
     if (response.hasError)
       throw response.error.exception
     response.maybeThrowFirstPartitionError
-    response.responseData().asScala.map { responseData => (responseData._1, responseData._2.offset) }.toMap
+    response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap
   }
 
+  def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] =
+    findAllBrokers.map { broker =>
+      broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava))
+    }.toMap
+
   /**
    * Case class used to represent a consumer of a consumer group
    */
@@ -252,6 +263,6 @@ object AdminClient {
       time,
       DefaultRequestTimeoutMs,
       highLevelClient,
-      bootstrapCluster.nodes().asScala.toList)
+      bootstrapCluster.nodes.asScala.toList)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
new file mode 100644
index 0000000..812bc9d
--- /dev/null
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.PrintStream
+import java.util.Properties
+
+import kafka.utils.CommandLineUtils
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.clients.CommonClientConfigs
+import joptsimple._
+
+import scala.util.{Failure, Success}
+
+/**
+ * A command for retrieving broker version information.
+ */
+object BrokerApiVersionsCommand {
+
+  def main(args: Array[String]): Unit = {
+    execute(args, System.out)
+  }
+
+  def execute(args: Array[String], out: PrintStream): Unit = {
+    val opts = new BrokerVersionCommandOptions(args)
+    val adminClient = createAdminClient(opts)
+    val brokerMap = adminClient.listAllBrokerVersionInfo()
+    brokerMap.foreach { case (broker, versionInfoOrError) =>
+      versionInfoOrError match {
+        case Success(v) => out.print(s"${broker} -> ${v.toString(true)}\n")
+        case Failure(v) => out.print(s"${broker} -> ERROR: ${v}\n")
+      }
+    }
+  }
+
+  private def createAdminClient(opts: BrokerVersionCommandOptions): AdminClient = {
+    val props = if (opts.options.has(opts.commandConfigOpt))
+      Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt))
+    else
+      new Properties()
+    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt))
+    AdminClient.create(props)
+  }
+
+  class BrokerVersionCommandOptions(args: Array[String]) {
+    val BootstrapServerDoc = "REQUIRED: The server to connect to."
+    val CommandConfigDoc = "A property file containing configs to be passed to Admin Client."
+
+    val parser = new OptionParser
+    val commandConfigOpt = parser.accepts("command-config", CommandConfigDoc)
+                                 .withRequiredArg
+                                 .describedAs("command config property file")
+                                 .ofType(classOf[String])
+    val bootstrapServerOpt = parser.accepts("bootstrap-server", BootstrapServerDoc)
+                                   .withRequiredArg
+                                   .describedAs("server(s) to use for bootstrapping")
+                                   .ofType(classOf[String])
+    val options = parser.parse(args : _*)
+    checkArgs()
+
+    def checkArgs() {
+      // check required args
+      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
new file mode 100644
index 0000000..ff93f22
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import java.io.{ByteArrayOutputStream, PrintStream}
+import java.nio.charset.StandardCharsets
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.NodeApiVersions
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.ApiVersionsResponse
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.Test
+
+class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
+
+  def generateConfigs(): Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+
+  @Test
+  def checkBrokerApiVersionCommandOutput() {
+    val byteArrayOutputStream = new ByteArrayOutputStream
+    val printStream = new PrintStream(byteArrayOutputStream)
+    BrokerApiVersionsCommand.execute(Array("--bootstrap-server", brokerList), printStream)
+    val content = new String(byteArrayOutputStream.toByteArray, StandardCharsets.UTF_8)
+    val lineIter = content.split("\n").iterator
+    assertTrue(lineIter.hasNext)
+    assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next)
+    val nodeApiVersions = new NodeApiVersions(ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersions)
+    for (apiKey <- ApiKeys.values) {
+      val apiVersion = nodeApiVersions.apiVersion(apiKey)
+      val versionRangeStr =
+        if (apiVersion.minVersion == apiVersion.maxVersion) apiVersion.minVersion.toString
+        else s"${apiVersion.minVersion} to ${apiVersion.maxVersion}"
+      val terminator = if (apiKey == ApiKeys.values.last) "" else ","
+      val usableVersion = nodeApiVersions.usableVersion(apiKey)
+      val line = s"\t${apiKey.name}(${apiKey.id}): $versionRangeStr [usable: $usableVersion]$terminator"
+      assertTrue(lineIter.hasNext)
+      assertEquals(line, lineIter.next)
+    }
+    assertTrue(lineIter.hasNext)
+    assertEquals(")", lineIter.next)
+    assertFalse(lineIter.hasNext)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/866b33c1/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
index a62922c..1e2749f 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientTest.scala
@@ -24,6 +24,7 @@ import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.ApiKeys
 import org.junit.{Before, Test}
 import org.junit.Assert._
 
@@ -78,6 +79,24 @@ class AdminClientTest extends IntegrationTestHarness with Logging {
   }
 
   @Test
+  def testListAllBrokerVersionInfo() {
+    consumers.head.subscribe(Collections.singletonList(topic))
+    TestUtils.waitUntilTrue(() => {
+      consumers.head.poll(0)
+      !consumers.head.assignment.isEmpty
+    }, "Expected non-empty assignment")
+    val brokerVersionInfos = client.listAllBrokerVersionInfo
+    val brokers = brokerList.split(",")
+    assertEquals(brokers.size, brokerVersionInfos.size)
+    for ((node, tryBrokerVersionInfo) <- brokerVersionInfos) {
+      val hostStr = s"${node.host}:${node.port}"
+      assertTrue(s"Unknown host:port pair $hostStr in brokerVersionInfos", brokers.contains(hostStr))
+      val brokerVersionInfo = tryBrokerVersionInfo.get
+      assertEquals(0, brokerVersionInfo.usableVersion(ApiKeys.API_VERSIONS))
+    }
+  }
+
+  @Test
   def testGetConsumerGroupSummary() {
     consumers.head.subscribe(Collections.singletonList(topic))
     TestUtils.waitUntilTrue(() => {


Mime
View raw message