kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: HOTFIX: fix group coordinator edge cases around metadata storage callback
Date Sun, 08 Nov 2015 20:23:07 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 34d997665 -> 83fb73460


HOTFIX: fix group coordinator edge cases around metadata storage callback

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang

Closes #451 from hachikuji/hotfix-group-coordinator


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/83fb7346
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/83fb7346
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/83fb7346

Branch: refs/heads/trunk
Commit: 83fb734603376d1c9ef1d88bcb5f160da5522e45
Parents: 34d9976
Author: Jason Gustafson <jason@confluent.io>
Authored: Sun Nov 8 12:29:02 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Nov 8 12:29:02 2015 -0800

----------------------------------------------------------------------
 .../scala/kafka/coordinator/GroupCoordinator.scala     | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/83fb7346/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 4d69840..2acc223 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -279,12 +279,13 @@ class GroupCoordinator(val brokerId: Int,
               val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
 
               // persist the group metadata and upon finish transition to stable and propagate
the assignment
+              val generationId = group.generationId
               groupManager.storeGroup(group, assignment, (errorCode: Short) => {
                 group synchronized {
                   // another member may have joined the group while we were awaiting this
callback,
-                  // so we must ensure we are still in the AwaitingSync state when it gets
invoked.
-                  // if we have transitioned to another state, then we shouldn't do anything
-                  if (group.is(AwaitingSync)) {
+                  // so we must ensure we are still in the AwaitingSync state and the same
generation
+                  // when it gets invoked. if we have transitioned to another state, then
do nothing
+                  if (group.is(AwaitingSync) && generationId == group.generationId)
{
                     if (errorCode != Errors.NONE.code) {
                       resetAndPropagateAssignmentError(group, errorCode)
                       maybePrepareRebalance(group)
@@ -485,6 +486,12 @@ class GroupCoordinator(val brokerId: Int,
       if (member.awaitingSyncCallback != null) {
         member.awaitingSyncCallback(member.assignment, errorCode)
         member.awaitingSyncCallback = null
+
+        // reset the session timeout for members after propagating the member's assignment.
+        // This is because if any member's session expired while we were still awaiting either
+        // the leader sync group or the storage callback, its expiration will be ignored
and no
+        // future heartbeat expectations will not be scheduled.
+        completeAndScheduleNextHeartbeatExpiration(group, member)
       }
     }
   }


Mime
View raw message