kafka-commits mailing list archives

From guozh...@apache.org
Subject [3/3] kafka git commit: KAFKA-2017: Persist Group Metadata and Assignment before Responding
Date Tue, 03 Nov 2015 07:36:22 GMT
KAFKA-2017: Persist Group Metadata and Assignment before Responding

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Onur Karaman, Jason Gustafson, Jun Rao

Closes #386 from guozhangwang/K2017
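
The core behavioral change is that the coordinator now appends the group metadata and assignment to the __consumer_offsets log and only answers the waiting SyncGroup requests once that append completes. A minimal sketch of this persist-then-respond pattern follows; the GroupStore interface, its append() callback, and the PersistThenRespond/onLeaderSync names are hypothetical stand-ins for illustration, not the broker's actual classes.

    // Illustrative sketch only: GroupStore and its append() callback are hypothetical
    // stand-ins for the broker's real log-append path (GroupMetadataManager.storeGroup).
    interface GroupStore {
        void append(byte[] key, byte[] value, Runnable onCommitted);
    }

    class PersistThenRespond {
        private final GroupStore store;

        PersistThenRespond(GroupStore store) {
            this.store = store;
        }

        // Called when the group leader sends its SyncGroup with the computed assignment.
        void onLeaderSync(byte[] groupMetadataKey, byte[] assignmentValue, Runnable respondToMembers) {
            // Persist first; waiting members are only answered from the append callback,
            // so an acknowledged assignment is always recoverable from the log.
            store.append(groupMetadataKey, assignmentValue, respondToMembers);
        }
    }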


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c334752
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c334752
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c334752

Branch: refs/heads/trunk
Commit: 7c33475274cb6e65a8e8d907e7fef6e56bc8c8e6
Parents: e466ccd
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Mon Nov 2 23:41:58 2015 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Nov 2 23:41:58 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java |   6 +-
 .../consumer/internals/ConsumerCoordinator.java |   9 +-
 .../errors/GroupLoadInProgressException.java    |  40 +
 .../errors/OffsetLoadInProgressException.java   |  40 -
 .../apache/kafka/common/protocol/Errors.java    |   4 +-
 .../common/requests/JoinGroupResponse.java      |   1 +
 .../common/requests/LeaveGroupResponse.java     |   1 +
 .../common/requests/OffsetCommitResponse.java   |   1 +
 .../common/requests/OffsetFetchResponse.java    |   2 +-
 .../internals/ConsumerCoordinatorTest.java      |   2 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   2 +-
 .../main/scala/kafka/common/ErrorMapping.scala  |   2 +-
 .../kafka/common/OffsetMetadataAndError.scala   |   2 +-
 core/src/main/scala/kafka/common/Topic.scala    |   2 +-
 .../kafka/coordinator/CoordinatorMetadata.scala |  81 --
 .../kafka/coordinator/GroupCoordinator.scala    | 139 ++-
 .../scala/kafka/coordinator/GroupMetadata.scala |   6 +-
 .../coordinator/GroupMetadataManager.scala      | 952 +++++++++++++++++++
 .../kafka/coordinator/MemberMetadata.scala      |   7 +-
 .../scala/kafka/coordinator/OffsetConfig.scala  |  61 ++
 .../src/main/scala/kafka/server/KafkaApis.scala |  12 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |  19 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   2 +-
 .../main/scala/kafka/server/OffsetManager.scala | 627 ------------
 .../kafka/api/AuthorizerIntegrationTest.scala   |   2 +-
 .../kafka/api/BaseConsumerTest.scala            |   2 +-
 .../kafka/api/IntegrationTestHarness.scala      |   2 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |   8 +-
 .../unit/kafka/consumer/TopicFilterTest.scala   |   9 +-
 .../coordinator/CoordinatorMetadataTest.scala   |  71 --
 .../GroupCoordinatorResponseTest.scala          | 515 ++++------
 .../kafka/coordinator/GroupMetadataTest.scala   |  12 -
 32 files changed, 1390 insertions(+), 1251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 4b2a824..d8f3c25 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -351,6 +351,10 @@ public abstract class AbstractCoordinator {
                 } else {
                     onJoinFollower().chain(future);
                 }
+            } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
+                log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId);
+                // backoff and retry
+                future.raise(Errors.forCode(errorCode));
             } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
                 // reset the member id and retry immediately
                 AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
@@ -362,7 +366,7 @@ public abstract class AbstractCoordinator {
                 // re-discover the coordinator and retry with backoff
                 coordinatorDead();
                 log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
-                        groupId);
+                    groupId);
                 future.raise(Errors.forCode(errorCode));
             } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
                     || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 641939a..97d25c3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -451,7 +451,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                         // update the local cache only if the partition is still assigned
                         subscriptions.committed(tp, offsetAndMetadata);
                 } else {
-                    if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
+                    if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
+                        // just retry
+                        future.raise(Errors.GROUP_LOAD_IN_PROGRESS);
+                    } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
                             || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                         coordinatorDead();
                     } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()
@@ -511,9 +514,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl
                     log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode)
                             .exception()
                             .getMessage());
-                    if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) {
+                    if (data.errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
                         // just retry
-                        future.raise(Errors.OFFSET_LOAD_IN_PROGRESS);
+                        future.raise(Errors.GROUP_LOAD_IN_PROGRESS);
                     } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
                         // re-discover the coordinator and retry
                         coordinatorDead();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
new file mode 100644
index 0000000..17e205f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+/**
+ * The broker returns this error code for any coordinator request if it is still loading the metadata (after a leader change
+ * for that offsets topic partition) for this group.
+ */
+public class GroupLoadInProgressException extends RetriableException {
+
+    private static final long serialVersionUID = 1L;
+
+    public GroupLoadInProgressException() {
+        super();
+    }
+
+    public GroupLoadInProgressException(String message) {
+        super(message);
+    }
+
+    public GroupLoadInProgressException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public GroupLoadInProgressException(Throwable cause) {
+        super(cause);
+    }
+
+}
\ No newline at end of file
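
Because GroupLoadInProgressException extends RetriableException, client code that already retries on retriable errors picks it up without special casing. A hypothetical caller might handle it like this (the doCommit runnable is an assumption for illustration only):

    import org.apache.kafka.common.errors.GroupLoadInProgressException;
    import org.apache.kafka.common.errors.RetriableException;

    // doCommit is a hypothetical operation that may throw the new exception.
    final class RetriableHandlingExample {
        static void commitWithRetry(Runnable doCommit) throws InterruptedException {
            while (true) {
                try {
                    doCommit.run();
                    return;
                } catch (GroupLoadInProgressException e) {
                    Thread.sleep(100);    // coordinator still loading this group; retry shortly
                } catch (RetriableException e) {
                    Thread.sleep(100);    // any other retriable error: also safe to retry
                }
            }
        }
    }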

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
deleted file mode 100644
index 016506e..0000000
--- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-
-package org.apache.kafka.common.errors;
-
-/**
- * The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change
- * for that offsets topic partition).
- */
-public class OffsetLoadInProgressException extends RetriableException {
-
-    private static final long serialVersionUID = 1L;
-
-    public OffsetLoadInProgressException() {
-        super();
-    }
-
-    public OffsetLoadInProgressException(String message) {
-        super(message);
-    }
-
-    public OffsetLoadInProgressException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public OffsetLoadInProgressException(Throwable cause) {
-        super(cause);
-    }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index bc607f0..2c9cb20 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -58,8 +58,8 @@ public enum Errors {
             new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")),
     NETWORK_EXCEPTION(13,
             new NetworkException("The server disconnected before a response was received.")),
-    OFFSET_LOAD_IN_PROGRESS(14,
-            new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")),
+    GROUP_LOAD_IN_PROGRESS(14,
+            new GroupLoadInProgressException("The coordinator is loading and hence can't process requests for this group.")),
     GROUP_COORDINATOR_NOT_AVAILABLE(15,
             new GroupCoordinatorNotAvailableException("The group coordinator is not available.")),
     NOT_COORDINATOR_FOR_GROUP(16,
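
Code 14 keeps its numeric value; only the name and the mapped exception type change. A small hedged check of the mapping, using only the Errors methods already visible in this diff (the output comment is an expectation, not captured program output):

    import org.apache.kafka.common.errors.RetriableException;
    import org.apache.kafka.common.protocol.Errors;

    public class ErrorCodeLookup {
        public static void main(String[] args) {
            Errors error = Errors.forCode((short) 14);   // resolves to GROUP_LOAD_IN_PROGRESS after this change
            boolean retriable = error.exception() instanceof RetriableException;
            System.out.println(error + " code=" + error.code() + " retriable=" + retriable);
            // expected: GROUP_LOAD_IN_PROGRESS code=14 retriable=true
        }
    }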

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index c65a4bb..0615e5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -31,6 +31,7 @@ public class JoinGroupResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
+     * GROUP_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
      * NOT_COORDINATOR_FOR_GROUP (16)
      * INCONSISTENT_GROUP_PROTOCOL (23)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index d2af1a1..278e3e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -26,6 +26,7 @@ public class LeaveGroupResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
+     * GROUP_LOAD_IN_PROGRESS (14)
      * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
      * NOT_COORDINATOR_FOR_CONSUMER (16)
      * UNKNOWN_CONSUMER_ID (25)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index dae9c37..baea6f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -42,6 +42,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
      * Possible error code:
      *
      * OFFSET_METADATA_TOO_LARGE (12)
+     * GROUP_LOAD_IN_PROGRESS (14)
      * GROUP_COORDINATOR_NOT_AVAILABLE (15)
      * NOT_COORDINATOR_FOR_GROUP (16)
      * ILLEGAL_GENERATION (22)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 09ac74a..afc5618 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -48,7 +48,7 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
      * Possible error code:
      *
      *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
-     *  OFFSET_LOAD_IN_PROGRESS (14)
+     *  GROUP_LOAD_IN_PROGRESS (14)
      *  NOT_COORDINATOR_FOR_GROUP (16)
      *  ILLEGAL_GENERATION (22)
      *  UNKNOWN_MEMBER_ID (25)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 963da42..2029e92 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -693,7 +693,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.assignFromUser(Arrays.asList(tp));
         subscriptions.needRefreshCommits();
-        client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L));
+        client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L));
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L));
         coordinator.refreshCommittedOffsetsIfNeeded();
         assertFalse(subscriptions.refreshCommitsNeeded());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 51b4957..37ef9dc 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -129,7 +129,7 @@ object TopicCommand extends Logging {
       }
 
       if(opts.options.has(opts.partitionsOpt)) {
-        if (topic == GroupCoordinator.OffsetsTopicName) {
+        if (topic == GroupCoordinator.GroupMetadataTopicName) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
         }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index 23224ec..81cb51b 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -59,7 +59,7 @@ object ErrorMapping {
   // 26: INVALID_SESSION_TIMEOUT
   // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED
   // 28: INVALID_COMMIT_OFFSET_SIZE
-  val AuthorizationCode: Short = 29;
+  val AuthorizationCode: Short = 29
 
   private val exceptionToCode =
     Map[Class[Throwable], Short](

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index bbee894..a94e58c 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -63,7 +63,7 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
 
 object OffsetMetadataAndError {
   val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
-  val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code)
+  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS.code)
   val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code)
   val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code)
   val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index ca41eba..982955e 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -26,7 +26,7 @@ object Topic {
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
-  val InternalTopics = Set(GroupCoordinator.OffsetsTopicName)
+  val InternalTopics = Set(GroupCoordinator.GroupMetadataTopicName)
 
   def validate(topic: String) {
     if (topic.length <= 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
deleted file mode 100644
index 2279924..0000000
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.threadsafe
-
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
-import scala.collection.mutable
-
-/**
- * CoordinatorMetadata manages group and topic metadata.
- * It delegates all group logic to the callers.
- */
-@threadsafe
-private[coordinator] class CoordinatorMetadata(brokerId: Int) {
-
-  /**
-   * NOTE: If a group lock and metadataLock are simultaneously needed,
-   * be sure to acquire the group lock before metadataLock to prevent deadlock
-   */
-  private val metadataLock = new ReentrantReadWriteLock()
-
-  /**
-   * These should be guarded by metadataLock
-   */
-  private val groups = new mutable.HashMap[String, GroupMetadata]
-
-  def shutdown() {
-    inWriteLock(metadataLock) {
-      groups.clear()
-    }
-  }
-
-  /**
-   * Get the group associated with the given groupId, or null if not found
-   */
-  def getGroup(groupId: String) = {
-    inReadLock(metadataLock) {
-      groups.get(groupId).orNull
-    }
-  }
-
-  /**
-   * Add a group or get the group associated with the given groupId if it already exists
-   */
-  def addGroup(groupId: String, protocolType: String) = {
-    inWriteLock(metadataLock) {
-      groups.getOrElseUpdate(groupId, new GroupMetadata(groupId, protocolType))
-    }
-  }
-
-  /**
-   * Remove all metadata associated with the group, including its topics
-   * @param groupId the groupId of the group we are removing
-   */
-  def removeGroup(groupId: String) {
-    inWriteLock(metadataLock) {
-      if (!groups.contains(groupId))
-        throw new IllegalArgumentException("Cannot remove non-existing group")
-      groups.remove(groupId)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index 0ef542d..97ce22b 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -29,8 +29,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 
 import scala.collection.{Map, Seq, immutable}
 
-case class GroupManagerConfig(groupMinSessionTimeoutMs: Int,
-                              groupMaxSessionTimeoutMs: Int)
+case class GroupConfig(groupMinSessionTimeoutMs: Int,
+                       groupMaxSessionTimeoutMs: Int)
 
 case class JoinGroupResult(members: Map[String, Array[Byte]],
                            memberId: String,
@@ -46,9 +46,9 @@ case class JoinGroupResult(members: Map[String, Array[Byte]],
  * groups. Groups are assigned to coordinators based on their group names.
  */
 class GroupCoordinator(val brokerId: Int,
-                       val groupConfig: GroupManagerConfig,
-                       val offsetConfig: OffsetManagerConfig,
-                       private val offsetManager: OffsetManager) extends Logging {
+                       val groupConfig: GroupConfig,
+                       val offsetConfig: OffsetConfig,
+                       val groupManager: GroupMetadataManager) extends Logging {
   type JoinCallback = JoinGroupResult => Unit
   type SyncCallback = (Array[Byte], Short) => Unit
 
@@ -58,15 +58,14 @@ class GroupCoordinator(val brokerId: Int,
 
   private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null
   private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null
-  private var coordinatorMetadata: CoordinatorMetadata = null
 
   def this(brokerId: Int,
-           groupConfig: GroupManagerConfig,
-           offsetConfig: OffsetManagerConfig,
+           groupConfig: GroupConfig,
+           offsetConfig: OffsetConfig,
            replicaManager: ReplicaManager,
            zkUtils: ZkUtils,
-           scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
-    new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler))
+           scheduler: Scheduler) = this(brokerId, groupConfig, offsetConfig,
+    new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils, scheduler))
 
   def offsetsTopicConfigs: Properties = {
     val props = new Properties
@@ -88,7 +87,6 @@ class GroupCoordinator(val brokerId: Int,
     info("Starting up.")
     heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
     joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId)
-    coordinatorMetadata = new CoordinatorMetadata(brokerId)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -100,8 +98,7 @@ class GroupCoordinator(val brokerId: Int,
   def shutdown() {
     info("Shutting down.")
     isActive.set(false)
-    offsetManager.shutdown()
-    coordinatorMetadata.shutdown()
+    groupManager.shutdown()
     heartbeatPurgatory.shutdown()
     joinPurgatory.shutdown()
     info("Shutdown complete.")
@@ -109,6 +106,7 @@ class GroupCoordinator(val brokerId: Int,
 
   def handleJoinGroup(groupId: String,
                       memberId: String,
+                      clientId: String,
                       sessionTimeoutMs: Int,
                       protocolType: String,
                       protocols: List[(String, Array[Byte])],
@@ -118,7 +116,9 @@ class GroupCoordinator(val brokerId: Int,
     } else if (!validGroupId(groupId)) {
       responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code))
     } else if (!isCoordinatorForGroup(groupId)) {
-      responseCallback(joinError(memberId,Errors.NOT_COORDINATOR_FOR_GROUP.code))
+      responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code))
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code))
     } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs ||
                sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) {
       responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code))
@@ -126,22 +126,23 @@ class GroupCoordinator(val brokerId: Int,
       // only try to create the group if the group is not unknown AND
       // the member id is UNKNOWN, if member is specified but group does not
       // exist we should reject the request
-      var group = coordinatorMetadata.getGroup(groupId)
+      var group = groupManager.getGroup(groupId)
       if (group == null) {
         if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
           responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
         } else {
-          group = coordinatorMetadata.addGroup(groupId, protocolType)
-          doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+          group = groupManager.addGroup(groupId, protocolType)
+          doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
         }
       } else {
-        doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback)
+        doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback)
       }
     }
   }
 
   private def doJoinGroup(group: GroupMetadata,
                           memberId: String,
+                          clientId: String,
                           sessionTimeoutMs: Int,
                           protocolType: String,
                           protocols: List[(String, Array[Byte])],
@@ -165,7 +166,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case PreparingRebalance =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               updateMemberAndRebalance(group, member, protocols, responseCallback)
@@ -173,7 +174,7 @@ class GroupCoordinator(val brokerId: Int,
 
           case AwaitingSync =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
-              addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (member.matches(protocols)) {
@@ -200,7 +201,7 @@ class GroupCoordinator(val brokerId: Int,
           case Stable =>
             if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
               // if the member id is unknown, register the member to the group
-              addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback)
+              addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback)
             } else {
               val member = group.get(memberId)
               if (memberId == group.leaderId || !member.matches(protocols)) {
@@ -238,7 +239,7 @@ class GroupCoordinator(val brokerId: Int,
     } else if (!isCoordinatorForGroup(groupId)) {
       responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      val group = groupManager.getGroup(groupId)
       if (group == null)
         responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code)
       else
@@ -272,7 +273,9 @@ class GroupCoordinator(val brokerId: Int,
             // propagate the assignment to any awaiting members
             if (memberId == group.leaderId) {
               group.transitionTo(Stable)
-              propagateAssignment(group, groupAssignment)
+
+              // persist the group metadata and upon finish propagate the assignment
+              groupManager.storeGroup(group, groupAssignment)
             }
 
           case Stable =>
@@ -290,8 +293,10 @@ class GroupCoordinator(val brokerId: Int,
       responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
     } else if (!isCoordinatorForGroup(groupId)) {
       responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      val group = groupManager.getGroup(groupId)
       if (group == null) {
         // if the group is marked as dead, it means some other thread has just removed the group
         // from the coordinator metadata; this is likely that the group has migrated to some other
@@ -323,17 +328,20 @@ class GroupCoordinator(val brokerId: Int,
       responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)
     } else if (!isCoordinatorForGroup(groupId)) {
       responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code)
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      // the group is still loading, so respond just blindly
+      responseCallback(Errors.NONE.code)
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      val group = groupManager.getGroup(groupId)
       if (group == null) {
-        // if the group is marked as dead, it means some other thread has just removed the group
-        // from the coordinator metadata; this is likely that the group has migrated to some other
-        // coordinator OR the group is in a transient unstable phase. Let the member retry
-        // joining without the specified member id,
         responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
       } else {
         group synchronized {
           if (group.is(Dead)) {
+            // if the group is marked as dead, it means some other thread has just removed the group
+            // from the coordinator metadata; this is likely that the group has migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let the member retry
+            // joining without the specified member id,
             responseCallback(Errors.UNKNOWN_MEMBER_ID.code)
           } else if (!group.is(Stable)) {
             responseCallback(Errors.REBALANCE_IN_PROGRESS.code)
@@ -360,12 +368,14 @@ class GroupCoordinator(val brokerId: Int,
       responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))
     } else if (!isCoordinatorForGroup(groupId)) {
       responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code))
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code))
     } else {
-      val group = coordinatorMetadata.getGroup(groupId)
+      val group = groupManager.getGroup(groupId)
       if (group == null) {
         if (generationId < 0)
           // the group is not relying on Kafka for partition management, so allow the commit
-          offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+          groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
         else
           // the group has failed over to this coordinator (which will be handled in KAFKA-2017),
           // or this is a request coming from an older generation. either way, reject the commit
@@ -381,7 +391,7 @@ class GroupCoordinator(val brokerId: Int,
           } else if (generationId != group.generationId) {
             responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
           } else {
-            offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
+            groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback)
           }
         }
       }
@@ -394,21 +404,21 @@ class GroupCoordinator(val brokerId: Int,
       partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupCoordinatorNotAvailable)}.toMap
     } else if (!isCoordinatorForGroup(groupId)) {
       partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
+    } else if (isCoordinatorLoadingInProgress(groupId)) {
+      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupLoading)}.toMap
     } else {
       // return offsets blindly regardless the current group state since the group may be using
       // Kafka commit storage without automatic group management
-      offsetManager.getOffsets(groupId, partitions)
+      groupManager.getOffsets(groupId, partitions)
     }
   }
 
   def handleGroupImmigration(offsetTopicPartitionId: Int) = {
-    // TODO we may need to add more logic in KAFKA-2017
-    offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
+    groupManager.loadGroupsForPartition(offsetTopicPartitionId)
   }
 
   def handleGroupEmigration(offsetTopicPartitionId: Int) = {
-    // TODO we may need to add more logic in KAFKA-2017
-    offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
+    groupManager.removeGroupsForPartition(offsetTopicPartitionId)
   }
 
   private def validGroupId(groupId: String): Boolean = {
@@ -425,17 +435,6 @@ class GroupCoordinator(val brokerId: Int,
       errorCode=errorCode)
   }
 
-  private def propagateAssignment(group: GroupMetadata,
-                                  assignment: Map[String, Array[Byte]]) {
-    for (member <- group.allMembers) {
-      member.assignment = assignment.getOrElse(member.memberId, Array.empty[Byte])
-      if (member.awaitingSyncCallback != null) {
-        member.awaitingSyncCallback(member.assignment, Errors.NONE.code)
-        member.awaitingSyncCallback = null
-      }
-    }
-  }
-
   /**
    * Complete existing DelayedHeartbeats for the given member and schedule the next one
    */
@@ -458,10 +457,12 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   private def addMemberAndRebalance(sessionTimeoutMs: Int,
+                                    clientId: String,
                                     protocols: List[(String, Array[Byte])],
                                     group: GroupMetadata,
                                     callback: JoinCallback) = {
-    val memberId = group.generateNextMemberId
+    // use the client-id with a random id suffix as the member-id
+    val memberId = clientId + "-" + group.generateMemberIdSuffix
     val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols)
     member.awaitingJoinCallback = callback
     group.add(member.memberId, member)
@@ -488,12 +489,7 @@ class GroupCoordinator(val brokerId: Int,
   private def prepareRebalance(group: GroupMetadata) {
     // if any members are awaiting sync, cancel their request and have them rejoin
     if (group.is(AwaitingSync)) {
-      for (member <- group.allMembers) {
-        if (member.awaitingSyncCallback != null) {
-          member.awaitingSyncCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code)
-          member.awaitingSyncCallback = null
-        }
-      }
+      groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code)
     }
 
     group.allMembers.foreach(_.assignment = null)
@@ -537,14 +533,14 @@ class GroupCoordinator(val brokerId: Int,
           // TODO: cut the socket connection to the client
         }
 
+        // TODO KAFKA-2720: only remove group in the background thread
         if (group.isEmpty) {
-          group.transitionTo(Dead)
+          groupManager.removeGroup(group)
           info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
-          coordinatorMetadata.removeGroup(group.groupId)
         }
       }
       if (!group.is(Dead)) {
-        group.initNextGeneration
+        group.initNextGeneration()
         info("Stabilized group %s generation %s".format(group.groupId, group.generationId))
 
         // trigger the awaiting join group response callback for all the members after rebalancing
@@ -585,27 +581,31 @@ class GroupCoordinator(val brokerId: Int,
     // TODO: add metrics for complete heartbeats
   }
 
-  def partitionFor(group: String): Int = offsetManager.partitionFor(group)
+  def partitionFor(group: String): Int = groupManager.partitionFor(group)
 
   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
     member.awaitingJoinCallback != null ||
       member.awaitingSyncCallback != null ||
       member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
 
-  private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
+  private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId)
+
+  private def isCoordinatorLoadingInProgress(groupId: String) = groupManager.isGroupLoading(groupId)
 }
 
 object GroupCoordinator {
 
   val NoProtocol = ""
   val NoLeader = ""
-  val OffsetsTopicName = "__consumer_offsets"
+
+  // TODO: we store both group metadata and offset data here despite the topic name being offsets only
+  val GroupMetadataTopicName = "__consumer_offsets"
 
   def create(config: KafkaConfig,
              zkUtils: ZkUtils,
              replicaManager: ReplicaManager,
-             kafkaScheduler: KafkaScheduler): GroupCoordinator = {
-    val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+             scheduler: Scheduler): GroupCoordinator = {
+    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
       offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
@@ -613,16 +613,15 @@ object GroupCoordinator {
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
+    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, scheduler)
   }
 
   def create(config: KafkaConfig,
-             zkUtils: ZkUtils,
-             offsetManager: OffsetManager): GroupCoordinator = {
-    val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+             groupManager: GroupMetadataManager): GroupCoordinator = {
+    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
       loadBufferSize = config.offsetsLoadBufferSize,
       offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
       offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
@@ -630,9 +629,9 @@ object GroupCoordinator {
       offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
+    val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
       groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs)
 
-    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager)
+    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager)
   }
 }
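
Each coordinator-side handler above now checks isCoordinatorLoadingInProgress before touching group state and answers GROUP_LOAD_IN_PROGRESS while the group's offsets-topic partition is still being replayed. A loose Java rendering of that loading gate follows; the real bookkeeping lives in GroupMetadataManager.scala, and the class, hashing, and locking here are simplified stand-ins.

    import java.util.HashSet;
    import java.util.Set;

    // Loose sketch of the loading gate; partition hashing and locking are simplified.
    final class GroupLoadingGate {
        private final Set<Integer> loadingPartitions = new HashSet<>();
        private final int metadataTopicPartitions;

        GroupLoadingGate(int metadataTopicPartitions) {
            this.metadataTopicPartitions = metadataTopicPartitions;
        }

        int partitionFor(String groupId) {
            // non-negative hash modulo the __consumer_offsets partition count
            return (groupId.hashCode() & 0x7fffffff) % metadataTopicPartitions;
        }

        synchronized boolean isGroupLoading(String groupId) {
            // while true, requests for this group are answered with GROUP_LOAD_IN_PROGRESS (14)
            return loadingPartitions.contains(partitionFor(groupId));
        }

        synchronized void beginLoading(int partition)  { loadingPartitions.add(partition); }
        synchronized void finishLoading(int partition) { loadingPartitions.remove(partition); }
    }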

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 60ee987..652a3a4 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -86,7 +86,7 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 }
 
 private object GroupMetadata {
   private val validPreviousStates: Map[GroupState, Set[GroupState]] =
-    Map(Dead -> Set(PreparingRebalance),
+    Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync),
       AwaitingSync -> Set(PreparingRebalance),
       Stable -> Set(AwaitingSync),
       PreparingRebalance -> Set(Stable, AwaitingSync))
@@ -151,7 +151,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
   }
 
   // TODO: decide if ids should be predictable or random
-  def generateNextMemberId = UUID.randomUUID().toString
+  def generateMemberIdSuffix = UUID.randomUUID().toString
 
   def canRebalance = state == Stable || state == AwaitingSync
 
@@ -188,7 +188,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType:
     isEmpty || (memberProtocols & candidateProtocols).nonEmpty
   }
 
-  def initNextGeneration = {
+  def initNextGeneration() = {
     assert(notYetRejoinedMembers == List.empty[MemberMetadata])
     generationId += 1
     protocol = selectProtocol
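
The widened transition table means a group can now be marked Dead from Stable or AwaitingSync as well, which removeGroup relies on when a partition is unloaded. An illustrative Java rendering of the table follows; the GroupState enum and GroupStateTransitions class are stand-ins, the real table is the Scala map in GroupMetadata.scala.

    import java.util.EnumMap;
    import java.util.EnumSet;
    import java.util.Map;

    // Stand-in enum and table for illustration only.
    enum GroupState { PREPARING_REBALANCE, AWAITING_SYNC, STABLE, DEAD }

    final class GroupStateTransitions {
        private static final Map<GroupState, EnumSet<GroupState>> VALID_PREVIOUS =
            new EnumMap<>(GroupState.class);

        static {
            // Dead is now reachable from every live state, not only PreparingRebalance.
            VALID_PREVIOUS.put(GroupState.DEAD,
                EnumSet.of(GroupState.STABLE, GroupState.PREPARING_REBALANCE, GroupState.AWAITING_SYNC));
            VALID_PREVIOUS.put(GroupState.AWAITING_SYNC, EnumSet.of(GroupState.PREPARING_REBALANCE));
            VALID_PREVIOUS.put(GroupState.STABLE, EnumSet.of(GroupState.AWAITING_SYNC));
            VALID_PREVIOUS.put(GroupState.PREPARING_REBALANCE,
                EnumSet.of(GroupState.STABLE, GroupState.AWAITING_SYNC));
        }

        static boolean canTransition(GroupState from, GroupState to) {
            return VALID_PREVIOUS.containsKey(to) && VALID_PREVIOUS.get(to).contains(from);
        }
    }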

http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
new file mode 100644
index 0000000..81ed548
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -0,0 +1,952 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.utils.CoreUtils._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.Type.STRING
+import org.apache.kafka.common.protocol.types.Type.INT32
+import org.apache.kafka.common.protocol.types.Type.INT64
+import org.apache.kafka.common.protocol.types.Type.BYTES
+import org.apache.kafka.common.utils.Utils
+
+import kafka.utils._
+import kafka.common._
+import kafka.message._
+import kafka.log.FileMessageSet
+import kafka.metrics.KafkaMetricsGroup
+import kafka.common.TopicAndPartition
+import kafka.tools.MessageFormatter
+import kafka.api.ProducerResponseStatus
+import kafka.server.ReplicaManager
+
+import scala.collection._
+import java.io.PrintStream
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import com.yammer.metrics.core.Gauge
+
+class GroupMetadataManager(val brokerId: Int,
+                           val config: OffsetConfig,
+                           replicaManager: ReplicaManager,
+                           zkUtils: ZkUtils,
+                           scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
+
+  /* offsets cache */
+  private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
+
+  /* group metadata cache */
+  private val groupsCache = new Pool[String, GroupMetadata]
+
+  /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */
+  private val loadingPartitions: mutable.Set[Int] = mutable.Set()
+
+  /* partitions of consumer groups that are assigned, using the same loading partition lock */
+  private val ownedPartitions: mutable.Set[Int] = mutable.Set()
+
+  /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
+  private val offsetExpireLock = new ReentrantReadWriteLock()
+
+  /* lock for removing offsets of a range partition, it should be always called BEFORE the group lock if needed */
+  private val offsetRemoveLock = new ReentrantReadWriteLock()
+
+  /* shutting down flag */
+  private val shuttingDown = new AtomicBoolean(false)
+
+  /* number of partitions for the consumer metadata topic */
+  private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
+
+  this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: "
+
+  scheduler.schedule(name = "delete-expired-consumer-offsets",
+    fun = deleteExpiredOffsets,
+    period = config.offsetsRetentionCheckIntervalMs,
+    unit = TimeUnit.MILLISECONDS)
+
+  newGauge("NumOffsets",
+    new Gauge[Int] {
+      def value = offsetsCache.size
+    }
+  )
+
+  newGauge("NumGroups",
+    new Gauge[Int] {
+      def value = groupsCache.size
+    }
+  )
+
+  def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount
+
+  def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId))
+
+  def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId))
+
+  /**
+   * Get the group associated with the given groupId, or null if not found
+   */
+  def getGroup(groupId: String): GroupMetadata = {
+      groupsCache.get(groupId)
+  }
+
+  /**
+   * Add a group or get the group associated with the given groupId if it already exists
+   */
+  def addGroup(groupId: String, protocolType: String): GroupMetadata = {
+    addGroup(groupId, new GroupMetadata(groupId, protocolType))
+  }
+
+  private def addGroup(groupId: String, group: GroupMetadata): GroupMetadata = {
+    groupsCache.putIfNotExists(groupId, group)
+    groupsCache.get(groupId)
+  }
+
+  /**
+   * Remove all metadata associated with the group, note this function needs to be
+   * called inside the group lock
+   * @param group
+   */
+  def removeGroup(group: GroupMetadata) {
+    // first mark the group as dead
+    group.transitionTo(Dead)
+
+    if (groupsCache.remove(group.groupId) != group)
+      throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.")
+
+    // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+    // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+    // retry removing this group.
+    val groupPartition = partitionFor(group.groupId)
+    val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId))
+
+    val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+    partitionOpt.foreach { partition =>
+      val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition)
+
+      trace("Marking group %s as deleted.".format(group.groupId))
+
+      try {
+        // do not need to require acks since even if the tombstone is lost,
+        // it will be appended again by the new leader
+        // TODO KAFKA-2720: periodic purging instead of immediate removal of groups
+        partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
+      } catch {
+        case t: Throwable =>
+          error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
+          // ignore and continue
+      }
+    }
+  }
+
+  def storeGroup(group: GroupMetadata,
+                 groupAssignment: Map[String, Array[Byte]]) {
+    // construct the message to append
+    val message = new Message(
+      key = GroupMetadataManager.groupMetadataKey(group.groupId),
+      bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment)
+    )
+
+    val groupMetadataPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId))
+
+    val groupMetadataMessageSet = Map(groupMetadataPartition ->
+      new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message))
+
+    // set the callback function to insert the created group into cache after log append completed
+    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+      // the append response should only contain the topics partition
+      if (responseStatus.size != 1 || ! responseStatus.contains(groupMetadataPartition))
+        throw new IllegalStateException("Append status %s should only have one partition %s"
+          .format(responseStatus, groupMetadataPartition))
+
+      // construct the error status in the propagated assignment response
+      // in the cache
+      val status = responseStatus(groupMetadataPartition)
+
+      var responseCode = Errors.NONE.code
+      if (status.error != ErrorMapping.NoError) {
+        debug("Metadata from group %s with generation %d failed when appending to log due to %s"
+          .format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error)))
+
+        // transform the log append error code to the corresponding the commit status error code
+        responseCode = if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) {
+          Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code
+        } else if (status.error == ErrorMapping.NotLeaderForPartitionCode) {
+          Errors.NOT_COORDINATOR_FOR_GROUP.code
+        } else if (status.error == ErrorMapping.MessageSizeTooLargeCode
+          || status.error == ErrorMapping.MessageSetSizeTooLargeCode
+          || status.error == ErrorMapping.InvalidFetchSizeCode) {
+
+          error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client"
+            .format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error)))
+
+          Errors.UNKNOWN.code
+        } else {
+
+          error("Appending metadata message for group %s generation %d failed due to unexpected error: %s"
+            .format(group.groupId, group.generationId, status.error))
+
+          status.error
+        }
+      }
+
+      for (member <- group.allMembers) {
+        member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte])
+      }
+
+      // propagate the assignments
+      propagateAssignment(group, responseCode)
+    }
+
+    // call replica manager to append the group message
+    replicaManager.appendMessages(
+      config.offsetCommitTimeoutMs.toLong,
+      config.offsetCommitRequiredAcks,
+      true, // allow appending to internal offset topic
+      groupMetadataMessageSet,
+      putCacheCallback)
+  }
+
+  def propagateAssignment(group: GroupMetadata,
+                          errorCode: Short) {
+    val hasError = errorCode != Errors.NONE.code
+    for (member <- group.allMembers) {
+      if (member.awaitingSyncCallback != null) {
+        member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode)
+        member.awaitingSyncCallback = null
+      }
+    }
+  }
+
+  /**
+   * Store offsets by appending them to the replicated log and then inserting them into the cache
+   */
+  def storeOffsets(groupId: String,
+                   consumerId: String,
+                   generationId: Int,
+                   offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+                   responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+    // first filter out partitions with offset metadata size exceeding limit
+    val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
+    }
+
+    // construct the message set to append
+    val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+      new Message(
+        key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition),
+        bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+      )
+    }.toSeq
+
+    val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId))
+
+    val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
+      new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
+
+    // set the callback function to insert offsets into cache after log append completed
+    def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) {
+      // the append response should only contain this topic partition
+      if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition))
+        throw new IllegalStateException("Append status %s should only have one partition %s"
+          .format(responseStatus, offsetTopicPartition))
+
+      // construct the commit response status and insert
+      // the offset and metadata to cache if the append status has no error
+      val status = responseStatus(offsetTopicPartition)
+
+      val responseCode =
+        if (status.error == ErrorMapping.NoError) {
+          filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) =>
+            putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata)
+          }
+          ErrorMapping.NoError
+        } else {
+          debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s"
+            .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error)))
+
+          // transform the log append error code to the corresponding commit status error code
+          if (status.error == ErrorMapping.UnknownTopicOrPartitionCode)
+            ErrorMapping.ConsumerCoordinatorNotAvailableCode
+          else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
+            ErrorMapping.NotCoordinatorForConsumerCode
+          else if (status.error == ErrorMapping.MessageSizeTooLargeCode
+            || status.error == ErrorMapping.MessageSetSizeTooLargeCode
+            || status.error == ErrorMapping.InvalidFetchSizeCode)
+            Errors.INVALID_COMMIT_OFFSET_SIZE.code
+          else
+            status.error
+        }
+
+      // compute the final error codes for the commit response
+      val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
+        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+          (topicAndPartition, responseCode)
+        else
+          (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+      }
+
+      // finally trigger the callback logic passed from the API layer
+      responseCallback(commitStatus)
+    }
+
+    // call replica manager to append the offset messages
+    replicaManager.appendMessages(
+      config.offsetCommitTimeoutMs.toLong,
+      config.offsetCommitRequiredAcks,
+      true, // allow appending to internal offset topic
+      offsetsAndMetadataMessageSet,
+      putCacheCallback)
+  }
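+
+  /**
+   * Illustrative usage sketch (group, topic, timestamps and the surrounding names are hypothetical):
+   * the API layer passes a callback that turns the per-partition error codes into an
+   * OffsetCommitResponse:
+   * {{{
+   * val now = SystemTime.milliseconds
+   * val offsets = immutable.Map(
+   *   TopicAndPartition("clicks", 0) -> OffsetAndMetadata(1042L, "checkpoint", now, now + retentionMs))
+   * groupManager.storeOffsets("analytics-group", "consumer-1", 7, offsets, { commitStatus =>
+   *   // commitStatus maps each requested partition to an error code, e.g. NoError on success or
+   *   // OffsetMetadataTooLargeCode if the metadata string exceeded the configured limit
+   *   responseCallback(commitStatus)
+   * })
+   * }}}
+   */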
+
+  /**
+   * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
+   * returns the current offset or it begins to sync the cache from the log (and returns an error code).
+   */
+  def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+    trace("Getting offsets %s for group %s.".format(topicPartitions, group))
+
+    if (isGroupLocal(group)) {
+      if (topicPartitions.isEmpty) {
+        // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
+        offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
+          (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError))
+        }.toMap
+      } else {
+        topicPartitions.map { topicAndPartition =>
+          val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+          (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
+        }.toMap
+      }
+    } else {
+      debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
+      topicPartitions.map { topicAndPartition =>
+        val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+        (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
+      }.toMap
+    }
+  }
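+
+  /**
+   * Illustrative fetch sketch (group and topic names are hypothetical): an explicit partition list
+   * returns exactly those partitions (NoOffset where nothing is cached), while an empty list returns
+   * every offset this group has committed to Kafka:
+   * {{{
+   * val some = groupManager.getOffsets("analytics-group", Seq(TopicAndPartition("clicks", 0)))
+   * val all  = groupManager.getOffsets("analytics-group", Seq.empty)
+   * }}}
+   */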
+
+  /**
+   * Asynchronously read the partition from the offsets topic and populate the cache
+   */
+  def loadGroupsForPartition(offsetsPartition: Int) {
+
+    val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+
+    loadingPartitions synchronized {
+      ownedPartitions.add(offsetsPartition)
+
+      if (loadingPartitions.contains(offsetsPartition)) {
+        info("Offset load from %s already in progress.".format(topicPartition))
+      } else {
+        loadingPartitions.add(offsetsPartition)
+        scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
+      }
+    }
+
+    def loadGroupsAndOffsets() {
+      info("Loading offsets from " + topicPartition)
+
+      val startMs = SystemTime.milliseconds
+      try {
+        replicaManager.logManager.getLog(topicPartition) match {
+          case Some(log) =>
+            var currOffset = log.logSegments.head.baseOffset
+            val buffer = ByteBuffer.allocate(config.loadBufferSize)
+            // the loop breaks if the leader changes at any time during the load, since getHighWatermark then returns -1
+            inWriteLock(offsetExpireLock) {
+              while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
+                buffer.clear()
+                val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
+                messages.readInto(buffer, 0)
+                val messageSet = new ByteBufferMessageSet(buffer)
+                messageSet.foreach { msgAndOffset =>
+                  require(msgAndOffset.message.key != null, "Offset entry key should not be null")
+                  val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key)
+
+                  if (baseKey.isInstanceOf[OffsetKey]) {
+                    // load offset
+                    val key = baseKey.key.asInstanceOf[GroupTopicPartition]
+                    if (msgAndOffset.message.payload == null) {
+                      if (offsetsCache.remove(key) != null)
+                        trace("Removed offset for %s due to tombstone entry.".format(key))
+                      else
+                        trace("Ignoring redundant tombstone for %s.".format(key))
+                    } else {
+                      // special handling for version 0:
+                      // set the expiration timestamp to the commit timestamp plus the server's default retention time
+                      val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload)
+                      putOffset(key, value.copy (
+                        expireTimestamp = {
+                          if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                            value.commitTimestamp + config.offsetsRetentionMs
+                          else
+                            value.expireTimestamp
+                        }
+                      ))
+                      trace("Loaded offset %s for %s.".format(value, key))
+                    }
+                  } else {
+                    // load group metadata
+                    val groupId = baseKey.key.asInstanceOf[String]
+                    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload)
+
+                    addGroup(groupId, groupMetadata)
+                  }
+
+                  currOffset = msgAndOffset.nextOffset
+                }
+              }
+            }
+
+            if (!shuttingDown.get())
+              info("Finished loading offsets from %s in %d milliseconds."
+                .format(topicPartition, SystemTime.milliseconds - startMs))
+          case None =>
+            warn("No log found for " + topicPartition)
+        }
+      }
+      catch {
+        case t: Throwable =>
+          error("Error in loading offsets from " + topicPartition, t)
+      }
+      finally {
+        loadingPartitions synchronized loadingPartitions.remove(offsetsPartition)
+      }
+    }
+  }
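+
+  /**
+   * Illustrative wiring sketch (the caller name is hypothetical): the coordinator invokes this when
+   * the broker becomes leader for a partition of the offsets topic; the load itself runs
+   * asynchronously on the scheduler thread:
+   * {{{
+   * def onOffsetsPartitionBecomingLeader(offsetsPartition: Int): Unit =
+   *   groupManager.loadGroupsForPartition(offsetsPartition)
+   * }}}
+   */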
+
+  /**
+   * When this broker becomes a follower for an offsets topic partition, clear out the cache for groups that belong to
+   * that partition.
+   * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache.
+   */
+  def removeGroupsForPartition(offsetsPartition: Int) {
+    var numOffsetsRemoved = 0
+    var numGroupsRemoved = 0
+
+    loadingPartitions synchronized {
+      // we need to guard the removal of groups from the cache with the loading-partition lock
+      // to prevent the coordinator's check-and-get-group race condition
+      ownedPartitions.remove(offsetsPartition)
+
+      // clear the offsets for this partition in the cache
+
+      /**
+       * NOTE: this must also run under the loading partition lock to avoid racing with the leader-is-local check
+       * in getOffsets; otherwise an offset fetch could read from a cache that was just cleared by a leader->follower
+       * transition right after that check, and return empty offsets with a NONE error code
+       */
+      offsetsCache.keys.foreach { key =>
+        if (partitionFor(key.group) == offsetsPartition) {
+          offsetsCache.remove(key)
+          numOffsetsRemoved += 1
+        }
+      }
+
+      // clear the groups for this partition in the cache
+      for (group <- groupsCache.values) {
+        group synchronized {
+          // mark the group as dead and then remove it from cache
+          group.transitionTo(Dead)
+
+          if (groupsCache.remove(group.groupId) != group)
+            throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.")
+
+          numGroupsRemoved += 1
+        }
+      }
+    }
+
+    if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
+      .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+
+    if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition."
+      .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)))
+  }
+
+  /**
+   * Fetch the current offset for the given group/topic/partition from the offsets cache.
+   *
+   * @param key The requested group-topic-partition
+   * @return If the key is present, return the offset and metadata; otherwise return NoOffset
+   */
+  private def getOffset(key: GroupTopicPartition) = {
+    val offsetAndMetadata = offsetsCache.get(key)
+    if (offsetAndMetadata == null)
+      OffsetMetadataAndError.NoOffset
+    else
+      OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)
+  }
+
+  /**
+   * Put the (already committed) offset for the given group/topic/partition into the cache.
+   *
+   * @param key The group-topic-partition
+   * @param offsetAndMetadata The offset/metadata to be stored
+   */
+  private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) {
+    offsetsCache.put(key, offsetAndMetadata)
+  }
+
+  private def deleteExpiredOffsets() {
+    debug("Collecting expired offsets.")
+    val startMs = SystemTime.milliseconds
+
+    val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
+      val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
+        offsetAndMetadata.expireTimestamp < startMs
+      }
+
+      debug("Found %d expired offsets.".format(expiredOffsets.size))
+
+      // delete the expired offsets from the table and generate tombstone messages to remove them from the log
+      val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
+        val offsetsPartition = partitionFor(groupTopicAndPartition.group)
+        trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
+
+        offsetsCache.remove(groupTopicAndPartition)
+
+        val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
+          groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
+
+        (offsetsPartition, new Message(bytes = null, key = commitKey))
+      }.groupBy { case (partition, tombstone) => partition }
+
+      // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
+      // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
+      tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
+        val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+        partitionOpt.map { partition =>
+          val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+          val messages = tombstones.map(_._2).toSeq
+
+          trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
+
+          try {
+            // do not need to require acks since even if the tombstone is lost,
+            // it will be appended again in the next purge cycle
+            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
+            tombstones.size
+          }
+          catch {
+            case t: Throwable =>
+              error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t)
+              // ignore and continue
+              0
+          }
+        }
+      }.sum
+    }
+
+    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs))
+  }
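+
+  /**
+   * Illustrative sketch (group and topic are hypothetical): a tombstone for an expired offset is
+   * simply a message carrying the offset commit key with a null payload, so log compaction
+   * eventually drops the committed entry from the offsets topic:
+   * {{{
+   * val tombstone = new Message(bytes = null,
+   *   key = GroupMetadataManager.offsetCommitKey("analytics-group", "clicks", 0))
+   * }}}
+   */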
+
+  private def getHighWatermark(partitionId: Int): Long = {
+    val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, partitionId)
+
+    val hw = partitionOpt.map { partition =>
+      partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
+    }.getOrElse(-1L)
+
+    hw
+  }
+
+  /*
+   * Check if the offset metadata length is valid
+   */
+  private def validateOffsetMetadataLength(metadata: String) : Boolean = {
+    metadata == null || metadata.length() <= config.maxMetadataSize
+  }
+
+  def shutdown() {
+    shuttingDown.set(true)
+
+    // TODO: clear the caches
+  }
+
+  /**
+   * Gets the partition count of the offsets topic from ZooKeeper.
+   * If the topic does not exist, the configured partition count is returned.
+   */
+  private def getOffsetsTopicPartitionCount = {
+    val topic = GroupCoordinator.GroupMetadataTopicName
+    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
+    if (topicData(topic).nonEmpty)
+      topicData(topic).size
+    else
+      config.offsetsTopicNumPartitions
+  }
+
+  /**
+   * Add the partition into the owned list
+   *
+   * NOTE: this is for test only
+   */
+  def addPartitionOwnership(partition: Int) {
+    loadingPartitions synchronized {
+      ownedPartitions.add(partition)
+    }
+  }
+}
+
+/**
+ * Messages stored for the group metadata topic have versions for both the key and value fields. The
+ * key version is used to indicate the type of the message (and also to keep different types of
+ * messages from being compacted together when they happen to have the same field values); the value
+ * version is used to evolve the messages within their data types:
+ *
+ * key version 0:       group consumption offset
+ *    -> value version 0:       [offset, metadata, timestamp]
+ *
+ * key version 1:       group consumption offset
+ *    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]
+ *
+ * key version 2:       group metadata
+ *     -> value version 0:       [protocol_type, generation, protocol, leader, members]
+ */
+object GroupMetadataManager {
+
+  private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort
+  private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort
+
+  private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING),
+    new Field("topic", STRING),
+    new Field("partition", INT32))
+  private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group")
+  private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic")
+  private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata")
+  private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp")
+
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64),
+    new Field("expire_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
+  private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
+
+  private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
+  private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
+
+  private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING),
+    new Field("session_timeout", INT32),
+    new Field("subscription", BYTES),
+    new Field("assignment", BYTES))
+  private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id")
+  private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout")
+  private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription")
+  private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment")
+
+
+  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING),
+    new Field("generation", INT32),
+    new Field("protocol", STRING),
+    new Field("leader", STRING),
+    new Field("members", new ArrayOf(MEMBER_METADATA_V0)))
+  private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type")
+  private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation")
+  private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol")
+  private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader")
+  private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members")
+
+  // map of versions to key schemas as data types
+  private val MESSAGE_TYPE_SCHEMAS = Map(0 -> OFFSET_COMMIT_KEY_SCHEMA,
+    1 -> OFFSET_COMMIT_KEY_SCHEMA,
+    2 -> GROUP_METADATA_KEY_SCHEMA)
+
+  // map of version of offset value schemas
+  private val OFFSET_VALUE_SCHEMAS = Map(0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
+    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
+  private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
+
+  // map of version of group metadata value schemas
+  private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0)
+  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort
+
+  private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+  private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+
+  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+  private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
+
+  private def schemaForKey(version: Int) = {
+    val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown message key schema version " + version)
+    }
+  }
+
+  private def schemaForOffset(version: Int) = {
+    val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown offset schema version " + version)
+    }
+  }
+
+  private def schemaForGroup(version: Int) = {
+    val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
+    schemaOpt match {
+      case Some(schema) => schema
+      case _ => throw new KafkaException("Unknown group metadata value schema version " + version)
+    }
+  }
+
+  /**
+   * Generates the key for offset commit message for given (group, topic, partition)
+   *
+   * @return key for offset commit message
+   */
+  private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
+    val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
+    key.set(OFFSET_KEY_GROUP_FIELD, group)
+    key.set(OFFSET_KEY_TOPIC_FIELD, topic)
+    key.set(OFFSET_KEY_PARTITION_FIELD, partition)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+    byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
+    key.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
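+
+  /**
+   * Illustrative round-trip sketch (values are hypothetical; these helpers are private to this
+   * object): the serialized key is a 2-byte schema version followed by the versioned struct, and
+   * readMessageKey reverses it:
+   * {{{
+   * val keyBytes = offsetCommitKey("analytics-group", "clicks", 0)
+   * readMessageKey(ByteBuffer.wrap(keyBytes))
+   * // => an OffsetKey with version 1 wrapping GroupTopicPartition("analytics-group", TopicAndPartition("clicks", 0))
+   * }}}
+   */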
+
+  /**
+   * Generates the key for group metadata message for given group
+   *
+   * @return key bytes for group metadata message
+   */
+  private def groupMetadataKey(group: String): Array[Byte] = {
+    val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
+    key.set(GROUP_KEY_GROUP_FIELD, group)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
+    byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION)
+    key.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Generates the payload for offset commit message from given offset and metadata
+   *
+   * @param offsetAndMetadata consumer's current offset and metadata
+   * @return payload for offset commit message
+   */
+  private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+    // generate commit value with schema version 1
+    val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
+    value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+    value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+    value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+    value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+    byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+    value.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Generates the payload for a group metadata message from the given group metadata and
+   * member assignments, assuming the generation id, selected protocol, leader and member
+   * assignments are all available
+   *
+   * @param groupMetadata current metadata of the group
+   * @param assignment the assignment selected for each member of the group
+   * @return payload for group metadata message
+   */
+  private def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = {
+    // generate group metadata value with schema version 0
+    val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+    value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType)
+    value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId)
+    value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol)
+    value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId)
+
+    val memberArray = groupMetadata.allMembers.map {
+      case memberMetadata =>
+        val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0)
+        memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId)
+        memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs)
+
+        val metadata = memberMetadata.metadata(groupMetadata.protocol)
+        memberStruct.set(MEMBER_METADATA_SUBSCRIPTION_V0, ByteBuffer.wrap(metadata))
+
+        val memberAssignment = assignment(memberMetadata.memberId)
+        assert(memberAssignment != null)
+
+        memberStruct.set(MEMBER_METADATA_ASSIGNMENT_V0, ByteBuffer.wrap(memberAssignment))
+
+        memberStruct
+    }
+
+    value.set(GROUP_METADATA_MEMBERS_V0, memberArray.toArray)
+
+    val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
+    byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
+    value.writeTo(byteBuffer)
+    byteBuffer.array()
+  }
+
+  /**
+   * Decodes the key of a message stored in the offsets topic
+   *
+   * @param buffer input byte-buffer
+   * @return a BaseKey object (an OffsetKey for offset commits, a GroupKey for group metadata)
+   */
+  private def readMessageKey(buffer: ByteBuffer): BaseKey = {
+    val version = buffer.getShort
+    val keySchema = schemaForKey(version)
+    val key = keySchema.read(buffer).asInstanceOf[Struct]
+
+    if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) {
+      // versions 0 and 1 refer to offset commit keys
+      val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String]
+      val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
+      val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
+
+      OffsetKey(version, GroupTopicPartition(group, TopicAndPartition(topic, partition)))
+
+    } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
+      // version 2 refers to group metadata
+      val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String]
+
+      GroupKey(version, group)
+    } else {
+      throw new IllegalStateException("Unknown version " + version + " for group metadata message")
+    }
+  }
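+
+  /**
+   * Illustrative dispatch sketch (keyBytes is hypothetical): callers can branch on the decoded key
+   * type with a pattern match rather than isInstanceOf checks:
+   * {{{
+   * readMessageKey(ByteBuffer.wrap(keyBytes)) match {
+   *   case OffsetKey(_, groupTopicPartition) => // an offset commit entry
+   *   case GroupKey(_, groupId)              => // a group metadata entry
+   * }
+   * }}}
+   */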
+
+  /**
+   * Decodes the offset messages' payload and retrieves offset and metadata from it
+   *
+   * @param buffer input byte-buffer
+   * @return an offset-metadata object from the message
+   */
+  private def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = {
+    if(buffer == null) { // tombstone
+      null
+    } else {
+      val version = buffer.getShort
+      val valueSchema = schemaForOffset(version)
+      val value = valueSchema.read(buffer).asInstanceOf[Struct]
+
+      if (version == 0) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String]
+        val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, timestamp)
+      } else if (version == 1) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+        val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else {
+        throw new IllegalStateException("Unknown offset message version")
+      }
+    }
+  }
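+
+  /**
+   * Illustrative round-trip sketch (offset, metadata and timestamps are hypothetical): an offset
+   * commit value written with the current schema decodes back to the same offset and metadata:
+   * {{{
+   * val written = offsetCommitValue(OffsetAndMetadata(1042L, "checkpoint", 1446508800000L, 1447113600000L))
+   * val decoded = readOffsetMessageValue(ByteBuffer.wrap(written))
+   * // decoded.offset == 1042L && decoded.metadata == "checkpoint"
+   * }}}
+   */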
+
+  /**
+   * Decodes a group metadata message's payload and retrieves its member metadata from it
+   *
+   * @param buffer input byte-buffer
+   * @return a group metadata object from the message
+   */
+  private def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+    if(buffer == null) { // tombstone
+      null
+    } else {
+      val version = buffer.getShort
+      val valueSchema = schemaForGroup(version)
+      val value = valueSchema.read(buffer).asInstanceOf[Struct]
+
+      if (version == 0) {
+        val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String]
+
+        val group = new GroupMetadata(groupId, protocolType)
+
+        group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int]
+        group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String]
+        group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String]
+
+        value.getArray(GROUP_METADATA_MEMBERS_V0).foreach {
+          case memberMetadataObj =>
+            val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
+            val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String]
+            val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int]
+            val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer])
+
+            val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription)))
+
+            member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer])
+
+            group.add(memberId, member)
+        }
+
+        group
+      } else {
+        throw new IllegalStateException("Unknown group metadata message version")
+      }
+    }
+  }
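+
+  /**
+   * Illustrative sketch (group id and value bytes are hypothetical): during a partition load the
+   * stored record is decoded back into a GroupMetadata; a null payload is a tombstone and yields null:
+   * {{{
+   * val group = readGroupMessageValue("analytics-group", ByteBuffer.wrap(valueBytes))
+   * if (group != null) {
+   *   // generation id, protocol, leader and each member's assignment are restored
+   *   // exactly as written by groupMetadataValue
+   * }
+   * }}}
+   */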
+
+  // Formatter for use with tools such as the console consumer. The consumer should also set exclude.internal.topics to false.
+  // (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets)
+  class OffsetsMessageFormatter extends MessageFormatter {
+    def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
+      val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key)).toString
+      val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString
+      output.write(formattedKey.getBytes)
+      output.write("::".getBytes)
+      output.write(formattedValue.getBytes)
+      output.write("\n".getBytes)
+    }
+  }
+}
+
+case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
+
+  def this(group: String, topic: String, partition: Int) =
+    this(group, new TopicAndPartition(topic, partition))
+
+  override def toString =
+    "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)
+}
+
+trait BaseKey {
+  def version: Short
+  def key: Object
+}
+
+case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey
+
+case class GroupKey(version: Short, key: String) extends BaseKey
+

