From: guozhang@apache.org
To: commits@kafka.apache.org
Date: Tue, 03 Nov 2015 07:36:22 -0000
Subject: [3/3] kafka git commit: KAFKA-2017: Persist Group Metadata and Assignment before Responding

KAFKA-2017: Persist Group Metadata and Assignment before Responding

Author: Guozhang Wang

Reviewers: Onur Karaman, Jason Gustafson, Jun Rao

Closes #386 from guozhangwang/K2017

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c334752
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c334752
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c334752

Branch: refs/heads/trunk
Commit: 7c33475274cb6e65a8e8d907e7fef6e56bc8c8e6
Parents: e466ccd
Author: Guozhang Wang
Authored: Mon Nov 2 23:41:58 2015 -0800
Committer: Guozhang Wang
Committed: Mon Nov 2 23:41:58 2015 -0800

----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 6 +-
.../consumer/internals/ConsumerCoordinator.java | 9 +-
.../errors/GroupLoadInProgressException.java | 40 +
.../errors/OffsetLoadInProgressException.java | 40 -
.../apache/kafka/common/protocol/Errors.java | 4 +-
.../common/requests/JoinGroupResponse.java | 1 +
.../common/requests/LeaveGroupResponse.java | 1 +
.../common/requests/OffsetCommitResponse.java | 1 +
.../common/requests/OffsetFetchResponse.java | 2 +-
.../internals/ConsumerCoordinatorTest.java | 2 +-
.../main/scala/kafka/admin/TopicCommand.scala | 2 +-
.../main/scala/kafka/common/ErrorMapping.scala | 2 +-
.../kafka/common/OffsetMetadataAndError.scala | 2 +-
core/src/main/scala/kafka/common/Topic.scala | 2 +-
.../kafka/coordinator/CoordinatorMetadata.scala | 81 --
.../kafka/coordinator/GroupCoordinator.scala | 139 ++-
.../scala/kafka/coordinator/GroupMetadata.scala | 6 +-
.../coordinator/GroupMetadataManager.scala | 952 +++++++++++++++++++
.../kafka/coordinator/MemberMetadata.scala | 7 +-
.../scala/kafka/coordinator/OffsetConfig.scala | 61 ++
.../src/main/scala/kafka/server/KafkaApis.scala | 12 +-
.../main/scala/kafka/server/KafkaConfig.scala | 19 +-
.../main/scala/kafka/server/KafkaServer.scala | 2 +-
.../main/scala/kafka/server/OffsetManager.scala | 627 ------------ .../kafka/api/AuthorizerIntegrationTest.scala | 2 +- .../kafka/api/BaseConsumerTest.scala | 2 +- .../kafka/api/IntegrationTestHarness.scala | 2 +- .../unit/kafka/admin/TopicCommandTest.scala | 8 +- .../unit/kafka/consumer/TopicFilterTest.scala | 9 +- .../coordinator/CoordinatorMetadataTest.scala | 71 -- .../GroupCoordinatorResponseTest.scala | 515 ++++------ .../kafka/coordinator/GroupMetadataTest.scala | 12 - 32 files changed, 1390 insertions(+), 1251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 4b2a824..d8f3c25 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -351,6 +351,10 @@ public abstract class AbstractCoordinator { } else { onJoinFollower().chain(future); } + } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { + log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId); + // backoff and retry + future.raise(Errors.forCode(errorCode)); } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) { // reset the member id and retry immediately AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID; @@ -362,7 +366,7 @@ public abstract class AbstractCoordinator { // re-discover the coordinator and retry with backoff coordinatorDead(); log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.", - groupId); + groupId); future.raise(Errors.forCode(errorCode)); } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code() || errorCode == Errors.INVALID_SESSION_TIMEOUT.code() http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 641939a..97d25c3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -451,7 +451,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl // update the local cache only if the partition is still assigned subscriptions.committed(tp, offsetAndMetadata); } else { - if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() + if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { + // just retry + future.raise(Errors.GROUP_LOAD_IN_PROGRESS); + } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code() || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { coordinatorDead(); } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code() @@ -511,9 +514,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator implements Cl 
log.debug("Error fetching offset for topic-partition {}: {}", tp, Errors.forCode(data.errorCode) .exception() .getMessage()); - if (data.errorCode == Errors.OFFSET_LOAD_IN_PROGRESS.code()) { + if (data.errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) { // just retry - future.raise(Errors.OFFSET_LOAD_IN_PROGRESS); + future.raise(Errors.GROUP_LOAD_IN_PROGRESS); } else if (data.errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) { // re-discover the coordinator and retry coordinatorDead(); http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java new file mode 100644 index 0000000..17e205f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/GroupLoadInProgressException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package org.apache.kafka.common.errors; + +/** + * The broker returns this error code for any coordiantor request if it is still loading the metadata (after a leader change + * for that offsets topic partition) for this group. + */ +public class GroupLoadInProgressException extends RetriableException { + + private static final long serialVersionUID = 1L; + + public GroupLoadInProgressException() { + super(); + } + + public GroupLoadInProgressException(String message) { + super(message); + } + + public GroupLoadInProgressException(String message, Throwable cause) { + super(message, cause); + } + + public GroupLoadInProgressException(Throwable cause) { + super(cause); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java deleted file mode 100644 index 016506e..0000000 --- a/clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE - * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file - * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the - * License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - */ - -package org.apache.kafka.common.errors; - -/** - * The broker returns this error code for an offset fetch request if it is still loading offsets (after a leader change - * for that offsets topic partition). - */ -public class OffsetLoadInProgressException extends RetriableException { - - private static final long serialVersionUID = 1L; - - public OffsetLoadInProgressException() { - super(); - } - - public OffsetLoadInProgressException(String message) { - super(message); - } - - public OffsetLoadInProgressException(String message, Throwable cause) { - super(message, cause); - } - - public OffsetLoadInProgressException(Throwable cause) { - super(cause); - } - -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index bc607f0..2c9cb20 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -58,8 +58,8 @@ public enum Errors { new OffsetMetadataTooLarge("The metadata field of the offset request was too large.")), NETWORK_EXCEPTION(13, new NetworkException("The server disconnected before a response was received.")), - OFFSET_LOAD_IN_PROGRESS(14, - new OffsetLoadInProgressException("The coordinator is loading offsets and can't process requests.")), + GROUP_LOAD_IN_PROGRESS(14, + new GroupLoadInProgressException("The coordinator is loading and hence can't process requests for this group.")), GROUP_COORDINATOR_NOT_AVAILABLE(15, new GroupCoordinatorNotAvailableException("The group coordinator is not available.")), NOT_COORDINATOR_FOR_GROUP(16, http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index c65a4bb..0615e5e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -31,6 +31,7 @@ public class JoinGroupResponse extends AbstractRequestResponse { /** * Possible error code: * + * GROUP_LOAD_IN_PROGRESS (14) * GROUP_COORDINATOR_NOT_AVAILABLE (15) * NOT_COORDINATOR_FOR_GROUP (16) * INCONSISTENT_GROUP_PROTOCOL (23) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index d2af1a1..278e3e8 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -26,6 +26,7 @@ public class LeaveGroupResponse extends AbstractRequestResponse { /** * Possible error code: * + * GROUP_LOAD_IN_PROGRESS (14) * CONSUMER_COORDINATOR_NOT_AVAILABLE (15) * NOT_COORDINATOR_FOR_CONSUMER (16) * UNKNOWN_CONSUMER_ID (25) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index dae9c37..baea6f9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -42,6 +42,7 @@ public class OffsetCommitResponse extends AbstractRequestResponse { * Possible error code: * * OFFSET_METADATA_TOO_LARGE (12) + * GROUP_LOAD_IN_PROGRESS (14) * GROUP_COORDINATOR_NOT_AVAILABLE (15) * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index 09ac74a..afc5618 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -48,7 +48,7 @@ public class OffsetFetchResponse extends AbstractRequestResponse { * Possible error code: * * UNKNOWN_TOPIC_OR_PARTITION (3) <- only for request v0 - * OFFSET_LOAD_IN_PROGRESS (14) + * GROUP_LOAD_IN_PROGRESS (14) * NOT_COORDINATOR_FOR_GROUP (16) * ILLEGAL_GENERATION (22) * UNKNOWN_MEMBER_ID (25) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 963da42..2029e92 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -693,7 +693,7 @@ public class ConsumerCoordinatorTest { subscriptions.assignFromUser(Arrays.asList(tp)); subscriptions.needRefreshCommits(); - client.prepareResponse(offsetFetchResponse(tp, Errors.OFFSET_LOAD_IN_PROGRESS.code(), "", 100L)); + client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L)); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); coordinator.refreshCommittedOffsetsIfNeeded(); assertFalse(subscriptions.refreshCommitsNeeded()); http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/admin/TopicCommand.scala ---------------------------------------------------------------------- diff 
--git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala index 51b4957..37ef9dc 100755 --- a/core/src/main/scala/kafka/admin/TopicCommand.scala +++ b/core/src/main/scala/kafka/admin/TopicCommand.scala @@ -129,7 +129,7 @@ object TopicCommand extends Logging { } if(opts.options.has(opts.partitionsOpt)) { - if (topic == GroupCoordinator.OffsetsTopicName) { + if (topic == GroupCoordinator.GroupMetadataTopicName) { throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.") } println("WARNING: If partitions are increased for a topic that has a key, the partition " + http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/common/ErrorMapping.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala index 23224ec..81cb51b 100644 --- a/core/src/main/scala/kafka/common/ErrorMapping.scala +++ b/core/src/main/scala/kafka/common/ErrorMapping.scala @@ -59,7 +59,7 @@ object ErrorMapping { // 26: INVALID_SESSION_TIMEOUT // 27: COMMITTING_PARTITIONS_NOT_ASSIGNED // 28: INVALID_COMMIT_OFFSET_SIZE - val AuthorizationCode: Short = 29; + val AuthorizationCode: Short = 29 private val exceptionToCode = Map[Class[Throwable], Short]( http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala index bbee894..a94e58c 100644 --- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala +++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala @@ -63,7 +63,7 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = object OffsetMetadataAndError { val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code) - val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code) + val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_LOAD_IN_PROGRESS.code) val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID.code) val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_GROUP.code) val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/common/Topic.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala index ca41eba..982955e 100644 --- a/core/src/main/scala/kafka/common/Topic.scala +++ b/core/src/main/scala/kafka/common/Topic.scala @@ -26,7 +26,7 @@ object Topic { private val maxNameLength = 255 private val rgx = new Regex(legalChars + "+") - val InternalTopics = Set(GroupCoordinator.OffsetsTopicName) + val InternalTopics = Set(GroupCoordinator.GroupMetadataTopicName) def validate(topic: String) { if (topic.length <= 0) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala deleted file mode 100644 index 2279924..0000000 --- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala +++ /dev/null @@ -1,81 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.coordinator - -import kafka.utils.CoreUtils.{inReadLock, inWriteLock} -import kafka.utils.threadsafe - -import java.util.concurrent.locks.ReentrantReadWriteLock - -import scala.collection.mutable - -/** - * CoordinatorMetadata manages group and topic metadata. - * It delegates all group logic to the callers. - */ -@threadsafe -private[coordinator] class CoordinatorMetadata(brokerId: Int) { - - /** - * NOTE: If a group lock and metadataLock are simultaneously needed, - * be sure to acquire the group lock before metadataLock to prevent deadlock - */ - private val metadataLock = new ReentrantReadWriteLock() - - /** - * These should be guarded by metadataLock - */ - private val groups = new mutable.HashMap[String, GroupMetadata] - - def shutdown() { - inWriteLock(metadataLock) { - groups.clear() - } - } - - /** - * Get the group associated with the given groupId, or null if not found - */ - def getGroup(groupId: String) = { - inReadLock(metadataLock) { - groups.get(groupId).orNull - } - } - - /** - * Add a group or get the group associated with the given groupId if it already exists - */ - def addGroup(groupId: String, protocolType: String) = { - inWriteLock(metadataLock) { - groups.getOrElseUpdate(groupId, new GroupMetadata(groupId, protocolType)) - } - } - - /** - * Remove all metadata associated with the group, including its topics - * @param groupId the groupId of the group we are removing - */ - def removeGroup(groupId: String) { - inWriteLock(metadataLock) { - if (!groups.contains(groupId)) - throw new IllegalArgumentException("Cannot remove non-existing group") - groups.remove(groupId) - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala index 0ef542d..97ce22b 100644 --- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala @@ -29,8 +29,8 @@ import org.apache.kafka.common.requests.JoinGroupRequest import scala.collection.{Map, Seq, 
immutable} -case class GroupManagerConfig(groupMinSessionTimeoutMs: Int, - groupMaxSessionTimeoutMs: Int) +case class GroupConfig(groupMinSessionTimeoutMs: Int, + groupMaxSessionTimeoutMs: Int) case class JoinGroupResult(members: Map[String, Array[Byte]], memberId: String, @@ -46,9 +46,9 @@ case class JoinGroupResult(members: Map[String, Array[Byte]], * groups. Groups are assigned to coordinators based on their group names. */ class GroupCoordinator(val brokerId: Int, - val groupConfig: GroupManagerConfig, - val offsetConfig: OffsetManagerConfig, - private val offsetManager: OffsetManager) extends Logging { + val groupConfig: GroupConfig, + val offsetConfig: OffsetConfig, + val groupManager: GroupMetadataManager) extends Logging { type JoinCallback = JoinGroupResult => Unit type SyncCallback = (Array[Byte], Short) => Unit @@ -58,15 +58,14 @@ class GroupCoordinator(val brokerId: Int, private var heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat] = null private var joinPurgatory: DelayedOperationPurgatory[DelayedJoin] = null - private var coordinatorMetadata: CoordinatorMetadata = null def this(brokerId: Int, - groupConfig: GroupManagerConfig, - offsetConfig: OffsetManagerConfig, + groupConfig: GroupConfig, + offsetConfig: OffsetConfig, replicaManager: ReplicaManager, zkUtils: ZkUtils, - scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig, - new OffsetManager(offsetConfig, replicaManager, zkUtils, scheduler)) + scheduler: Scheduler) = this(brokerId, groupConfig, offsetConfig, + new GroupMetadataManager(brokerId, offsetConfig, replicaManager, zkUtils, scheduler)) def offsetsTopicConfigs: Properties = { val props = new Properties @@ -88,7 +87,6 @@ class GroupCoordinator(val brokerId: Int, info("Starting up.") heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId) joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", brokerId) - coordinatorMetadata = new CoordinatorMetadata(brokerId) isActive.set(true) info("Startup complete.") } @@ -100,8 +98,7 @@ class GroupCoordinator(val brokerId: Int, def shutdown() { info("Shutting down.") isActive.set(false) - offsetManager.shutdown() - coordinatorMetadata.shutdown() + groupManager.shutdown() heartbeatPurgatory.shutdown() joinPurgatory.shutdown() info("Shutdown complete.") @@ -109,6 +106,7 @@ class GroupCoordinator(val brokerId: Int, def handleJoinGroup(groupId: String, memberId: String, + clientId: String, sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], @@ -118,7 +116,9 @@ class GroupCoordinator(val brokerId: Int, } else if (!validGroupId(groupId)) { responseCallback(joinError(memberId, Errors.INVALID_GROUP_ID.code)) } else if (!isCoordinatorForGroup(groupId)) { - responseCallback(joinError(memberId,Errors.NOT_COORDINATOR_FOR_GROUP.code)) + responseCallback(joinError(memberId, Errors.NOT_COORDINATOR_FOR_GROUP.code)) + } else if (isCoordinatorLoadingInProgress(groupId)) { + responseCallback(joinError(memberId, Errors.GROUP_LOAD_IN_PROGRESS.code)) } else if (sessionTimeoutMs < groupConfig.groupMinSessionTimeoutMs || sessionTimeoutMs > groupConfig.groupMaxSessionTimeoutMs) { responseCallback(joinError(memberId, Errors.INVALID_SESSION_TIMEOUT.code)) @@ -126,22 +126,23 @@ class GroupCoordinator(val brokerId: Int, // only try to create the group if the group is not unknown AND // the member id is UNKNOWN, if member is specified but group does not // exist we should reject the request - var group = coordinatorMetadata.getGroup(groupId) + 
var group = groupManager.getGroup(groupId) if (group == null) { if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) } else { - group = coordinatorMetadata.addGroup(groupId, protocolType) - doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback) + group = groupManager.addGroup(groupId, protocolType) + doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback) } } else { - doJoinGroup(group, memberId, sessionTimeoutMs, protocolType, protocols, responseCallback) + doJoinGroup(group, memberId, clientId, sessionTimeoutMs, protocolType, protocols, responseCallback) } } } private def doJoinGroup(group: GroupMetadata, memberId: String, + clientId: String, sessionTimeoutMs: Int, protocolType: String, protocols: List[(String, Array[Byte])], @@ -165,7 +166,7 @@ class GroupCoordinator(val brokerId: Int, case PreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { - addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback) } else { val member = group.get(memberId) updateMemberAndRebalance(group, member, protocols, responseCallback) @@ -173,7 +174,7 @@ class GroupCoordinator(val brokerId: Int, case AwaitingSync => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { - addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback) } else { val member = group.get(memberId) if (member.matches(protocols)) { @@ -200,7 +201,7 @@ class GroupCoordinator(val brokerId: Int, case Stable => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { // if the member id is unknown, register the member to the group - addMemberAndRebalance(sessionTimeoutMs, protocols, group, responseCallback) + addMemberAndRebalance(sessionTimeoutMs, clientId, protocols, group, responseCallback) } else { val member = group.get(memberId) if (memberId == group.leaderId || !member.matches(protocols)) { @@ -238,7 +239,7 @@ class GroupCoordinator(val brokerId: Int, } else if (!isCoordinatorForGroup(groupId)) { responseCallback(Array.empty, Errors.NOT_COORDINATOR_FOR_GROUP.code) } else { - val group = coordinatorMetadata.getGroup(groupId) + val group = groupManager.getGroup(groupId) if (group == null) responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID.code) else @@ -272,7 +273,9 @@ class GroupCoordinator(val brokerId: Int, // propagate the assignment to any awaiting members if (memberId == group.leaderId) { group.transitionTo(Stable) - propagateAssignment(group, groupAssignment) + + // persist the group metadata and upon finish propagate the assignment + groupManager.storeGroup(group, groupAssignment) } case Stable => @@ -290,8 +293,10 @@ class GroupCoordinator(val brokerId: Int, responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) } else if (!isCoordinatorForGroup(groupId)) { responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) + } else if (isCoordinatorLoadingInProgress(groupId)) { + responseCallback(Errors.GROUP_LOAD_IN_PROGRESS.code) } else { - val group = coordinatorMetadata.getGroup(groupId) + val group = groupManager.getGroup(groupId) if (group == null) { // if the group is marked as dead, it means some other thread has just removed the group // from the coordinator metadata; this is likely that the group has migrated to some other @@ -323,17 +328,20 @@ 
class GroupCoordinator(val brokerId: Int, responseCallback(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code) } else if (!isCoordinatorForGroup(groupId)) { responseCallback(Errors.NOT_COORDINATOR_FOR_GROUP.code) + } else if (isCoordinatorLoadingInProgress(groupId)) { + // the group is still loading, so respond just blindly + responseCallback(Errors.NONE.code) } else { - val group = coordinatorMetadata.getGroup(groupId) + val group = groupManager.getGroup(groupId) if (group == null) { - // if the group is marked as dead, it means some other thread has just removed the group - // from the coordinator metadata; this is likely that the group has migrated to some other - // coordinator OR the group is in a transient unstable phase. Let the member retry - // joining without the specified member id, responseCallback(Errors.UNKNOWN_MEMBER_ID.code) } else { group synchronized { if (group.is(Dead)) { + // if the group is marked as dead, it means some other thread has just removed the group + // from the coordinator metadata; this is likely that the group has migrated to some other + // coordinator OR the group is in a transient unstable phase. Let the member retry + // joining without the specified member id, responseCallback(Errors.UNKNOWN_MEMBER_ID.code) } else if (!group.is(Stable)) { responseCallback(Errors.REBALANCE_IN_PROGRESS.code) @@ -360,12 +368,14 @@ class GroupCoordinator(val brokerId: Int, responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code)) } else if (!isCoordinatorForGroup(groupId)) { responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_GROUP.code)) + } else if (isCoordinatorLoadingInProgress(groupId)) { + responseCallback(offsetMetadata.mapValues(_ => Errors.GROUP_LOAD_IN_PROGRESS.code)) } else { - val group = coordinatorMetadata.getGroup(groupId) + val group = groupManager.getGroup(groupId) if (group == null) { if (generationId < 0) // the group is not relying on Kafka for partition management, so allow the commit - offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback) + groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback) else // the group has failed over to this coordinator (which will be handled in KAFKA-2017), // or this is a request coming from an older generation. 
either way, reject the commit @@ -381,7 +391,7 @@ class GroupCoordinator(val brokerId: Int, } else if (generationId != group.generationId) { responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code)) } else { - offsetManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback) + groupManager.storeOffsets(groupId, memberId, generationId, offsetMetadata, responseCallback) } } } @@ -394,21 +404,21 @@ class GroupCoordinator(val brokerId: Int, partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupCoordinatorNotAvailable)}.toMap } else if (!isCoordinatorForGroup(groupId)) { partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap + } else if (isCoordinatorLoadingInProgress(groupId)) { + partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupLoading)}.toMap } else { // return offsets blindly regardless the current group state since the group may be using // Kafka commit storage without automatic group management - offsetManager.getOffsets(groupId, partitions) + groupManager.getOffsets(groupId, partitions) } } def handleGroupImmigration(offsetTopicPartitionId: Int) = { - // TODO we may need to add more logic in KAFKA-2017 - offsetManager.loadOffsetsFromLog(offsetTopicPartitionId) + groupManager.loadGroupsForPartition(offsetTopicPartitionId) } def handleGroupEmigration(offsetTopicPartitionId: Int) = { - // TODO we may need to add more logic in KAFKA-2017 - offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId) + groupManager.removeGroupsForPartition(offsetTopicPartitionId) } private def validGroupId(groupId: String): Boolean = { @@ -425,17 +435,6 @@ class GroupCoordinator(val brokerId: Int, errorCode=errorCode) } - private def propagateAssignment(group: GroupMetadata, - assignment: Map[String, Array[Byte]]) { - for (member <- group.allMembers) { - member.assignment = assignment.getOrElse(member.memberId, Array.empty[Byte]) - if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(member.assignment, Errors.NONE.code) - member.awaitingSyncCallback = null - } - } - } - /** * Complete existing DelayedHeartbeats for the given member and schedule the next one */ @@ -458,10 +457,12 @@ class GroupCoordinator(val brokerId: Int, } private def addMemberAndRebalance(sessionTimeoutMs: Int, + clientId: String, protocols: List[(String, Array[Byte])], group: GroupMetadata, callback: JoinCallback) = { - val memberId = group.generateNextMemberId + // use the client-id with a random id suffix as the member-id + val memberId = clientId + "-" + group.generateMemberIdSuffix val member = new MemberMetadata(memberId, group.groupId, sessionTimeoutMs, protocols) member.awaitingJoinCallback = callback group.add(member.memberId, member) @@ -488,12 +489,7 @@ class GroupCoordinator(val brokerId: Int, private def prepareRebalance(group: GroupMetadata) { // if any members are awaiting sync, cancel their request and have them rejoin if (group.is(AwaitingSync)) { - for (member <- group.allMembers) { - if (member.awaitingSyncCallback != null) { - member.awaitingSyncCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS.code) - member.awaitingSyncCallback = null - } - } + groupManager.propagateAssignment(group, Errors.REBALANCE_IN_PROGRESS.code) } group.allMembers.foreach(_.assignment = null) @@ -537,14 +533,14 @@ class GroupCoordinator(val brokerId: Int, // TODO: cut the socket connection to the client } + // TODO KAFKA-2720: 
only remove group in the background thread if (group.isEmpty) { - group.transitionTo(Dead) + groupManager.removeGroup(group) info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId)) - coordinatorMetadata.removeGroup(group.groupId) } } if (!group.is(Dead)) { - group.initNextGeneration + group.initNextGeneration() info("Stabilized group %s generation %s".format(group.groupId, group.generationId)) // trigger the awaiting join group response callback for all the members after rebalancing @@ -585,27 +581,31 @@ class GroupCoordinator(val brokerId: Int, // TODO: add metrics for complete heartbeats } - def partitionFor(group: String): Int = offsetManager.partitionFor(group) + def partitionFor(group: String): Int = groupManager.partitionFor(group) private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) = member.awaitingJoinCallback != null || member.awaitingSyncCallback != null || member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline - private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId)) + private def isCoordinatorForGroup(groupId: String) = groupManager.isGroupLocal(groupId) + + private def isCoordinatorLoadingInProgress(groupId: String) = groupManager.isGroupLoading(groupId) } object GroupCoordinator { val NoProtocol = "" val NoLeader = "" - val OffsetsTopicName = "__consumer_offsets" + + // TODO: we store both group metadata and offset data here despite the topic name being offsets only + val GroupMetadataTopicName = "__consumer_offsets" def create(config: KafkaConfig, zkUtils: ZkUtils, replicaManager: ReplicaManager, - kafkaScheduler: KafkaScheduler): GroupCoordinator = { - val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + scheduler: Scheduler): GroupCoordinator = { + val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, @@ -613,16 +613,15 @@ object GroupCoordinator { offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, + val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs) - new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, kafkaScheduler) + new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkUtils, scheduler) } def create(config: KafkaConfig, - zkUtils: ZkUtils, - offsetManager: OffsetManager): GroupCoordinator = { - val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize, + groupManager: GroupMetadataManager): GroupCoordinator = { + val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize, loadBufferSize = config.offsetsLoadBufferSize, offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L, offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs, @@ -630,9 +629,9 @@ object GroupCoordinator { offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor, offsetCommitTimeoutMs = config.offsetCommitTimeoutMs, 
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks) - val groupConfig = GroupManagerConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, + val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs, groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs) - new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager) + new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupManager) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/GroupMetadata.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala index 60ee987..652a3a4 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala @@ -86,7 +86,7 @@ private[coordinator] case object Dead extends GroupState { val state: Byte = 4 } private object GroupMetadata { private val validPreviousStates: Map[GroupState, Set[GroupState]] = - Map(Dead -> Set(PreparingRebalance), + Map(Dead -> Set(Stable, PreparingRebalance, AwaitingSync), AwaitingSync -> Set(PreparingRebalance), Stable -> Set(AwaitingSync), PreparingRebalance -> Set(Stable, AwaitingSync)) @@ -151,7 +151,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: } // TODO: decide if ids should be predictable or random - def generateNextMemberId = UUID.randomUUID().toString + def generateMemberIdSuffix = UUID.randomUUID().toString def canRebalance = state == Stable || state == AwaitingSync @@ -188,7 +188,7 @@ private[coordinator] class GroupMetadata(val groupId: String, val protocolType: isEmpty || (memberProtocols & candidateProtocols).nonEmpty } - def initNextGeneration = { + def initNextGeneration() = { assert(notYetRejoinedMembers == List.empty[MemberMetadata]) generationId += 1 protocol = selectProtocol http://git-wip-us.apache.org/repos/asf/kafka/blob/7c334752/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala new file mode 100644 index 0000000..81ed548 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -0,0 +1,952 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.coordinator + +import java.util.concurrent.locks.ReentrantReadWriteLock + +import kafka.utils.CoreUtils._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.protocol.types.{ArrayOf, Struct, Schema, Field} +import org.apache.kafka.common.protocol.types.Type.STRING +import org.apache.kafka.common.protocol.types.Type.INT32 +import org.apache.kafka.common.protocol.types.Type.INT64 +import org.apache.kafka.common.protocol.types.Type.BYTES +import org.apache.kafka.common.utils.Utils + +import kafka.utils._ +import kafka.common._ +import kafka.message._ +import kafka.log.FileMessageSet +import kafka.metrics.KafkaMetricsGroup +import kafka.common.TopicAndPartition +import kafka.tools.MessageFormatter +import kafka.api.ProducerResponseStatus +import kafka.server.ReplicaManager + +import scala.collection._ +import java.io.PrintStream +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import com.yammer.metrics.core.Gauge + +class GroupMetadataManager(val brokerId: Int, + val config: OffsetConfig, + replicaManager: ReplicaManager, + zkUtils: ZkUtils, + scheduler: Scheduler) extends Logging with KafkaMetricsGroup { + + /* offsets cache */ + private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata] + + /* group metadata cache */ + private val groupsCache = new Pool[String, GroupMetadata] + + /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE the group lock if needed */ + private val loadingPartitions: mutable.Set[Int] = mutable.Set() + + /* partitions of consumer groups that are assigned, using the same loading partition lock */ + private val ownedPartitions: mutable.Set[Int] = mutable.Set() + + /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */ + private val offsetExpireLock = new ReentrantReadWriteLock() + + /* lock for removing offsets of a range partition, it should be always called BEFORE the group lock if needed */ + private val offsetRemoveLock = new ReentrantReadWriteLock() + + /* shutting down flag */ + private val shuttingDown = new AtomicBoolean(false) + + /* number of partitions for the consumer metadata topic */ + private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount + + this.logIdent = "[Group Metadata Manager on Broker " + brokerId + "]: " + + scheduler.schedule(name = "delete-expired-consumer-offsets", + fun = deleteExpiredOffsets, + period = config.offsetsRetentionCheckIntervalMs, + unit = TimeUnit.MILLISECONDS) + + newGauge("NumOffsets", + new Gauge[Int] { + def value = offsetsCache.size + } + ) + + newGauge("NumGroups", + new Gauge[Int] { + def value = groupsCache.size + } + ) + + def partitionFor(groupId: String): Int = Utils.abs(groupId.hashCode) % groupMetadataTopicPartitionCount + + def isGroupLocal(groupId: String): Boolean = loadingPartitions synchronized ownedPartitions.contains(partitionFor(groupId)) + + def isGroupLoading(groupId: String): Boolean = loadingPartitions synchronized loadingPartitions.contains(partitionFor(groupId)) + + /** + * Get the group associated with the given groupId, or null if not found + */ + def getGroup(groupId: String): GroupMetadata = { + groupsCache.get(groupId) + } + + /** + * Add a group or get the group associated with the given groupId if it already exists + */ + def addGroup(groupId: String, protocolType: String): GroupMetadata = { + addGroup(groupId, new GroupMetadata(groupId, 
protocolType)) + } + + private def addGroup(groupId: String, group: GroupMetadata): GroupMetadata = { + groupsCache.putIfNotExists(groupId, group) + groupsCache.get(groupId) + } + + /** + * Remove all metadata associated with the group, note this function needs to be + * called inside the group lock + * @param group + */ + def removeGroup(group: GroupMetadata) { + // first mark the group as dead + group.transitionTo(Dead) + + if (groupsCache.remove(group.groupId) != group) + throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.") + + // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say, + // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and + // retry removing this group. + val groupPartition = partitionFor(group.groupId) + val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId)) + + val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition) + partitionOpt.foreach { partition => + val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition) + + trace("Marking group %s as deleted.".format(group.groupId)) + + try { + // do not need to require acks since even if the tombstone is lost, + // it will be appended again by the new leader + // TODO KAFKA-2720: periodic purging instead of immediate removal of groups + partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone)) + } catch { + case t: Throwable => + error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t) + // ignore and continue + } + } + } + + def storeGroup(group: GroupMetadata, + groupAssignment: Map[String, Array[Byte]]) { + // construct the message to append + val message = new Message( + key = GroupMetadataManager.groupMetadataKey(group.groupId), + bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment) + ) + + val groupMetadataPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId)) + + val groupMetadataMessageSet = Map(groupMetadataPartition -> + new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, message)) + + // set the callback function to insert the created group into cache after log append completed + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + // the append response should only contain the topics partition + if (responseStatus.size != 1 || ! 
responseStatus.contains(groupMetadataPartition)) + throw new IllegalStateException("Append status %s should only have one partition %s" + .format(responseStatus, groupMetadataPartition)) + + // construct the error status in the propagated assignment response + // in the cache + val status = responseStatus(groupMetadataPartition) + + var responseCode = Errors.NONE.code + if (status.error != ErrorMapping.NoError) { + debug("Metadata from group %s with generation %d failed when appending to log due to %s" + .format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error))) + + // transform the log append error code to the corresponding the commit status error code + responseCode = if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) { + Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code + } else if (status.error == ErrorMapping.NotLeaderForPartitionCode) { + Errors.NOT_COORDINATOR_FOR_GROUP.code + } else if (status.error == ErrorMapping.MessageSizeTooLargeCode + || status.error == ErrorMapping.MessageSetSizeTooLargeCode + || status.error == ErrorMapping.InvalidFetchSizeCode) { + + error("Appending metadata message for group %s generation %d failed due to %s, returning UNKNOWN error code to the client" + .format(group.groupId, group.generationId, ErrorMapping.exceptionNameFor(status.error))) + + Errors.UNKNOWN.code + } else { + + error("Appending metadata message for group %s generation %d failed due to unexpected error: %s" + .format(group.groupId, group.generationId, status.error)) + + status.error + } + } + + for (member <- group.allMembers) { + member.assignment = groupAssignment.getOrElse(member.memberId, Array.empty[Byte]) + } + + // propagate the assignments + propagateAssignment(group, responseCode) + } + + // call replica manager to append the group message + replicaManager.appendMessages( + config.offsetCommitTimeoutMs.toLong, + config.offsetCommitRequiredAcks, + true, // allow appending to internal offset topic + groupMetadataMessageSet, + putCacheCallback) + } + + def propagateAssignment(group: GroupMetadata, + errorCode: Short) { + val hasError = errorCode != Errors.NONE.code + for (member <- group.allMembers) { + if (member.awaitingSyncCallback != null) { + member.awaitingSyncCallback(if (hasError) Array.empty else member.assignment, errorCode) + member.awaitingSyncCallback = null + } + } + } + + /** + * Store offsets by appending it to the replicated log and then inserting to cache + */ + def storeOffsets(groupId: String, + consumerId: String, + generationId: Int, + offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) { + // first filter out partitions with offset metadata size exceeding limit + val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) => + validateOffsetMetadataLength(offsetAndMetadata.metadata) + } + + // construct the message set to append + val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + new Message( + key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), + bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata) + ) + }.toSeq + + val offsetTopicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(groupId)) + + val offsetsAndMetadataMessageSet = Map(offsetTopicPartition -> + new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*)) + + // set the callback 
function to insert offsets into cache after log append completed + def putCacheCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { + // the append response should only contain the topics partition + if (responseStatus.size != 1 || ! responseStatus.contains(offsetTopicPartition)) + throw new IllegalStateException("Append status %s should only have one partition %s" + .format(responseStatus, offsetTopicPartition)) + + // construct the commit response status and insert + // the offset and metadata to cache if the append status has no error + val status = responseStatus(offsetTopicPartition) + + val responseCode = + if (status.error == ErrorMapping.NoError) { + filteredOffsetMetadata.foreach { case (topicAndPartition, offsetAndMetadata) => + putOffset(GroupTopicPartition(groupId, topicAndPartition), offsetAndMetadata) + } + ErrorMapping.NoError + } else { + debug("Offset commit %s from group %s consumer %s with generation %d failed when appending to log due to %s" + .format(filteredOffsetMetadata, groupId, consumerId, generationId, ErrorMapping.exceptionNameFor(status.error))) + + // transform the log append error code to the corresponding the commit status error code + if (status.error == ErrorMapping.UnknownTopicOrPartitionCode) + ErrorMapping.ConsumerCoordinatorNotAvailableCode + else if (status.error == ErrorMapping.NotLeaderForPartitionCode) + ErrorMapping.NotCoordinatorForConsumerCode + else if (status.error == ErrorMapping.MessageSizeTooLargeCode + || status.error == ErrorMapping.MessageSetSizeTooLargeCode + || status.error == ErrorMapping.InvalidFetchSizeCode) + Errors.INVALID_COMMIT_OFFSET_SIZE.code + else + status.error + } + + + // compute the final error codes for the commit response + val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + if (validateOffsetMetadataLength(offsetAndMetadata.metadata)) + (topicAndPartition, responseCode) + else + (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode) + } + + // finally trigger the callback logic passed from the API layer + responseCallback(commitStatus) + } + + // call replica manager to append the offset messages + replicaManager.appendMessages( + config.offsetCommitTimeoutMs.toLong, + config.offsetCommitRequiredAcks, + true, // allow appending to internal offset topic + offsetsAndMetadataMessageSet, + putCacheCallback) + } + + /** + * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either + * returns the current offset or it begins to sync the cache from the log (and returns an error code). + */ + def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = { + trace("Getting offsets %s for group %s.".format(topicPartitions, group)) + + if (isGroupLocal(group)) { + if (topicPartitions.isEmpty) { + // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.) 
+ offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) => + (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError)) + }.toMap + } else { + topicPartitions.map { topicAndPartition => + val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) + (groupTopicPartition.topicPartition, getOffset(groupTopicPartition)) + }.toMap + } + } else { + debug("Could not fetch offsets for group %s (not offset coordinator).".format(group)) + topicPartitions.map { topicAndPartition => + val groupTopicPartition = GroupTopicPartition(group, topicAndPartition) + (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup) + }.toMap + } + } + + /** + * Asynchronously read the partition from the offsets topic and populate the cache + */ + def loadGroupsForPartition(offsetsPartition: Int) { + + val topicPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition) + + loadingPartitions synchronized { + ownedPartitions.add(offsetsPartition) + + if (loadingPartitions.contains(offsetsPartition)) { + info("Offset load from %s already in progress.".format(topicPartition)) + } else { + loadingPartitions.add(offsetsPartition) + scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets) + } + } + + def loadGroupsAndOffsets() { + info("Loading offsets from " + topicPartition) + + val startMs = SystemTime.milliseconds + try { + replicaManager.logManager.getLog(topicPartition) match { + case Some(log) => + var currOffset = log.logSegments.head.baseOffset + val buffer = ByteBuffer.allocate(config.loadBufferSize) + // loop breaks if leader changes at any time during the load, since getHighWatermark is -1 + inWriteLock(offsetExpireLock) { + while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) { + buffer.clear() + val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet] + messages.readInto(buffer, 0) + val messageSet = new ByteBufferMessageSet(buffer) + messageSet.foreach { msgAndOffset => + require(msgAndOffset.message.key != null, "Offset entry key should not be null") + val baseKey = GroupMetadataManager.readMessageKey(msgAndOffset.message.key) + + if (baseKey.isInstanceOf[OffsetKey]) { + // load offset + val key = baseKey.key.asInstanceOf[GroupTopicPartition] + if (msgAndOffset.message.payload == null) { + if (offsetsCache.remove(key) != null) + trace("Removed offset for %s due to tombstone entry.".format(key)) + else + trace("Ignoring redundant tombstone for %s.".format(key)) + } else { + // special handling for version 0: + // set the expiration time stamp as commit time stamp + server default retention time + val value = GroupMetadataManager.readOffsetMessageValue(msgAndOffset.message.payload) + putOffset(key, value.copy ( + expireTimestamp = { + if (value.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) + value.commitTimestamp + config.offsetsRetentionMs + else + value.expireTimestamp + } + )) + trace("Loaded offset %s for %s.".format(value, key)) + } + } else { + // load group metadata + val groupId = baseKey.key.asInstanceOf[String] + val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, msgAndOffset.message.payload) + + addGroup(groupId, groupMetadata) + } + + currOffset = msgAndOffset.nextOffset + } + } + } + + if (!shuttingDown.get()) + info("Finished loading offsets from %s in %d milliseconds." 
+ .format(topicPartition, SystemTime.milliseconds - startMs)) + case None => + warn("No log found for " + topicPartition) + } + } + catch { + case t: Throwable => + error("Error in loading offsets from " + topicPartition, t) + } + finally { + loadingPartitions synchronized loadingPartitions.remove(offsetsPartition) + } + } + } + + /** + * When this broker becomes a follower for an offsets topic partition, clear out the cache for groups that belong to + * that partition. + * @param offsetsPartition Groups belonging to this partition of the offsets topic will be deleted from the cache. + */ + def removeGroupsForPartition(offsetsPartition: Int) { + var numOffsetsRemoved = 0 + var numGroupsRemoved = 0 + + loadingPartitions synchronized { + // we need to guard the group removal in cache in the loading partition lock + // to prevent the coordinator's check-and-get-group race condition + ownedPartitions.remove(offsetsPartition) + + // clear the offsets for this partition in the cache + + /** + * NOTE: we need to put this in the loading partition lock as well, to protect the leader-is-local check in getOffsets + * against fetching from an empty/cleared offset cache (i.e., one cleared by a leader->follower transition right after + * the check), which would cause offset fetches to return empty offsets with a NONE error code + */ + offsetsCache.keys.foreach { key => + if (partitionFor(key.group) == offsetsPartition) { + offsetsCache.remove(key) + numOffsetsRemoved += 1 + } + } + + // clear the groups for this partition in the cache + for (group <- groupsCache.values) { + group synchronized { + // mark the group as dead and then remove it from cache + group.transitionTo(Dead) + + if (groupsCache.remove(group.groupId) != group) + throw new IllegalArgumentException("Cannot remove group " + group.groupId + " since it has been replaced.") + + numGroupsRemoved += 1 + } + } + } + + if (numOffsetsRemoved > 0) info("Removed %d cached offsets for %s on follower transition." + .format(numOffsetsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition))) + + if (numGroupsRemoved > 0) info("Removed %d cached groups for %s on follower transition." + .format(numGroupsRemoved, TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition))) + } + + /** + * Fetch the current offset for the given group/topic/partition from the underlying offsets storage. + * + * @param key The requested group-topic-partition + * @return If the key is present, return the offset and metadata; otherwise return NoOffset + */ + private def getOffset(key: GroupTopicPartition) = { + val offsetAndMetadata = offsetsCache.get(key) + if (offsetAndMetadata == null) + OffsetMetadataAndError.NoOffset + else + OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, ErrorMapping.NoError) + } + + /** + * Put the (already committed) offset for the given group/topic/partition into the cache.
+ * + * @param key The group-topic-partition + * @param offsetAndMetadata The offset/metadata to be stored + */ + private def putOffset(key: GroupTopicPartition, offsetAndMetadata: OffsetAndMetadata) { + offsetsCache.put(key, offsetAndMetadata) + } + + private def deleteExpiredOffsets() { + debug("Collecting expired offsets.") + val startMs = SystemTime.milliseconds + + val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) { + val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) => + offsetAndMetadata.expireTimestamp < startMs + } + + debug("Found %d expired offsets.".format(expiredOffsets.size)) + + // delete the expired offsets from the table and generate tombstone messages to remove them from the log + val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) => + val offsetsPartition = partitionFor(groupTopicAndPartition.group) + trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata)) + + offsetsCache.remove(groupTopicAndPartition) + + val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group, + groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition) + + (offsetsPartition, new Message(bytes = null, key = commitKey)) + }.groupBy { case (partition, tombstone) => partition } + + // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say, + // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles. + tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) => + val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition) + partitionOpt.map { partition => + val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition) + val messages = tombstones.map(_._2).toSeq + + trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition)) + + try { + // do not need to require acks since even if the tombstone is lost, + // it will be appended again in the next purge cycle + partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*)) + tombstones.size + } + catch { + case t: Throwable => + error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size, appendPartition), t) + // ignore and continue + 0 + } + } + }.sum + } + + info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved, SystemTime.milliseconds - startMs)) + } + + private def getHighWatermark(partitionId: Int): Long = { + val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, partitionId) + + val hw = partitionOpt.map { partition => + partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L) + }.getOrElse(-1L) + + hw + } + + /* + * Check if the offset metadata length is valid + */ + private def validateOffsetMetadataLength(metadata: String) : Boolean = { + metadata == null || metadata.length() <= config.maxMetadataSize + } + + def shutdown() { + shuttingDown.set(true) + + // TODO: clear the caches + } + + /** + * Gets the partition count of the offsets topic from ZooKeeper. + * If the topic does not exist, the configured partition count is returned.
+ */ + private def getOffsetsTopicPartitionCount = { + val topic = GroupCoordinator.GroupMetadataTopicName + val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic)) + if (topicData(topic).nonEmpty) + topicData(topic).size + else + config.offsetsTopicNumPartitions + } + + /** + * Add the partition into the owned list + * + * NOTE: this is for test only + */ + def addPartitionOwnership(partition: Int) { + loadingPartitions synchronized { + ownedPartitions.add(partition) + } + } +} + +/** + * Messages stored for the group topic have versions for both the key and value fields. Key + * version is used to indicate the type of the message (also to keep different types + * of messages from being compacted together if they have the same field values); and value + * version is used to evolve the messages within their data types: + * + * key version 0: group consumption offset + * -> value version 0: [offset, metadata, timestamp] + * + * key version 1: group consumption offset + * -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp] + * + * key version 2: group metadata + * -> value version 0: [protocol_type, generation, protocol, leader, members] + */ +object GroupMetadataManager { + + private val CURRENT_OFFSET_KEY_SCHEMA_VERSION = 1.toShort + private val CURRENT_GROUP_KEY_SCHEMA_VERSION = 2.toShort + + private val OFFSET_COMMIT_KEY_SCHEMA = new Schema(new Field("group", STRING), + new Field("topic", STRING), + new Field("partition", INT32)) + private val OFFSET_KEY_GROUP_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("group") + private val OFFSET_KEY_TOPIC_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("topic") + private val OFFSET_KEY_PARTITION_FIELD = OFFSET_COMMIT_KEY_SCHEMA.get("partition") + + private val OFFSET_COMMIT_VALUE_SCHEMA_V0 = new Schema(new Field("offset", INT64), + new Field("metadata", STRING, "Associated metadata.", ""), + new Field("timestamp", INT64)) + private val OFFSET_VALUE_OFFSET_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("offset") + private val OFFSET_VALUE_METADATA_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("metadata") + private val OFFSET_VALUE_TIMESTAMP_FIELD_V0 = OFFSET_COMMIT_VALUE_SCHEMA_V0.get("timestamp") + + private val OFFSET_COMMIT_VALUE_SCHEMA_V1 = new Schema(new Field("offset", INT64), + new Field("metadata", STRING, "Associated metadata.", ""), + new Field("commit_timestamp", INT64), + new Field("expire_timestamp", INT64)) + private val OFFSET_VALUE_OFFSET_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("offset") + private val OFFSET_VALUE_METADATA_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("metadata") + private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp") + private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp") + + private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING)) + private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group") + + private val MEMBER_METADATA_V0 = new Schema(new Field("member_id", STRING), + new Field("session_timeout", INT32), + new Field("subscription", BYTES), + new Field("assignment", BYTES)) + private val MEMBER_METADATA_MEMBER_ID_V0 = MEMBER_METADATA_V0.get("member_id") + private val MEMBER_METADATA_SESSION_TIMEOUT_V0 = MEMBER_METADATA_V0.get("session_timeout") + private val MEMBER_METADATA_SUBSCRIPTION_V0 = MEMBER_METADATA_V0.get("subscription") + private val MEMBER_METADATA_ASSIGNMENT_V0 = MEMBER_METADATA_V0.get("assignment") + + + private val
GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(new Field("protocol_type", STRING), + new Field("generation", INT32), + new Field("protocol", STRING), + new Field("leader", STRING), + new Field("members", new ArrayOf(MEMBER_METADATA_V0))) + private val GROUP_METADATA_PROTOCOL_TYPE_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol_type") + private val GROUP_METADATA_GENERATION_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("generation") + private val GROUP_METADATA_PROTOCOL_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("protocol") + private val GROUP_METADATA_LEADER_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("leader") + private val GROUP_METADATA_MEMBERS_V0 = GROUP_METADATA_VALUE_SCHEMA_V0.get("members") + + // map of versions to key schemas as data types + private val MESSAGE_TYPE_SCHEMAS = Map(0 -> OFFSET_COMMIT_KEY_SCHEMA, + 1 -> OFFSET_COMMIT_KEY_SCHEMA, + 2 -> GROUP_METADATA_KEY_SCHEMA) + + // map of version of offset value schemas + private val OFFSET_VALUE_SCHEMAS = Map(0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0, + 1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1) + private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort + + // map of version of group metadata value schemas + private val GROUP_VALUE_SCHEMAS = Map(0 -> GROUP_METADATA_VALUE_SCHEMA_V0) + private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 0.toShort + + private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION) + private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION) + + private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION) + private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION) + + private def schemaForKey(version: Int) = { + val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown offset schema version " + version) + } + } + + private def schemaForOffset(version: Int) = { + val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown offset schema version " + version) + } + } + + private def schemaForGroup(version: Int) = { + val schemaOpt = GROUP_VALUE_SCHEMAS.get(version) + schemaOpt match { + case Some(schema) => schema + case _ => throw new KafkaException("Unknown offset schema version " + version) + } + } + + /** + * Generates the key for offset commit message for given (group, topic, partition) + * + * @return key for offset commit message + */ + private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = { + val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA) + key.set(OFFSET_KEY_GROUP_FIELD, group) + key.set(OFFSET_KEY_TOPIC_FIELD, topic) + key.set(OFFSET_KEY_PARTITION_FIELD, partition) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf) + byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION) + key.writeTo(byteBuffer) + byteBuffer.array() + } + + /** + * Generates the key for group metadata message for given group + * + * @return key bytes for group metadata message + */ + private def groupMetadataKey(group: String): Array[Byte] = { + val key = new Struct(CURRENT_GROUP_KEY_SCHEMA) + key.set(GROUP_KEY_GROUP_FIELD, group) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf) + byteBuffer.putShort(CURRENT_GROUP_KEY_SCHEMA_VERSION) + key.writeTo(byteBuffer) + byteBuffer.array() + } + + /** + * Generates the payload for offset commit message from given offset and 
metadata + * + * @param offsetAndMetadata consumer's current offset and metadata + * @return payload for offset commit message + */ + private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = { + // generate commit value with schema version 1 + val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA) + value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset) + value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata) + value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp) + value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp) + val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) + byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION) + value.writeTo(byteBuffer) + byteBuffer.array() + } + + /** + * Generates the payload for a group metadata message, + * assuming the generation id, selected protocol, leader and member assignment are all available + * + * @param groupMetadata current group metadata + * @param assignment the assignment for each member of the group + * @return payload for group metadata message + */ + private def groupMetadataValue(groupMetadata: GroupMetadata, assignment: Map[String, Array[Byte]]): Array[Byte] = { + // generate group metadata value with schema version 0 + val value = new Struct(CURRENT_GROUP_VALUE_SCHEMA) + value.set(GROUP_METADATA_PROTOCOL_TYPE_V0, groupMetadata.protocolType) + value.set(GROUP_METADATA_GENERATION_V0, groupMetadata.generationId) + value.set(GROUP_METADATA_PROTOCOL_V0, groupMetadata.protocol) + value.set(GROUP_METADATA_LEADER_V0, groupMetadata.leaderId) + + val memberArray = groupMetadata.allMembers.map { + case memberMetadata => + val memberStruct = value.instance(GROUP_METADATA_MEMBERS_V0) + memberStruct.set(MEMBER_METADATA_MEMBER_ID_V0, memberMetadata.memberId) + memberStruct.set(MEMBER_METADATA_SESSION_TIMEOUT_V0, memberMetadata.sessionTimeoutMs) + + val metadata = memberMetadata.metadata(groupMetadata.protocol) + memberStruct.set(MEMBER_METADATA_SUBSCRIPTION_V0, ByteBuffer.wrap(metadata)) + + val memberAssignment = assignment(memberMetadata.memberId) + assert(memberAssignment != null) + + memberStruct.set(MEMBER_METADATA_ASSIGNMENT_V0, ByteBuffer.wrap(memberAssignment)) + + memberStruct + } + + value.set(GROUP_METADATA_MEMBERS_V0, memberArray.toArray) + + val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf) + byteBuffer.putShort(CURRENT_GROUP_VALUE_SCHEMA_VERSION) + value.writeTo(byteBuffer) + byteBuffer.array() + } + + /** + * Decodes the key of a message stored in the group metadata topic + * + * @param buffer input byte-buffer + * @return a BaseKey object: an OffsetKey for offset commit messages, or a GroupKey for group metadata messages + */ + private def readMessageKey(buffer: ByteBuffer): BaseKey = { + val version = buffer.getShort + val keySchema = schemaForKey(version) + val key = keySchema.read(buffer).asInstanceOf[Struct] + + if (version <= CURRENT_OFFSET_KEY_SCHEMA_VERSION) { + // versions 0 and 1 refer to offset commits + val group = key.get(OFFSET_KEY_GROUP_FIELD).asInstanceOf[String] + val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String] + val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int] + + OffsetKey(version, GroupTopicPartition(group, TopicAndPartition(topic, partition))) + + } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) { + // version 2 refers to group metadata + val group = key.get(GROUP_KEY_GROUP_FIELD).asInstanceOf[String] + + GroupKey(version, group) + } else { + throw new IllegalStateException("Unknown version " + version + " for group metadata message") + } + } + + /** + * Decodes the offset messages' payload and retrieves
the offset and metadata from it + * + * @param buffer input byte-buffer + * @return an offset-metadata object from the message + */ + private def readOffsetMessageValue(buffer: ByteBuffer): OffsetAndMetadata = { + if (buffer == null) { // tombstone + null + } else { + val version = buffer.getShort + val valueSchema = schemaForOffset(version) + val value = valueSchema.read(buffer).asInstanceOf[Struct] + + if (version == 0) { + val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V0).asInstanceOf[Long] + val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V0).asInstanceOf[String] + val timestamp = value.get(OFFSET_VALUE_TIMESTAMP_FIELD_V0).asInstanceOf[Long] + + OffsetAndMetadata(offset, metadata, timestamp) + } else if (version == 1) { + val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V1).asInstanceOf[Long] + val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V1).asInstanceOf[String] + val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1).asInstanceOf[Long] + val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long] + + OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp) + } else { + throw new IllegalStateException("Unknown offset message version") + } + } + } + + /** + * Decodes the group metadata messages' payload and retrieves its member metadata from it + * + * @param buffer input byte-buffer + * @return a group metadata object from the message + */ + private def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = { + if (buffer == null) { // tombstone + null + } else { + val version = buffer.getShort + val valueSchema = schemaForGroup(version) + val value = valueSchema.read(buffer).asInstanceOf[Struct] + + if (version == 0) { + val protocolType = value.get(GROUP_METADATA_PROTOCOL_TYPE_V0).asInstanceOf[String] + + val group = new GroupMetadata(groupId, protocolType) + + group.generationId = value.get(GROUP_METADATA_GENERATION_V0).asInstanceOf[Int] + group.leaderId = value.get(GROUP_METADATA_LEADER_V0).asInstanceOf[String] + group.protocol = value.get(GROUP_METADATA_PROTOCOL_V0).asInstanceOf[String] + + value.getArray(GROUP_METADATA_MEMBERS_V0).foreach { + case memberMetadataObj => + val memberMetadata = memberMetadataObj.asInstanceOf[Struct] + val memberId = memberMetadata.get(MEMBER_METADATA_MEMBER_ID_V0).asInstanceOf[String] + val sessionTimeout = memberMetadata.get(MEMBER_METADATA_SESSION_TIMEOUT_V0).asInstanceOf[Int] + val subscription = Utils.toArray(memberMetadata.get(MEMBER_METADATA_SUBSCRIPTION_V0).asInstanceOf[ByteBuffer]) + + val member = new MemberMetadata(memberId, groupId, sessionTimeout, List((group.protocol, subscription))) + + member.assignment = Utils.toArray(memberMetadata.get(MEMBER_METADATA_ASSIGNMENT_V0).asInstanceOf[ByteBuffer]) + + group.add(memberId, member) + } + + group + } else { + throw new IllegalStateException("Unknown group metadata message version") + } + } + } + + // Formatter for use with tools such as console consumer: Consumer should also set exclude.internal.topics to false.
+ // (specify --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter" when consuming __consumer_offsets) + class OffsetsMessageFormatter extends MessageFormatter { + def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) { + val formattedKey = if (key == null) "NULL" else GroupMetadataManager.readMessageKey(ByteBuffer.wrap(key)).toString + val formattedValue = if (value == null) "NULL" else GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value)).toString + output.write(formattedKey.getBytes) + output.write("::".getBytes) + output.write(formattedValue.getBytes) + output.write("\n".getBytes) + } + } +} + +case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) { + + def this(group: String, topic: String, partition: Int) = + this(group, new TopicAndPartition(topic, partition)) + + override def toString = + "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition) +} + +trait BaseKey{ + def version: Short + def key: Object +} + +case class OffsetKey(version: Short, key: GroupTopicPartition) extends BaseKey + +case class GroupKey(version: Short, key: String) extends BaseKey +
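----------------------------------------------------------------------

A quick illustration of the versioned key layout documented in the GroupMetadataManager object above: every key is a 2-byte schema version followed by a schema-encoded struct, and the version decides how the rest of the buffer is parsed. The following is a minimal, self-contained Scala sketch rather than code from this commit; the object name OffsetKeyLayoutExample and the example group/topic values are made up, and it only relies on the public org.apache.kafka.common.protocol.types classes that GroupMetadataManager itself imports.

import java.nio.ByteBuffer
import org.apache.kafka.common.protocol.types.{Field, Schema, Struct}
import org.apache.kafka.common.protocol.types.Type.{INT32, STRING}

object OffsetKeyLayoutExample {
  def main(args: Array[String]): Unit = {
    // the same field layout as OFFSET_COMMIT_KEY_SCHEMA above, re-declared locally for the example
    val groupField = new Field("group", STRING)
    val topicField = new Field("topic", STRING)
    val partitionField = new Field("partition", INT32)
    val keySchema = new Schema(groupField, topicField, partitionField)
    val keySchemaVersion: Short = 1

    // encode: a 2-byte schema version followed by the schema-encoded struct,
    // mirroring what offsetCommitKey does
    val key = new Struct(keySchema)
    key.set(groupField, "example-group")
    key.set(topicField, "example-topic")
    key.set(partitionField, 0)
    val buffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
    buffer.putShort(keySchemaVersion)
    key.writeTo(buffer)
    buffer.flip()

    // decode: read the version first, then parse the remainder with the matching schema,
    // mirroring what readMessageKey does
    val version = buffer.getShort
    val decoded = keySchema.read(buffer).asInstanceOf[Struct]
    println(s"version=$version group=${decoded.get(groupField)} " +
      s"topic=${decoded.get(topicField)} partition=${decoded.get(partitionField)}")
  }
}

The same version-first layout applies to group metadata keys (key version 2) and to the offset and group value schemas; to inspect real __consumer_offsets records, the OffsetsMessageFormatter above can be passed to the console consumer with exclude.internal.topics set to false, as noted in the formatter comment.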