kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: Fix incorrect version used for group metadata version
Date Wed, 04 Nov 2015 22:56:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 70a784b64 -> 8de62253a


HOTFIX: Fix incorrect version used for group metadata version

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #424 from hachikuji/hotfix-metadata-storage


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8de62253
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8de62253
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8de62253

Branch: refs/heads/trunk
Commit: 8de62253adb4c6b90badbf92881c0402068cd65c
Parents: 70a784b
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Nov 4 15:02:32 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Nov 4 15:02:32 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/coordinator/GroupMetadataManager.scala  | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8de62253/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 0052b5d..0c8333f 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -673,12 +673,14 @@ object GroupMetadataManager {
   private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members")
 
   // map of versions to key schemas as data types
-  private val MESSAGE_TYPE_SCHEMAS = Map(0 -> OFFSET_COMMIT_KEY_SCHEMA,
+  private val MESSAGE_TYPE_SCHEMAS = Map(
+    0 -> OFFSET_COMMIT_KEY_SCHEMA,
     1 -> OFFSET_COMMIT_KEY_SCHEMA,
     2 -> GROUP_METADATA_KEY_SCHEMA)
 
   // map of version of offset value schemas
-  private val OFFSET_VALUE_SCHEMAS = Map(0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
+  private val OFFSET_VALUE_SCHEMAS = Map(
+    0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
     1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
   private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
 
@@ -712,7 +714,7 @@ object GroupMetadataManager {
     val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
     schemaOpt match {
       case Some(schema) => schema
-      case _ => throw new KafkaException("Unknown offset schema version " + version)
+      case _ => throw new KafkaException("Unknown group metadata version " + version)
     }
   }
 
@@ -762,7 +764,7 @@ object GroupMetadataManager {
     value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
     value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+    byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -804,7 +806,7 @@ object GroupMetadataManager {
     value.set(GROUP_METADATA_MEMBERS_V0, memberArray.toArray)
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+    byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }


Mime
View raw message