kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-1086 Improve GetOffsetShell to find metadata automatically; reviewed by Jun Rao
Date Tue, 15 Oct 2013 17:01:51 GMT
Updated Branches:
  refs/heads/trunk e351039b1 -> a160f1023


KAFKA-1086 Improve GetOffsetShell to find metadata automatically; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a160f102
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a160f102
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a160f102

Branch: refs/heads/trunk
Commit: a160f1023b23c7d9d79c7970a72f48ae883dfe07
Parents: e351039
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue Oct 15 09:58:08 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Oct 15 09:58:08 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/GetOffsetShell.scala | 63 +++++++++++++++-----
 kafka-patch-review.py                           |  2 +-
 2 files changed, 48 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
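For readers trying the reworked tool: the old single --server URI is gone; the shell now takes a --broker-list for metadata discovery and an optional comma-separated --partitions list. A minimal sketch of driving the updated main() programmatically is shown below; the broker address, topic name, and partition ids are illustrative placeholders, not values from this commit.

// Hypothetical driver for the reworked GetOffsetShell; the broker address,
// topic, and partition ids below are made-up example values.
object GetOffsetShellExample {
  def main(args: Array[String]): Unit = {
    kafka.tools.GetOffsetShell.main(Array(
      "--broker-list", "localhost:9092",  // assumed local broker
      "--topic", "test",                  // assumed topic name
      "--partitions", "0,1",              // new: comma-separated partition ids
      "--time", "-1",                     // -1 = latest, -2 = earliest
      "--max-wait-ms", "1000"))
  }
}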


http://git-wip-us.apache.org/repos/asf/kafka/blob/a160f102/core/src/main/scala/kafka/tools/GetOffsetShell.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index 2b9438a..4d1457f 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -23,25 +23,26 @@ import joptsimple._
 import java.net.URI
 import kafka.api.{PartitionOffsetRequestInfo, OffsetRequest}
 import kafka.common.TopicAndPartition
+import kafka.client.ClientUtils
 
 
 object GetOffsetShell {
 
   def main(args: Array[String]): Unit = {
     val parser = new OptionParser
-    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
                            .withRequiredArg
-                           .describedAs("kafka://hostname:port")
+                           .describedAs("hostname:port,...,hostname:port")
                            .ofType(classOf[String])
     val topicOpt = parser.accepts("topic", "REQUIRED: The topic to get offset from.")
                            .withRequiredArg
                            .describedAs("topic")
                            .ofType(classOf[String])
-    val partitionOpt = parser.accepts("partition", "partition id")
+    val partitionOpt = parser.accepts("partitions", "comma separated list of partition ids. If not specified, it will find offsets for all partitions")
                            .withRequiredArg
-                           .describedAs("partition id")
-                           .ofType(classOf[java.lang.Integer])
-                           .defaultsTo(0)
+                           .describedAs("partition ids")
+                           .ofType(classOf[String])
+                           .defaultsTo("")
     val timeOpt = parser.accepts("time", "timestamp of the offsets before that")
                            .withRequiredArg
                            .describedAs("timestamp/-1(latest)/-2(earliest)")
@@ -51,10 +52,15 @@ object GetOffsetShell {
                            .describedAs("count")
                            .ofType(classOf[java.lang.Integer])
                            .defaultsTo(1)
+    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
+                           .withRequiredArg
+                           .describedAs("ms")
+                           .ofType(classOf[java.lang.Integer])
+                           .defaultsTo(1000)
 
     val options = parser.parse(args : _*)
 
-    for(arg <- List(urlOpt, topicOpt, timeOpt)) {
+    for(arg <- List(brokerListOpt, topicOpt, timeOpt)) {
       if(!options.has(arg)) {
         System.err.println("Missing required argument \"" + arg + "\"")
         parser.printHelpOn(System.err)
@@ -62,17 +68,42 @@ object GetOffsetShell {
       }
     }
 
-    val url = new URI(options.valueOf(urlOpt))
+    val clientId = "GetOffsetShell"
+    val metadataTargetBrokers = ClientUtils.parseBrokerList(options.valueOf(brokerListOpt))
     val topic = options.valueOf(topicOpt)
-    val partition = options.valueOf(partitionOpt).intValue
+    var partitionList = options.valueOf(partitionOpt)
     var time = options.valueOf(timeOpt).longValue
     val nOffsets = options.valueOf(nOffsetsOpt).intValue
-    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000, "GetOffsetShell")
-    val topicAndPartition = TopicAndPartition(topic, partition)
-    val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
-    val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
-    println("get " + offsets.length + " results")
-    for (offset <- offsets)
-      println(offset)
+    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
+
+    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
+    if(topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
+      System.err.println(("Error: no valid topic metadata for topic: %s, " + " probably the topic does not exist, run ").format(topic) +
+        "kafka-list-topic.sh to verify")
+      System.exit(1)
+    }
+    val partitions =
+      if(partitionList == "") {
+        topicsMetadata.head.partitionsMetadata.map(_.partitionId)
+      } else {
+        partitionList.split(",").map(_.toInt)
+      }
+    partitions.foreach { partitionId =>
+      val partitionMetadataOpt = topicsMetadata.head.partitionsMetadata.find(_.partitionId == partitionId)
+      partitionMetadataOpt match {
+        case Some(metadata) =>
+          metadata.leader match {
+            case Some(leader) =>
+              val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
+              val topicAndPartition = TopicAndPartition(topic, partitionId)
+              val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(time, nOffsets)))
+              val offsets = consumer.getOffsetsBefore(request).partitionErrorAndOffsets(topicAndPartition).offsets
+
+              println("%s:%d:%s".format(topic, partitionId, offsets.mkString(",")))
+            case None => System.err.println("Error: partition %d does not have a leader. Skip getting offsets".format(partitionId))
+          }
+        case None => System.err.println("Error: partition %d does not exist".format(partitionId))
+      }
+    }
   }
 }
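The essential pattern introduced above — fetch the topic metadata once, then query each partition's leader directly for its offsets — can be lifted into a small helper. The sketch below reuses the same 0.8-era APIs as the diff (ClientUtils, SimpleConsumer, OffsetRequest); the object name, timeouts, and default clientId are illustrative assumptions, not part of the commit.

// A condensed, reusable form of the lookup the updated shell performs:
// for each partition of the topic, ask that partition's leader for its
// latest offset(s). The 1000 ms metadata timeout and other constants are
// illustrative defaults.
import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.client.ClientUtils
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object LatestOffsets {
  def fetch(brokerList: String, topic: String, clientId: String = "LatestOffsets"): Map[Int, Seq[Long]] = {
    val brokers = ClientUtils.parseBrokerList(brokerList)
    val topicMetadata =
      ClientUtils.fetchTopicMetadata(Set(topic), brokers, clientId, 1000).topicsMetadata.head
    topicMetadata.partitionsMetadata.flatMap { pm =>
      pm.leader.map { leader =>  // partitions without a leader are skipped
        val consumer = new SimpleConsumer(leader.host, leader.port, 10000, 100000, clientId)
        try {
          val tp = TopicAndPartition(topic, pm.partitionId)
          val request =
            OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
          pm.partitionId -> consumer.getOffsetsBefore(request).partitionErrorAndOffsets(tp).offsets
        } finally {
          consumer.close()
        }
      }
    }.toMap
  }
}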

http://git-wip-us.apache.org/repos/asf/kafka/blob/a160f102/kafka-patch-review.py
----------------------------------------------------------------------
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index f1d5192..ab8787a 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -90,7 +90,7 @@ def main():
 
   comment="Created reviewboard " 
   if not opt.reviewboard:
-    print 'Created a new reviewboard ',rb_url
+    print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch
   else:
     print 'Updated reviewboard',opt.reviewboard
     comment="Updated reviewboard "

