kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: bug updating cache when loading group metadata
Date Mon, 09 Nov 2015 18:20:21 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk e9fc7b8c8 -> 2b04004de


HOTFIX: bug updating cache when loading group metadata

The bug causes only the first instance of group metadata in the topic to be written to the
cache (because of the putIfNotExists in addGroup). Coordinator fail-over won't work properly
unless the cache is loaded with the right metadata.

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #462 from hachikuji/hotfix-group-loading


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b04004d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b04004d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b04004d

Branch: refs/heads/trunk
Commit: 2b04004de878823fe631af1f3f85108c0b38caec
Parents: e9fc7b8
Author: Jason Gustafson <jason@confluent.io>
Authored: Mon Nov 9 10:26:17 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 9 10:26:17 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupMetadataManager.scala      | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2b04004d/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index f6b8103..f98fc74 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -125,6 +125,13 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   /**
+   * Update the current cached metadata for the group with the given groupId or add the group
if there is none.
+   */
+  private def updateGroup(groupId: String, group: GroupMetadata) {
+    groupsCache.put(groupId, group)
+  }
+
+  /**
    * Remove all metadata associated with the group, note this function needs to be
    * called inside the group lock
    * @param group
@@ -401,9 +408,10 @@ class GroupMetadataManager(val brokerId: Int,
                     // load group metadata
                     val groupId = baseKey.key.asInstanceOf[String]
                     val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId,
msgAndOffset.message.payload)
-
-                    if (groupMetadata != null)
-                      addGroup(groupId, groupMetadata)
+                    if (groupMetadata != null) {
+                      trace(s"Loaded group metadata for group ${groupMetadata.groupId} with
generation ${groupMetadata.generationId}")
+                      updateGroup(groupId, groupMetadata)
+                    }
                   }
 
                   currOffset = msgAndOffset.nextOffset


Mime
View raw message