kafka-commits mailing list archives

From: guozh...@apache.org
Subject: kafka git commit: KAFKA-2924: support offsets topic in DumpLogSegments
Date: Wed, 09 Dec 2015 05:48:22 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.9.0 1f98b0315 -> 78192b8be


KAFKA-2924: support offsets topic in DumpLogSegments

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Ismael Juma, Guozhang Wang

Closes #622 from hachikuji/KAFKA-2924

(cherry picked from commit 454d7d09014c0e6be3064a120c80261998341fed)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
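
For reference, a minimal sketch of driving the tool with the new option (the segment path is made up; --files and --print-data-log are assumed from the tool's pre-existing options, while --offsets-decoder is the flag added by this commit):

    // Hypothetical driver: dump a __consumer_offsets segment in readable form.
    object DumpOffsetsExample {
      def main(args: Array[String]): Unit =
        kafka.tools.DumpLogSegments.main(Array(
          "--files", "/var/kafka-logs/__consumer_offsets-0/00000000000000000000.log",
          "--print-data-log",
          "--offsets-decoder"))
    }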


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78192b8b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78192b8b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78192b8b

Branch: refs/heads/0.9.0
Commit: 78192b8becfe48b5fb63f750dde8eb2d12e65007
Parents: 1f98b03
Author: Jason Gustafson <jason@confluent.io>
Authored: Tue Dec 8 21:48:11 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 8 21:48:19 2015 -0800

----------------------------------------------------------------------
 .../coordinator/GroupMetadataManager.scala      |   6 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 123 ++++++++++++++++---
 2 files changed, 111 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/78192b8b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index f2ffa50..71d2338 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -847,7 +847,7 @@ object GroupMetadataManager {
    * @param buffer input byte-buffer
    * @return an GroupTopicPartition object
    */
-  private def readMessageKey(buffer: ByteBuffer): BaseKey = {
+  def readMessageKey(buffer: ByteBuffer): BaseKey = {
     val version = buffer.getShort
     val keySchema = schemaForKey(version)
     val key = keySchema.read(buffer).asInstanceOf[Struct]
@@ -876,7 +876,7 @@ object GroupMetadataManager {
    * @param buffer input byte-buffer
    * @return an offset-metadata object from the message
    */
-  private def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
+  def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
     if(buffer == null) { // tombstone
       null
     } else {
@@ -909,7 +909,7 @@ object GroupMetadataManager {
    * @param buffer input byte-buffer
    * @return a group metadata object from the message
    */
-  def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+  def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
     if(buffer == null) { // tombstone
       null
     } else {

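The only change in this file is dropping private from the three read* helpers so that DumpLogSegments can reuse the coordinator's own deserialization logic. A minimal sketch of that usage, assuming key and value buffers already read from a __consumer_offsets message:

    import java.nio.ByteBuffer
    import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey}

    // key/value are assumed to come from a Message in the offsets topic.
    def decode(key: ByteBuffer, value: ByteBuffer): String =
      GroupMetadataManager.readMessageKey(key) match {
        case offsetKey: OffsetKey =>
          // Offset commit message: the value holds an OffsetAndMetadata (null for a tombstone).
          s"${offsetKey.key} -> ${GroupMetadataManager.readOffsetMessageValue(value)}"
        case metadataKey: GroupMetadataKey =>
          // Group metadata message: the value holds the generation, protocol and members.
          s"${metadataKey.key} -> ${GroupMetadataManager.readGroupMessageValue(metadataKey.key, value)}"
        case other =>
          s"unrecognized key: $other" // mirrors the catch-all in the parser below
      }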
http://git-wip-us.apache.org/repos/asf/kafka/blob/78192b8b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index fc11a2a..fd15014 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -18,15 +18,20 @@
 package kafka.tools
 
 import java.io._
-import kafka.message._
-import kafka.log._
-import kafka.utils._
-import collection.mutable
+import java.nio.ByteBuffer
+
 import joptsimple.OptionParser
+import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
+import kafka.log._
+import kafka.message._
 import kafka.serializer.Decoder
-import kafka.utils.VerifiableProperties
+import kafka.utils.{VerifiableProperties, _}
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.mutable
+
 object DumpLogSegments {
 
   def main(args: Array[String]) {
@@ -51,6 +56,8 @@ object DumpLogSegments {
                                .withOptionalArg()
                                .ofType(classOf[java.lang.String])
                                .defaultsTo("kafka.serializer.StringDecoder")
+    val offsetsOpt = parser.accepts("offsets-decoder", "if set, log data will be parsed as offset data from __consumer_offsets topic")
+
 
     if(args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
@@ -65,8 +72,13 @@ object DumpLogSegments {
     val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
     val isDeepIteration = if(options.has(deepIterationOpt)) true else false
   
-    val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
-    val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
+    val messageParser = if (options.has(offsetsOpt)) {
+      new OffsetsMessageParser
+    } else {
+      val valueDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(valueDecoderOpt), new VerifiableProperties)
+      val keyDecoder: Decoder[_] = CoreUtils.createObject[Decoder[_]](options.valueOf(keyDecoderOpt), new VerifiableProperties)
+      new DecoderMessageParser(keyDecoder, valueDecoder)
+    }
 
     val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
     val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
@@ -75,7 +87,7 @@ object DumpLogSegments {
       val file = new File(arg)
       if(file.getName.endsWith(Log.LogFileSuffix)) {
         println("Dumping " + file)
-        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize, valueDecoder, keyDecoder)
+        dumpLog(file, print, nonConsecutivePairsForLogFilesMap, isDeepIteration, maxMessageSize, messageParser)
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
         dumpIndex(file, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
@@ -124,15 +136,97 @@ object DumpLogSegments {
         println("offset: %d position: %d".format(entry.offset + index.baseOffset, entry.position))
     }
   }
-  
+
+  private trait MessageParser[K, V] {
+    def parse(message: Message): (Option[K], Option[V])
+  }
+
+  private class DecoderMessageParser[K, V](keyDecoder: Decoder[K], valueDecoder: Decoder[V]) extends MessageParser[K, V] {
+    override def parse(message: Message): (Option[K], Option[V]) = {
+      if (message.isNull) {
+        (None, None)
+      } else {
+        val key = if (message.hasKey)
+          Some(keyDecoder.fromBytes(Utils.readBytes(message.key)))
+        else
+          None
+
+        val payload = Some(valueDecoder.fromBytes(Utils.readBytes(message.payload)))
+
+        (key, payload)
+      }
+    }
+  }
+
+  private class OffsetsMessageParser extends MessageParser[String, String] {
+    private def hex(bytes: Array[Byte]): String = {
+      if (bytes.isEmpty)
+        ""
+      else
+        String.format("%X", BigInt(1, bytes))
+    }
+
+    private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer) = {
+      val group = offsetKey.key.group
+      val (topic, partition)  = offsetKey.key.topicPartition.asTuple
+      val offset = GroupMetadataManager.readOffsetMessageValue(payload)
+
+      val keyString = s"offset::${group}:${topic}:${partition}"
+      val valueString = if (offset.metadata.isEmpty)
+        String.valueOf(offset.offset)
+      else
+        s"${offset.offset}:${offset.metadata}"
+
+      (Some(keyString), Some(valueString))
+    }
+
+    private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer) = {
+      val groupId = groupMetadataKey.key
+      val group = GroupMetadataManager.readGroupMessageValue(groupId, payload)
+      val protocolType = group.protocolType
+
+      val assignment = group.allMemberMetadata.map { member =>
+        if (protocolType == ConsumerProtocol.PROTOCOL_TYPE) {
+          val partitionAssignment = ConsumerProtocol.deserializeAssignment(ByteBuffer.wrap(member.assignment))
+          val userData = hex(Utils.toArray(partitionAssignment.userData()))
+
+          if (userData.isEmpty)
+            s"${member.memberId}=${partitionAssignment.partitions()}"
+          else
+            s"${member.memberId}=${partitionAssignment.partitions()}:${userData}"
+        } else {
+          s"${member.memberId}=${hex(member.assignment)}"
+        }
+      }.mkString("{", ",", "}")
+
+      val keyString = s"metadata::${groupId}"
+      val valueString = s"${protocolType}:${group.protocol}:${group.generationId}:${assignment}"
+
+      (Some(keyString), Some(valueString))
+    }
+
+    override def parse(message: Message): (Option[String], Option[String]) = {
+      if (message.isNull)
+        (None, None)
+      else if (!message.hasKey) {
+        throw new KafkaException("Failed to decode message using offset topic decoder (message had a missing key)")
+      } else {
+        GroupMetadataManager.readMessageKey(message.key) match {
+          case offsetKey: OffsetKey => parseOffsets(offsetKey, message.payload)
+          case groupMetadataKey: GroupMetadataKey => parseGroupMetadata(groupMetadataKey, message.payload)
+          case _ => throw new KafkaException("Failed to decode message using offset topic decoder (message had an invalid key)")
+        }
+      }
+    }
+  }
+
   /* print out the contents of the log */
   private def dumpLog(file: File,
                       printContents: Boolean,
                      nonConsecutivePairsForLogFilesMap: mutable.HashMap[String, List[(Long, Long)]],
                       isDeepIteration: Boolean,
                       maxMessageSize: Int,
-                      valueDecoder: Decoder[_],
-                      keyDecoder: Decoder[_]) {
+                      parser: MessageParser[_, _]) {
     val startOffset = file.getName().split("\\.")(0).toLong
     println("Starting offset: " + startOffset)
     val messageSet = new FileMessageSet(file, false)
@@ -160,10 +254,9 @@ object DumpLogSegments {
         if(msg.hasKey)
           print(" keysize: " + msg.keySize)
         if(printContents) {
-          if(msg.hasKey)
-            print(" key: " + keyDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.key)))
-          val payload = if(messageAndOffset.message.isNull) null else valueDecoder.fromBytes(Utils.readBytes(messageAndOffset.message.payload))
-          print(" payload: " + payload)
+          val (key, payload) = parser.parse(msg)
+          key.map(key => print(s" key: ${key}"))
+          payload.map(payload => print(s" payload: ${payload}"))
         }
         println()
       }

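The MessageParser trait introduced above decouples segment iteration from message decoding, so further formats could be added the same way. A hedged sketch of what another implementation might look like; it is hypothetical, and since the trait is private it would have to live inside the DumpLogSegments object, where Message and Utils are already in scope:

    // Hypothetical third parser: render key and payload as UTF-8 strings,
    // following the (Option[K], Option[V]) contract of MessageParser.
    private class Utf8MessageParser extends MessageParser[String, String] {
      override def parse(message: Message): (Option[String], Option[String]) = {
        if (message.isNull)
          (None, None)
        else {
          val key =
            if (message.hasKey) Some(new String(Utils.readBytes(message.key), "UTF-8"))
            else None
          (key, Some(new String(Utils.readBytes(message.payload), "UTF-8")))
        }
      }
    }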
