From commits-return-8764-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Jan 24 05:20:59 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 78DF7180621 for ; Wed, 24 Jan 2018 05:20:59 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 6867C160C4D; Wed, 24 Jan 2018 04:20:59 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E18CF160C3A for ; Wed, 24 Jan 2018 05:20:56 +0100 (CET) Received: (qmail 65534 invoked by uid 500); 24 Jan 2018 04:20:55 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 65525 invoked by uid 99); 24 Jan 2018 04:20:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jan 2018 04:20:55 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5361D80660; Wed, 24 Jan 2018 04:20:54 +0000 (UTC) Date: Wed, 24 Jan 2018 04:20:54 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 1.0 updated: KAFKA-6287; Consumer group command should list simple consumer groups (#4407) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151676765385.7257.7470412187348829563@gitbox.apache.org> From: jgus@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/1.0 X-Git-Reftype: branch X-Git-Oldrev: 06148566dd9fde16b43cae3e896c8f47ba04baff X-Git-Newrev: 4f5cb2d9088863ea9782313fd995e48d55c57b01 X-Git-Rev: 4f5cb2d9088863ea9782313fd995e48d55c57b01 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 1.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.0 by this push: new 4f5cb2d KAFKA-6287; Consumer group command should list simple consumer groups (#4407) 4f5cb2d is described below commit 4f5cb2d9088863ea9782313fd995e48d55c57b01 Author: Jason Gustafson AuthorDate: Tue Jan 23 18:58:49 2018 -0800 KAFKA-6287; Consumer group command should list simple consumer groups (#4407) With this patch, simple consumer groups which only use Kafka for offset storage will be viewable using the `--list` option in consumer-groups.sh. In addition, this patch fixes a bug in the offset loading logic which caused us to lose the protocol type of empty groups on coordinator failover. I also did some cleanup of the various consumer group command test cases. For testing, I have added new integration tests which cover listing and describing simple consumer groups. I also added unit tests to cover loading empty groups with assertions on the protocol type. Reviewers: Rajini Sivaram --- core/src/main/scala/kafka/admin/AdminClient.scala | 18 +- .../scala/kafka/admin/ConsumerGroupCommand.scala | 30 ++-- .../kafka/coordinator/group/GroupCoordinator.scala | 33 ++-- .../kafka/coordinator/group/GroupMetadata.scala | 66 ++++--- .../coordinator/group/GroupMetadataManager.scala | 44 +++-- .../main/scala/kafka/tools/DumpLogSegments.scala | 2 +- .../SaslClientsWithInvalidCredentialsTest.scala | 2 +- .../kafka/admin/ConsumerGroupCommandTest.scala | 197 +++++++++++++++++++++ .../kafka/admin/DescribeConsumerGroupTest.scala | 173 +++++------------- .../unit/kafka/admin/ListConsumerGroupTest.scala | 81 ++++----- .../kafka/admin/ResetConsumerGroupOffsetTest.scala | 193 ++++++++------------ .../coordinator/group/GroupCoordinatorTest.scala | 32 +++- .../group/GroupMetadataManagerTest.scala | 183 ++++++++++++++----- .../coordinator/group/GroupMetadataTest.scala | 10 +- 14 files changed, 630 insertions(+), 434 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 24149d7..d794eb2 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -167,7 +167,7 @@ class AdminClient(val time: Time, } def listAllGroups(): Map[Node, List[GroupOverview]] = { - findAllBrokers.map { broker => + findAllBrokers().map { broker => broker -> { try { listGroups(broker) @@ -182,16 +182,22 @@ class AdminClient(val time: Time, def listAllConsumerGroups(): Map[Node, List[GroupOverview]] = { listAllGroups().mapValues { groups => - groups.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) + groups.filter(isConsumerGroup) } } def listAllGroupsFlattened(): List[GroupOverview] = { - listAllGroups.values.flatten.toList + listAllGroups().values.flatten.toList } def listAllConsumerGroupsFlattened(): List[GroupOverview] = { - listAllGroupsFlattened.filter(_.protocolType == ConsumerProtocol.PROTOCOL_TYPE) + listAllGroupsFlattened().filter(isConsumerGroup) + } + + private def isConsumerGroup(group: GroupOverview): Boolean = { + // Consumer groups which are using group management use the "consumer" protocol type. + // Consumer groups which are only using offset storage will have an empty protocol type. + group.protocolType.isEmpty || group.protocolType == ConsumerProtocol.PROTOCOL_TYPE } def listGroupOffsets(groupId: String): Map[TopicPartition, Long] = { @@ -200,12 +206,12 @@ class AdminClient(val time: Time, val response = responseBody.asInstanceOf[OffsetFetchResponse] if (response.hasError) throw response.error.exception - response.maybeThrowFirstPartitionError + response.maybeThrowFirstPartitionError() response.responseData.asScala.map { case (tp, partitionData) => (tp, partitionData.offset) }.toMap } def listAllBrokerVersionInfo(): Map[Node, Try[NodeApiVersions]] = - findAllBrokers.map { broker => + findAllBrokers().map { broker => broker -> Try[NodeApiVersions](new NodeApiVersions(getApiVersions(broker).asJava)) }.toMap diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 2120657..f1c397e 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -146,6 +146,19 @@ object ConsumerGroupCommand extends Logging { } } + def convertTimestamp(timeString: String): java.lang.Long = { + val datetime: String = timeString match { + case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString + case ts => s"${ts}Z" + } + val date = try { + new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime) + } catch { + case _: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime) + } + date.getTime + } + def printOffsetsToReset(groupAssignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): Unit = { print("\n%-30s %-10s %-15s".format("TOPIC", "PARTITION", "NEW-OFFSET")) println() @@ -606,8 +619,8 @@ object ConsumerGroupCommand extends Logging { (topicPartition, new OffsetAndMetadata(newOffset)) }.toMap } else if (opts.options.has(opts.resetToDatetimeOpt)) { + val timestamp = convertTimestamp(opts.options.valueOf(opts.resetToDatetimeOpt)) partitionsToReset.map { topicPartition => - val timestamp = getDateTime val logTimestampOffset = getLogTimestampOffset(topicPartition, timestamp) logTimestampOffset match { case LogOffsetResult.LogOffset(offset) => (topicPartition, new OffsetAndMetadata(offset)) @@ -668,21 +681,6 @@ object ConsumerGroupCommand extends Logging { } } - private[admin] def getDateTime: java.lang.Long = { - val datetime: String = opts.options.valueOf(opts.resetToDatetimeOpt) match { - case ts if ts.split("T")(1).contains("+") || ts.split("T")(1).contains("-") || ts.split("T")(1).contains("Z") => ts.toString - case ts => s"${ts}Z" - } - val date = { - try { - new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX").parse(datetime) - } catch { - case e: ParseException => new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX").parse(datetime) - } - } - date.getTime - } - override def exportOffsetsToReset(assignmentsToReset: Map[TopicPartition, OffsetAndMetadata]): String = { val rows = assignmentsToReset.map { case (k,v) => s"${k.topic()},${k.partition()},${v.offset()}" }(collection.breakOut): List[String] rows.foldRight("")(_ + "\n" + _) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala index 94622f6..bea4e83 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala @@ -34,7 +34,6 @@ import org.apache.kafka.common.utils.Time import scala.collection.{Map, Seq, immutable} import scala.math.max - /** * GroupCoordinator handles general group membership and offset management. * @@ -129,7 +128,7 @@ class GroupCoordinator(val brokerId: Int, if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) { responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID)) } else { - val group = groupManager.addGroup(new GroupMetadata(groupId)) + val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty)) doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback) } @@ -185,15 +184,15 @@ class GroupCoordinator(val brokerId: Int, // receive the initial JoinGroup response), so just return current group information // for the current generation. responseCallback(JoinGroupResult( - members = if (memberId == group.leaderId) { + members = if (group.isLeader(memberId)) { group.currentMemberMetadata } else { Map.empty }, memberId = memberId, generationId = group.generationId, - subProtocol = group.protocol, - leaderId = group.leaderId, + subProtocol = group.protocolOrNull, + leaderId = group.leaderOrNull, error = Errors.NONE)) } else { // member has changed metadata, so force a rebalance @@ -207,7 +206,7 @@ class GroupCoordinator(val brokerId: Int, addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback) } else { val member = group.get(memberId) - if (memberId == group.leaderId || !member.matches(protocols)) { + if (group.isLeader(memberId) || !member.matches(protocols)) { // force a rebalance if a member has changed metadata or if the leader sends JoinGroup. // The latter allows the leader to trigger rebalances for changes affecting assignment // which do not affect the member metadata (such as topic metadata changes for the consumer) @@ -219,8 +218,8 @@ class GroupCoordinator(val brokerId: Int, members = Map.empty, memberId = memberId, generationId = group.generationId, - subProtocol = group.protocol, - leaderId = group.leaderId, + subProtocol = group.protocolOrNull, + leaderId = group.leaderOrNull, error = Errors.NONE)) } } @@ -271,7 +270,7 @@ class GroupCoordinator(val brokerId: Int, group.get(memberId).awaitingSyncCallback = responseCallback // if this is the leader, then we can attempt to persist state and transition to stable - if (memberId == group.leaderId) { + if (group.isLeader(memberId)) { info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}") // fill any missing members with an empty assignment @@ -408,7 +407,9 @@ class GroupCoordinator(val brokerId: Int, validateGroup(groupId) match { case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error)) case None => - val group = groupManager.getGroup(groupId).getOrElse(groupManager.addGroup(new GroupMetadata(groupId))) + val group = groupManager.getGroup(groupId).getOrElse { + groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty)) + } doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback) } } @@ -425,7 +426,7 @@ class GroupCoordinator(val brokerId: Int, case None => if (generationId < 0) { // the group is not relying on Kafka for group management, so allow the commit - val group = groupManager.addGroup(new GroupMetadata(groupId)) + val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty)) doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH, offsetMetadata, responseCallback) } else { @@ -459,7 +460,7 @@ class GroupCoordinator(val brokerId: Int, if (group.is(Dead)) { responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID)) } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) { - // the group is only using Kafka to store offsets + // The group is only using Kafka to store offsets. // Also, for transactional offset commits we don't need to validate group membership and the generation. groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch) } else if (group.is(AwaitingSync)) { @@ -481,7 +482,7 @@ class GroupCoordinator(val brokerId: Int, if (!isActive.get) (Errors.COORDINATOR_NOT_AVAILABLE, Map()) else if (!isCoordinatorForGroup(groupId)) { - debug("Could not fetch offsets for group %s (not group coordinator).".format(groupId)) + debug(s"Could not fetch offsets for group $groupId (not group coordinator)") (Errors.NOT_COORDINATOR, Map()) } else if (isCoordinatorLoadInProgress(groupId)) (Errors.COORDINATOR_LOAD_IN_PROGRESS, Map()) @@ -753,15 +754,15 @@ class GroupCoordinator(val brokerId: Int, for (member <- group.allMemberMetadata) { assert(member.awaitingJoinCallback != null) val joinResult = JoinGroupResult( - members = if (member.memberId == group.leaderId) { + members = if (group.isLeader(member.memberId)) { group.currentMemberMetadata } else { Map.empty }, memberId = member.memberId, generationId = group.generationId, - subProtocol = group.protocol, - leaderId = group.leaderId, + subProtocol = group.protocolOrNull, + leaderId = group.leaderOrNull, error = Errors.NONE) member.awaitingJoinCallback(joinResult) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala index 537d944..3257f51 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala @@ -111,6 +111,22 @@ private object GroupMetadata { Stable -> Set(AwaitingSync), PreparingRebalance -> Set(Stable, AwaitingSync, Empty), Empty -> Set(PreparingRebalance)) + + def loadGroup(groupId: String, + initialState: GroupState, + generationId: Int, + protocolType: String, + protocol: String, + leaderId: String, + members: Iterable[MemberMetadata]): GroupMetadata = { + val group = new GroupMetadata(groupId, initialState) + group.generationId = generationId + group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType) + group.protocol = Option(protocol) + group.leaderId = Option(leaderId) + members.foreach(group.add) + group + } } /** @@ -151,28 +167,22 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs * 3. leader id */ @nonthreadsafe -private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) extends Logging { +private[group] class GroupMetadata(val groupId: String, initialState: GroupState) extends Logging { + private[group] val lock = new ReentrantLock private var state: GroupState = initialState - - private[group] val lock = new ReentrantLock + var protocolType: Option[String] = None + var generationId = 0 + private var leaderId: Option[String] = None + private var protocol: Option[String] = None private val members = new mutable.HashMap[String, MemberMetadata] - private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset] - private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata] - private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]() - private var receivedTransactionalOffsetCommits = false - private var receivedConsumerOffsetCommits = false - var protocolType: Option[String] = None - var generationId = 0 - var leaderId: String = null - var protocol: String = null var newMemberAdded: Boolean = false def inLock[T](fun: => T): T = CoreUtils.inLock(lock)(fun) @@ -182,6 +192,10 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def has(memberId: String) = members.contains(memberId) def get(memberId: String) = members(memberId) + def isLeader(memberId: String): Boolean = leaderId.contains(memberId) + def leaderOrNull: String = leaderId.orNull + def protocolOrNull: String = protocol.orNull + def add(member: MemberMetadata) { if (members.isEmpty) this.protocolType = Some(member.protocolType) @@ -190,18 +204,18 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState assert(this.protocolType.orNull == member.protocolType) assert(supportsProtocols(member.protocols)) - if (leaderId == null) - leaderId = member.memberId + if (leaderId.isEmpty) + leaderId = Some(member.memberId) members.put(member.memberId, member) } def remove(memberId: String) { members.remove(memberId) - if (memberId == leaderId) { + if (isLeader(memberId)) { leaderId = if (members.isEmpty) { - null + None } else { - members.keys.head + Some(members.keys.head) } } } @@ -260,11 +274,11 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState assert(notYetRejoinedMembers == List.empty[MemberMetadata]) if (members.nonEmpty) { generationId += 1 - protocol = selectProtocol + protocol = Some(selectProtocol) transitionTo(AwaitingSync) } else { generationId += 1 - protocol = null + protocol = None transitionTo(Empty) } receivedConsumerOffsetCommits = false @@ -274,16 +288,20 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def currentMemberMetadata: Map[String, Array[Byte]] = { if (is(Dead) || is(PreparingRebalance)) throw new IllegalStateException("Cannot obtain member metadata for group in state %s".format(state)) - members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol))}.toMap + members.map{ case (memberId, memberMetadata) => (memberId, memberMetadata.metadata(protocol.get))}.toMap } def summary: GroupSummary = { if (is(Stable)) { - val members = this.members.values.map { member => member.summary(protocol) }.toList - GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members) + val protocol = protocolOrNull + if (protocol == null) + throw new IllegalStateException("Invalid null group protocol for stable group") + + val members = this.members.values.map { member => member.summary(protocol) } + GroupSummary(state.toString, protocolType.getOrElse(""), protocol, members.toList) } else { - val members = this.members.values.map{ member => member.summaryNoMetadata() }.toList - GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members) + val members = this.members.values.map{ member => member.summaryNoMetadata() } + GroupSummary(state.toString, protocolType.getOrElse(""), GroupCoordinator.NoProtocol, members.toList) } } diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index 0298509..dbae86f 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -26,7 +26,7 @@ import java.util.concurrent.locks.ReentrantLock import com.yammer.metrics.core.Gauge import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0} -import kafka.common.{MessageFormatter, _} +import kafka.common.{KafkaException, MessageFormatter, OffsetAndMetadata} import kafka.metrics.KafkaMetricsGroup import kafka.server.ReplicaManager import kafka.utils.CoreUtils.inLock @@ -182,8 +182,7 @@ class GroupMetadataManager(brokerId: Int, throw new IllegalStateException("Append status %s should only have one partition %s" .format(responseStatus, groupMetadataPartition)) - // construct the error status in the propagated assignment response - // in the cache + // construct the error status in the propagated assignment response in the cache val status = responseStatus(groupMetadataPartition) val responseError = if (status.error == Errors.NONE) { @@ -263,7 +262,7 @@ class GroupMetadataManager(brokerId: Int, group.inLock { if (!group.hasReceivedConsistentOffsetCommits) - warn(s"group: ${group.groupId} with leader: ${group.leaderId} has received offset commits from consumers as well " + + warn(s"group: ${group.groupId} with leader: ${group.leaderOrNull} has received offset commits from consumers as well " + s"as transactional producers. Mixing both types of offset commits will generally result in surprises and " + s"should be avoided.") } @@ -585,8 +584,8 @@ class GroupMetadataManager(brokerId: Int, // load groups which store offsets in kafka, but which have no active members and thus no group // metadata stored in the log - (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) => - val group = new GroupMetadata(groupId) + (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId => + val group = new GroupMetadata(groupId, initialState = Empty) val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets") @@ -1036,8 +1035,8 @@ object GroupMetadataManager { value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse("")) value.set(GENERATION_KEY, groupMetadata.generationId) - value.set(PROTOCOL_KEY, groupMetadata.protocol) - value.set(LEADER_KEY, groupMetadata.leaderId) + value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull) + value.set(LEADER_KEY, groupMetadata.leaderOrNull) val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata => val memberStruct = value.instance(MEMBERS_KEY) @@ -1049,7 +1048,12 @@ object GroupMetadataManager { if (version > 0) memberStruct.set(REBALANCE_TIMEOUT_KEY, memberMetadata.rebalanceTimeoutMs) - val metadata = memberMetadata.metadata(groupMetadata.protocol) + // The group is non-empty, so the current protocol must be defined + val protocol = groupMetadata.protocolOrNull + if (protocol == null) + throw new IllegalStateException("Attempted to write non-empty group metadata with no defined protocol") + + val metadata = memberMetadata.metadata(protocol) memberStruct.set(SUBSCRIPTION_KEY, ByteBuffer.wrap(metadata)) val memberAssignment = assignment(memberMetadata.memberId) @@ -1145,36 +1149,28 @@ object GroupMetadataManager { val value = valueSchema.read(buffer) if (version == 0 || version == 1) { + val generationId = value.get(GENERATION_KEY).asInstanceOf[Int] val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String] - + val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String] + val leaderId = value.get(LEADER_KEY).asInstanceOf[String] val memberMetadataArray = value.getArray(MEMBERS_KEY) val initialState = if (memberMetadataArray.isEmpty) Empty else Stable - val group = new GroupMetadata(groupId, initialState) - - group.generationId = value.get(GENERATION_KEY).asInstanceOf[Int] - group.leaderId = value.get(LEADER_KEY).asInstanceOf[String] - group.protocol = value.get(PROTOCOL_KEY).asInstanceOf[String] - - memberMetadataArray.foreach { memberMetadataObj => + val members = memberMetadataArray.map { memberMetadataObj => val memberMetadata = memberMetadataObj.asInstanceOf[Struct] val memberId = memberMetadata.get(MEMBER_ID_KEY).asInstanceOf[String] val clientId = memberMetadata.get(CLIENT_ID_KEY).asInstanceOf[String] val clientHost = memberMetadata.get(CLIENT_HOST_KEY).asInstanceOf[String] val sessionTimeout = memberMetadata.get(SESSION_TIMEOUT_KEY).asInstanceOf[Int] val rebalanceTimeout = if (version == 0) sessionTimeout else memberMetadata.get(REBALANCE_TIMEOUT_KEY).asInstanceOf[Int] - val subscription = Utils.toArray(memberMetadata.get(SUBSCRIPTION_KEY).asInstanceOf[ByteBuffer]) val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, - protocolType, List((group.protocol, subscription))) - + protocolType, List((protocol, subscription))) member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer]) - - group.add(member) + member } - - group + GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, members) } else { throw new IllegalStateException("Unknown group metadata message version") } diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala index fe82dc2..855ca75 100755 --- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala +++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala @@ -337,7 +337,7 @@ object DumpLogSegments { val keyString = Json.encode(Map("metadata" -> groupId)) val valueString = Json.encode(Map( "protocolType" -> protocolType, - "protocol" -> group.protocol, + "protocol" -> group.protocolOrNull, "generationId" -> group.generationId, "assignment" -> assignment)) diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala index b309b80..65e72bd 100644 --- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala @@ -151,7 +151,7 @@ class SaslClientsWithInvalidCredentialsTest extends IntegrationTestHarness with createClientCredential() verifyWithRetry(describeTopic()) } finally { - adminClient.close + adminClient.close() } } diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala new file mode 100644 index 0000000..b42c3e2 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import java.util.concurrent.{ExecutorService, Executors, TimeUnit} +import java.util.{Collections, Properties} + +import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService} +import kafka.consumer.{OldConsumer, Whitelist} +import kafka.integration.KafkaServerTestHarness +import kafka.server.KafkaConfig +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{KafkaConsumer, RangeAssignor} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException +import org.apache.kafka.common.serialization.StringDeserializer +import org.junit.{After, Before} + +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConverters._ + +class ConsumerGroupCommandTest extends KafkaServerTestHarness { + import ConsumerGroupCommandTest._ + + val topic = "foo" + val group = "test.group" + + @deprecated("This field will be removed in a future release", "0.11.0.0") + private val oldConsumers = new ArrayBuffer[OldConsumer] + private var consumerGroupService: List[ConsumerGroupService] = List() + private var consumerGroupExecutors: List[AbstractConsumerGroupExecutor] = List() + + // configure the servers and clients + override def generateConfigs = { + TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props => + KafkaConfig.fromProps(props) + } + } + + @Before + override def setUp() { + super.setUp() + AdminUtils.createTopic(zkUtils, topic, 1, 1) + } + + @After + override def tearDown(): Unit = { + consumerGroupService.foreach(_.close()) + consumerGroupExecutors.foreach(_.shutdown()) + oldConsumers.foreach(_.stop()) + super.tearDown() + } + + @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0") + def createOldConsumer(): Unit = { + val consumerProps = new Properties + consumerProps.setProperty("group.id", group) + consumerProps.setProperty("zookeeper.connect", zkConnect) + oldConsumers += new OldConsumer(Whitelist(topic), consumerProps) + } + + def stopRandomOldConsumer(): Unit = { + oldConsumers.head.stop() + } + + def getConsumerGroupService(args: Array[String]): ConsumerGroupService = { + val opts = new ConsumerGroupCommandOptions(args) + val service = if (opts.useOldConsumer) new ZkConsumerGroupService(opts) else new KafkaConsumerGroupService(opts) + consumerGroupService = service :: consumerGroupService + service + } + + def addConsumerGroupExecutor(numConsumers: Int, + topic: String = topic, + group: String = group, + strategy: String = classOf[RangeAssignor].getName): ConsumerGroupExecutor = { + val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, topic, strategy) + addExecutor(executor) + executor + } + + def addSimpleGroupExecutor(partitions: Iterable[TopicPartition] = Seq(new TopicPartition(topic, 0)), + group: String = group): SimpleConsumerGroupExecutor = { + val executor = new SimpleConsumerGroupExecutor(brokerList, group, partitions) + addExecutor(executor) + executor + } + + private def addExecutor(executor: AbstractConsumerGroupExecutor): AbstractConsumerGroupExecutor = { + consumerGroupExecutors = executor :: consumerGroupExecutors + executor + } + +} + +object ConsumerGroupCommandTest { + + abstract class AbstractConsumerRunnable(broker: String, groupId: String) extends Runnable { + val props = new Properties + configure(props) + val consumer = new KafkaConsumer(props) + + def configure(props: Properties): Unit = { + props.put("bootstrap.servers", broker) + props.put("group.id", groupId) + props.put("key.deserializer", classOf[StringDeserializer].getName) + props.put("value.deserializer", classOf[StringDeserializer].getName) + } + + def subscribe(): Unit + + def run() { + try { + subscribe() + while (true) + consumer.poll(Long.MaxValue) + } catch { + case _: WakeupException => // OK + } finally { + consumer.close() + } + } + + def shutdown() { + consumer.wakeup() + } + } + + class ConsumerRunnable(broker: String, groupId: String, topic: String, strategy: String) + extends AbstractConsumerRunnable(broker, groupId) { + + override def configure(props: Properties): Unit = { + super.configure(props) + props.put("partition.assignment.strategy", strategy) + } + + override def subscribe(): Unit = { + consumer.subscribe(Collections.singleton(topic)) + } + } + + class SimpleConsumerRunnable(broker: String, groupId: String, partitions: Iterable[TopicPartition]) + extends AbstractConsumerRunnable(broker, groupId) { + + override def subscribe(): Unit = { + consumer.assign(partitions.toList.asJava) + } + } + + class AbstractConsumerGroupExecutor(numThreads: Int) { + private val executor: ExecutorService = Executors.newFixedThreadPool(numThreads) + private val consumers = new ArrayBuffer[AbstractConsumerRunnable]() + + def submit(consumerThread: AbstractConsumerRunnable) { + consumers += consumerThread + executor.submit(consumerThread) + } + + def shutdown() { + consumers.foreach(_.shutdown()) + executor.shutdown() + executor.awaitTermination(5000, TimeUnit.MILLISECONDS) + } + } + + class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String, strategy: String) + extends AbstractConsumerGroupExecutor(numConsumers) { + + for (_ <- 1 to numConsumers) { + submit(new ConsumerRunnable(broker, groupId, topic, strategy)) + } + + } + + class SimpleConsumerGroupExecutor(broker: String, groupId: String, partitions: Iterable[TopicPartition]) + extends AbstractConsumerGroupExecutor(1) { + + submit(new SimpleConsumerRunnable(broker, groupId, partitions)) + } + +} + diff --git a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala index 7000308..c782c3f 100644 --- a/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala @@ -16,76 +16,49 @@ */ package kafka.admin -import java.util.concurrent.ExecutorService -import java.util.concurrent.Executors -import java.util.concurrent.TimeUnit -import java.util.Collections -import java.util.Properties - -import org.junit.Assert._ -import org.junit.{After, Before, Test} -import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService, ZkConsumerGroupService} -import kafka.consumer.OldConsumer -import kafka.consumer.Whitelist -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.TimeoutException -import org.apache.kafka.common.errors.WakeupException -import org.apache.kafka.common.serialization.StringDeserializer - -import scala.collection.mutable.ArrayBuffer - -class DescribeConsumerGroupTest extends KafkaServerTestHarness { - private val topic = "foo" - private val group = "test.group" - - @deprecated("This field will be removed in a future release", "0.11.0.0") - private val oldConsumers = new ArrayBuffer[OldConsumer] - private var consumerGroupService: ConsumerGroupService = _ - private var consumerGroupExecutor: ConsumerGroupExecutor = _ - - // configure the servers and clients - override def generateConfigs = { - TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map { props => - KafkaConfig.fromProps(props) - } - } - - @Before - override def setUp() { - super.setUp() - AdminUtils.createTopic(zkUtils, topic, 1, 1) - } +import org.junit.Assert._ +import org.junit.Test - @After - override def tearDown(): Unit = { - if (consumerGroupService != null) - consumerGroupService.close() - if (consumerGroupExecutor != null) - consumerGroupExecutor.shutdown() - oldConsumers.foreach(_.stop()) - super.tearDown() - } +class DescribeConsumerGroupTest extends ConsumerGroupCommandTest { @Test @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0") def testDescribeNonExistingGroup() { TestUtils.createOffsetsTopic(zkUtils, servers) createOldConsumer() - val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group")) - consumerGroupService = new ZkConsumerGroupService(opts) + val cgcArgs = Array("--zookeeper", zkConnect, "--describe", "--group", "missing.group") + val consumerGroupService = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => consumerGroupService.describeGroup()._2.isEmpty, "Expected no rows in describe group results.") } @Test + def testDescribeSimpleConsumerGroup() { + // Ensure that the offsets of consumers which don't use group management are still displayed + + TestUtils.createOffsetsTopic(zkUtils, servers) + val topic2 = "foo2" + AdminUtils.createTopic(zkUtils, topic2, 2, 1) + addSimpleGroupExecutor(Seq(new TopicPartition(topic2, 0), new TopicPartition(topic2, 1))) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) + val service = getConsumerGroupService(cgcArgs) + + TestUtils.waitUntilTrue(() => { + val (state, assignments) = service.describeGroup() + println(assignments.get.map(x => (x.topic, x.partition))) + state.contains("Empty") && assignments.isDefined && assignments.get.count(_.group == group) == 2 + }, "Expected two partition assignment results in describe group state result.") + } + + @Test @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.0.0") def testDescribeExistingGroup() { TestUtils.createOffsetsTopic(zkUtils, servers) createOldConsumer() - val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) - consumerGroupService = new ZkConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group)) TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupService.describeGroup() assignments.isDefined && @@ -99,8 +72,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { def testDescribeExistingGroupWithNoMembers() { TestUtils.createOffsetsTopic(zkUtils, servers) createOldConsumer() - val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) - consumerGroupService = new ZkConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group)) TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupService.describeGroup() @@ -108,7 +80,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { assignments.get.count(_.group == group) == 1 && assignments.get.filter(_.group == group).head.consumerId.exists(_.trim != ConsumerGroupCommand.MISSING_COLUMN_VALUE) }, "Expected rows and a consumer id column in describe group results.") - oldConsumers.head.stop() + stopRandomOldConsumer() TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupService.describeGroup() @@ -124,8 +96,7 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { TestUtils.createOffsetsTopic(zkUtils, servers) createOldConsumer() createOldConsumer() - val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect, "--describe", "--group", group)) - consumerGroupService = new ZkConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(Array("--zookeeper", zkConnect, "--describe", "--group", group)) TestUtils.waitUntilTrue(() => { val (_, assignments) = consumerGroupService.describeGroup() assignments.isDefined && @@ -139,12 +110,11 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { def testDescribeNonExistingGroupWithNewConsumer() { TestUtils.createOffsetsTopic(zkUtils, servers) // run one consumer in the group consuming from a single-partition topic - consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic) + addConsumerGroupExecutor(numConsumers = 1) // note the group to be queried is a different (non-existing) group val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "missing.group") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - consumerGroupService = new KafkaConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(cgcArgs) val (state, assignments) = consumerGroupService.describeGroup() assertTrue("Expected the state to be 'Dead' with no members in the group.", state == Some("Dead") && assignments == Some(List())) @@ -154,11 +124,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { def testDescribeExistingGroupWithNewConsumer() { TestUtils.createOffsetsTopic(zkUtils, servers) // run one consumer in the group consuming from a single-partition topic - consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic) + addConsumerGroupExecutor(numConsumers = 1) val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) - val opts = new ConsumerGroupCommandOptions(cgcArgs) - consumerGroupService = new KafkaConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { val (state, assignments) = consumerGroupService.describeGroup() @@ -175,11 +144,9 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { def testDescribeExistingGroupWithNoMembersWithNewConsumer() { TestUtils.createOffsetsTopic(zkUtils, servers) // run one consumer in the group consuming from a single-partition topic - consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic) - + val consumerGroupExecutor = addConsumerGroupExecutor(numConsumers = 1) val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) - val opts = new ConsumerGroupCommandOptions(cgcArgs) - consumerGroupService = new KafkaConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { val (state, _) = consumerGroupService.describeGroup() @@ -214,11 +181,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { def testDescribeConsumersWithNoAssignedPartitionsWithNewConsumer() { TestUtils.createOffsetsTopic(zkUtils, servers) // run two consumers in the group consuming from a single-partition topic - consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic) + addConsumerGroupExecutor(numConsumers = 2) val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) - val opts = new ConsumerGroupCommandOptions(cgcArgs) - consumerGroupService = new KafkaConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { val (state, assignments) = consumerGroupService.describeGroup() @@ -237,11 +203,10 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { AdminUtils.createTopic(zkUtils, topic2, 2, 1) // run two consumers in the group consuming from a two-partition topic - consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 2, group, topic2) + addConsumerGroupExecutor(numConsumers = 2, topic = topic2) val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", group) - val opts = new ConsumerGroupCommandOptions(cgcArgs) - consumerGroupService = new KafkaConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { val (state, assignments) = consumerGroupService.describeGroup() @@ -259,12 +224,11 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { // complete before the timeout expires // run one consumer in the group consuming from a single-partition topic - consumerGroupExecutor = new ConsumerGroupExecutor(brokerList, 1, group, topic) + addConsumerGroupExecutor(numConsumers = 1) // set the group initialization timeout too low for the group to stabilize val cgcArgs = Array("--bootstrap-server", brokerList, "--describe", "--group", "group", "--timeout", "1") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - consumerGroupService = new KafkaConsumerGroupService(opts) + val consumerGroupService = getConsumerGroupService(cgcArgs) try { consumerGroupService.describeGroup() @@ -274,59 +238,4 @@ class DescribeConsumerGroupTest extends KafkaServerTestHarness { } } - @deprecated("This test has been deprecated and will be removed in a future release.", "0.11.1.0") - private def createOldConsumer(): Unit = { - val consumerProps = new Properties - consumerProps.setProperty("group.id", group) - consumerProps.setProperty("zookeeper.connect", zkConnect) - oldConsumers += new OldConsumer(Whitelist(topic), consumerProps) - } -} - - -class ConsumerThread(broker: String, id: Int, groupId: String, topic: String) extends Runnable { - val props = new Properties - props.put("bootstrap.servers", broker) - props.put("group.id", groupId) - props.put("key.deserializer", classOf[StringDeserializer].getName) - props.put("value.deserializer", classOf[StringDeserializer].getName) - val consumer = new KafkaConsumer(props) - - def run() { - try { - consumer.subscribe(Collections.singleton(topic)) - while (true) - consumer.poll(Long.MaxValue) - } catch { - case _: WakeupException => // OK - } finally { - consumer.close() - } - } - - def shutdown() { - consumer.wakeup() - } -} - - -class ConsumerGroupExecutor(broker: String, numConsumers: Int, groupId: String, topic: String) { - val executor: ExecutorService = Executors.newFixedThreadPool(numConsumers) - private val consumers = new ArrayBuffer[ConsumerThread]() - for (i <- 1 to numConsumers) { - val consumer = new ConsumerThread(broker, i, groupId, topic) - consumers += consumer - executor.submit(consumer) - } - - def shutdown() { - consumers.foreach(_.shutdown) - executor.shutdown() - try { - executor.awaitTermination(5000, TimeUnit.MILLISECONDS) - } catch { - case e: InterruptedException => - e.printStackTrace() - } - } } diff --git a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala index 6727fad..13dccbe 100644 --- a/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ListConsumerGroupTest.scala @@ -18,53 +18,21 @@ package kafka.admin import java.util.Properties -import org.easymock.EasyMock -import org.junit.Before import org.junit.Test - import kafka.admin.ConsumerGroupCommand.ConsumerGroupCommandOptions import kafka.admin.ConsumerGroupCommand.ZkConsumerGroupService -import kafka.consumer.OldConsumer -import kafka.consumer.Whitelist -import kafka.integration.KafkaServerTestHarness -import kafka.server.KafkaConfig +import kafka.consumer.{OldConsumer, Whitelist} import kafka.utils.TestUtils +import org.easymock.EasyMock +class ListConsumerGroupTest extends ConsumerGroupCommandTest { -class ListConsumerGroupTest extends KafkaServerTestHarness { - - val overridingProps = new Properties() - val topic = "foo" - val topicFilter = Whitelist(topic) - val group = "test.group" - val props = new Properties - - // configure the servers and clients - override def generateConfigs = - TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) - - @Before - override def setUp() { - super.setUp() - - AdminUtils.createTopic(zkUtils, topic, 1, 1) + @Test + def testListOldConsumerGroups() { + val topicFilter = Whitelist(topic) + val props = new Properties props.setProperty("group.id", group) props.setProperty("zookeeper.connect", zkConnect) - } - - @Test - def testListGroupWithNoExistingGroup() { - val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect)) - val consumerGroupCommand = new ZkConsumerGroupService(opts) - try { - assert(consumerGroupCommand.listGroups().isEmpty) - } finally { - consumerGroupCommand.close() - } - } - - @Test - def testListGroupWithSomeGroups() { // mocks val consumer1Mock = EasyMock.createMockBuilder(classOf[OldConsumer]).withConstructor(topicFilter, props).createMock() props.setProperty("group.id", "some.other.group") @@ -80,13 +48,42 @@ class ListConsumerGroupTest extends KafkaServerTestHarness { // action/test TestUtils.waitUntilTrue(() => { - val groups = consumerGroupCommand.listGroups() - groups.size == 2 && groups.contains(group) - }, "Expected a different list group results.") + val groups = consumerGroupCommand.listGroups() + groups.size == 2 && groups.contains(group) + }, "Expected a different list group results.") // cleanup consumerGroupCommand.close() consumer1Mock.stop() consumer2Mock.stop() } + + @Test + def testListGroupWithNoExistingGroup() { + val opts = new ConsumerGroupCommandOptions(Array("--zookeeper", zkConnect)) + val consumerGroupCommand = new ZkConsumerGroupService(opts) + try { + assert(consumerGroupCommand.listGroups().isEmpty) + } finally { + consumerGroupCommand.close() + } + } + + @Test + def testListConsumerGroups() { + val simpleGroup = "simple-group" + addSimpleGroupExecutor(group = simpleGroup) + addConsumerGroupExecutor(numConsumers = 1) + + val cgcArgs = Array("--bootstrap-server", brokerList, "--list") + val service = getConsumerGroupService(cgcArgs) + + val expectedGroups = Set(group, simpleGroup) + var foundGroups = Set.empty[String] + TestUtils.waitUntilTrue(() => { + foundGroups = service.listGroups().toSet + expectedGroups == foundGroups + }, s"Expected --list to show groups $expectedGroups, but found $foundGroups.") + } + } diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 6853b16..50356ff 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -16,13 +16,46 @@ import java.io.{BufferedWriter, File, FileWriter} import java.text.{ParseException, SimpleDateFormat} import java.util.{Calendar, Date, Properties} -import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, KafkaConsumerGroupService} -import kafka.integration.KafkaServerTestHarness +import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions, ConsumerGroupService, KafkaConsumerGroupService} import kafka.server.KafkaConfig import kafka.utils.TestUtils -import org.junit.{After, Before, Test} +import org.junit.Assert._ +import org.junit.Test -import scala.collection.mutable.ArrayBuffer +class TimeConversionTests { + + @Test + def testDateTimeFormats() { + //check valid formats + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")) + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")) + + //check some invalid formats + try { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")) + fail("Call to getDateTime should fail") + } catch { + case _: ParseException => + } + + try { + invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")) + fail("Call to getDateTime should fail") + } catch { + case _: ParseException => + } + } + + private def invokeGetDateTimeMethod(format: SimpleDateFormat) { + val checkpoint = new Date() + val timestampString = format.format(checkpoint) + ConsumerGroupCommand.convertTimestamp(timestampString) + } + +} /** * Test cases by: @@ -34,46 +67,27 @@ import scala.collection.mutable.ArrayBuffer * - scope=topics+partitions, scenario=to-earliest * - export/import */ -class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { - +class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { + val overridingProps = new Properties() val topic1 = "foo1" val topic2 = "foo2" - val group = "test.group" - val props = new Properties - val consumerGroupServices = new ArrayBuffer[KafkaConsumerGroupService] - val executors = new ArrayBuffer[ConsumerGroupExecutor] /** * Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every * test and should not reuse previous configurations unless they select their ports randomly when servers are started. */ - override def generateConfigs: Seq[KafkaConfig] = TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false).map(KafkaConfig.fromProps(_, overridingProps)) - - @Before - override def setUp() { - super.setUp() - - props.setProperty("group.id", group) - } - - @After - override def tearDown() { - try { - executors.foreach(_.shutdown()) - consumerGroupServices.foreach(_.close()) - } finally { - super.tearDown() - } - } + override def generateConfigs: Seq[KafkaConfig] = { + TestUtils.createBrokerConfigs(1, zkConnect, enableControlledShutdown = false) + .map(KafkaConfig.fromProps(_, overridingProps)) + } @Test def testResetOffsetsNotExistingGroup() { - createConsumerGroupExecutor(brokerList, 1, group, topic1) + addConsumerGroupExecutor(numConsumers = 1, topic1) val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", "missing.group", "--all-topics", "--to-current") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) TestUtils.waitUntilTrue(() => { val assignmentsToReset = consumerGroupCommand.resetOffsets() @@ -113,10 +127,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { TestUtils.produceMessages(servers, topic1, 100, acks = 1, 100 * 1000) val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) - - val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) + val executor = addConsumerGroupExecutor(numConsumers = 1, topic1) TestUtils.waitUntilTrue(() => { val (_, assignmentsOption) = consumerGroupCommand.describeGroup() @@ -134,8 +146,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { executor.shutdown() val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(calendar.getTime), "--execute") - val opts1 = new ConsumerGroupCommandOptions(cgcArgs1) - val consumerGroupCommand1 = createConsumerGroupService(opts1) + val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1) TestUtils.waitUntilTrue(() => { val assignmentsToReset = consumerGroupCommand1.resetOffsets() @@ -158,10 +169,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { TestUtils.produceMessages(servers, topic1, 50, acks = 1, 100 * 1000) val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) - - val executor = createConsumerGroupExecutor(brokerList, 1, group, topic1) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) + val executor = addConsumerGroupExecutor(numConsumers = 1, topic1) TestUtils.waitUntilTrue(() => { val (_, assignmentsOption) = consumerGroupCommand.describeGroup() @@ -178,8 +187,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { executor.shutdown() val cgcArgs1 = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute") - val opts1 = new ConsumerGroupCommandOptions(cgcArgs1) - val consumerGroupCommand1 = createConsumerGroupService(opts1) + val consumerGroupCommand1 = getConsumerGroupService(cgcArgs1) TestUtils.waitUntilTrue(() => { val assignmentsToReset = consumerGroupCommand1.resetOffsets() @@ -192,43 +200,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { } @Test - def testDateTimeFormats() { - //check valid formats - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSX")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXX")) - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX")) - - //check some invalid formats - try { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")) - fail("Call to getDateTime should fail") - } catch { - case _: ParseException => - } - - try { - invokeGetDateTimeMethod(new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.X")) - fail("Call to getDateTime should fail") - } catch { - case _: ParseException => - } - } - - private def invokeGetDateTimeMethod(format: SimpleDateFormat) { - val checkpoint = new Date() - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) - consumerGroupCommand.getDateTime - } - - @Test def testResetOffsetsByDuration() { - val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT1M") + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -247,8 +221,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsByDurationToEarliest() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--by-duration", "PT0.1S", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -266,8 +239,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToEarliest() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-earliest", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -285,8 +257,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToLatest() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-latest", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -307,8 +278,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToCurrentOffset() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-current", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -325,9 +295,9 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { AdminUtils.deleteTopic(zkUtils, topic1) } - private def produceConsumeAndShutdown(consumerGroupCommand: KafkaConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) { + private def produceConsumeAndShutdown(consumerGroupCommand: ConsumerGroupService, numConsumers: Int, topic: String, totalMessages: Int) { TestUtils.produceMessages(servers, topic, totalMessages, acks = 1, 100 * 1000) - val executor = createConsumerGroupExecutor(brokerList, numConsumers, group, topic) + val executor = addConsumerGroupExecutor(numConsumers, topic) TestUtils.waitUntilTrue(() => { val (_, assignmentsOption) = consumerGroupCommand.describeGroup() @@ -348,8 +318,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToSpecificOffset() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset", "1", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -368,8 +337,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftPlus() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "50", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -389,8 +357,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftMinus() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-50", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -411,8 +378,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftByLowerThanEarliest() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "-150", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -432,8 +398,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsShiftByHigherThanLatest() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--shift-by", "150", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -453,8 +418,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToEarliestOnOneTopic() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", topic1, "--to-earliest", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) @@ -472,8 +436,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsToEarliestOnOneTopicAndPartition() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--topic", String.format("%s:1", topic1), "--to-earliest", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 2, 1) @@ -495,8 +458,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { "--topic", topic1, "--topic", topic2, "--to-earliest", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 1, 1) AdminUtils.createTopic(zkUtils, topic2, 1, 1) @@ -522,8 +484,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { "--topic", String.format("%s:1", topic1), "--topic", String.format("%s:1", topic2), "--to-earliest", "--execute") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 2, 1) AdminUtils.createTopic(zkUtils, topic2, 2, 1) @@ -545,8 +506,7 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { @Test def testResetOffsetsExportImportPlan() { val cgcArgs = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--to-offset","2", "--export") - val opts = new ConsumerGroupCommandOptions(cgcArgs) - val consumerGroupCommand = createConsumerGroupService(opts) + val consumerGroupCommand = getConsumerGroupService(cgcArgs) AdminUtils.createTopic(zkUtils, topic1, 2, 1) @@ -562,10 +522,8 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { assignmentsToReset.exists { assignment => assignment._2.offset() == 2 } && file.exists() }, "Expected the consume all messages and save reset offsets plan to file") - val cgcArgsExec = Array("--bootstrap-server", brokerList, "--reset-offsets", "--group", group, "--all-topics", "--from-file", file.getCanonicalPath) - val optsExec = new ConsumerGroupCommandOptions(cgcArgsExec) - val consumerGroupCommandExec = createConsumerGroupService(optsExec) + val consumerGroupCommandExec = getConsumerGroupService(cgcArgsExec) TestUtils.waitUntilTrue(() => { val assignmentsToReset = consumerGroupCommandExec.resetOffsets() @@ -588,15 +546,4 @@ class ResetConsumerGroupOffsetTest extends KafkaServerTestHarness { ConsumerGroupCommand.main(cgcArgs) } - private def createConsumerGroupExecutor(brokerList: String, numConsumers: Int, groupId: String, topic: String): ConsumerGroupExecutor = { - val executor = new ConsumerGroupExecutor(brokerList, numConsumers, groupId, topic) - executors += executor - executor - } - - private def createConsumerGroupService(opts: ConsumerGroupCommandOptions): KafkaConsumerGroupService = { - val service = new KafkaConsumerGroupService(opts) - consumerGroupServices += service - service - } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 7075c63..aa44d14 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -777,6 +777,36 @@ class GroupCoordinatorTest extends JUnitSuite { } @Test + def testCommitOffsetsAfterGroupIsEmpty(): Unit = { + // Tests the scenario where the reset offset tool modifies the offsets + // of a group after it becomes empty + + // A group member joins + val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID + val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols) + val assignedMemberId = joinGroupResult.memberId + val joinGroupError = joinGroupResult.error + assertEquals(Errors.NONE, joinGroupError) + + // and leaves. + EasyMock.reset(replicaManager) + val leaveGroupResult = leaveGroup(groupId, assignedMemberId) + assertEquals(Errors.NONE, leaveGroupResult) + + // The simple offset commit should now fail + EasyMock.reset(replicaManager) + val tp = new TopicPartition("topic", 0) + val offset = OffsetAndMetadata(0) + val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID, + OffsetCommitRequest.DEFAULT_GENERATION_ID, immutable.Map(tp -> offset)) + assertEquals(Errors.NONE, commitOffsetResult(tp)) + + val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) + assertEquals(Errors.NONE, error) + assertEquals(Some(0), partitionData.get(tp).map(_.offset)) + } + + @Test def testFetchOffsets() { val tp = new TopicPartition("topic", 0) val offset = OffsetAndMetadata(0) @@ -866,7 +896,7 @@ class GroupCoordinatorTest extends JUnitSuite { groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT) val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp))) - assertEquals(Errors.NONE, secondReqError) + assertEquals(Errors.NONE, thirdReqError) assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), thirdReqPartitionData.get(tp).map(_.offset)) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 78a5eaa..3bdffbd 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -30,7 +30,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.easymock.{Capture, EasyMock, IAnswer} -import org.junit.Assert.{assertEquals, assertFalse, assertTrue} +import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull} import org.junit.{Before, Test} import java.nio.ByteBuffer @@ -111,6 +111,41 @@ class GroupMetadataManagerTest { } @Test + def testLoadEmptyGroupWithOffsets() { + val groupMetadataTopicPartition = groupTopicPartition + val generation = 15 + val protocolType = "consumer" + val startOffset = 15L + val committedOffsets = Map( + new TopicPartition("foo", 0) -> 23L, + new TopicPartition("foo", 1) -> 455L, + new TopicPartition("bar", 0) -> 8992L + ) + + val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) + val groupMetadataRecord = buildEmptyGroupRecord(generation, protocolType) + val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, + offsetCommitRecords ++ Seq(groupMetadataRecord): _*) + + expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records) + + EasyMock.replay(replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ()) + + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Empty, group.currentState) + assertEquals(generation, group.generationId) + assertEquals(Some(protocolType), group.protocolType) + assertNull(group.leaderOrNull) + assertNull(group.protocolOrNull) + committedOffsets.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + @Test def testLoadTransactionalOffsetsWithoutGroup() { val groupMetadataTopicPartition = groupTopicPartition val producerId = 1000L @@ -524,6 +559,9 @@ class GroupMetadataManagerTest { @Test def testLoadOffsetsAndGroup() { val groupMetadataTopicPartition = groupTopicPartition + val generation = 935 + val protocolType = "consumer" + val protocol = "range" val startOffset = 15L val committedOffsets = Map( new TopicPartition("foo", 0) -> 23L, @@ -533,7 +571,7 @@ class GroupMetadataManagerTest { val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) val memberId = "98098230493" - val groupMetadataRecord = buildStableGroupRecordWithMember(memberId) + val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId) val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords ++ Seq(groupMetadataRecord): _*) @@ -546,7 +584,10 @@ class GroupMetadataManagerTest { val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) assertEquals(groupId, group.groupId) assertEquals(Stable, group.currentState) - assertEquals(memberId, group.leaderId) + assertEquals(memberId, group.leaderOrNull) + assertEquals(generation, group.generationId) + assertEquals(Some(protocolType), group.protocolType) + assertEquals(protocol, group.protocolOrNull) assertEquals(Set(memberId), group.allMembers) assertEquals(committedOffsets.size, group.allOffsets.size) committedOffsets.foreach { case (topicPartition, offset) => @@ -558,9 +599,9 @@ class GroupMetadataManagerTest { def testLoadGroupWithTombstone() { val groupMetadataTopicPartition = groupTopicPartition val startOffset = 15L - val memberId = "98098230493" - val groupMetadataRecord = buildStableGroupRecordWithMember(memberId) + val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15, + protocolType = "consumer", protocol = "range", memberId) val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null) val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, Seq(groupMetadataRecord, groupMetadataTombstone): _*) @@ -581,6 +622,9 @@ class GroupMetadataManagerTest { // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets val groupMetadataTopicPartition = groupTopicPartition + val generation = 293 + val protocolType = "consumer" + val protocol = "range" val startOffset = 15L val committedOffsets = Map( @@ -590,7 +634,7 @@ class GroupMetadataManagerTest { ) val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets) val memberId = "98098230493" - val groupMetadataRecord = buildStableGroupRecordWithMember(memberId) + val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId) val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null) val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*) @@ -612,6 +656,9 @@ class GroupMetadataManagerTest { @Test def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = { + val generation = 293 + val protocolType = "consumer" + val protocol = "range" val startOffset = 15L val tp0 = new TopicPartition("foo", 0) val tp1 = new TopicPartition("foo", 1) @@ -624,13 +671,15 @@ class GroupMetadataManagerTest { val segment1MemberId = "a" val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L) val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, - createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*) + createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember( + generation, protocolType, protocol, segment1MemberId)): _*) val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records) val segment2MemberId = "b" val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L) val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE, - createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*) + createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember( + generation, protocolType, protocol, segment2MemberId)): _*) val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records) EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End)) @@ -643,7 +692,7 @@ class GroupMetadataManagerTest { assertEquals(groupId, group.groupId) assertEquals(Stable, group.currentState) - assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId) + assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderOrNull) assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers) // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions @@ -654,20 +703,22 @@ class GroupMetadataManagerTest { } } - @Test def testAddGroup() { - val group = new GroupMetadata("foo") + val group = new GroupMetadata("foo", initialState = Empty) assertEquals(group, groupMetadataManager.addGroup(group)) - assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo"))) + assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", initialState = Empty))) } @Test def testStoreEmptyGroup() { - val group = new GroupMetadata(groupId) + val generation = 27 + val protocolType = "consumer" + + val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty) groupMetadataManager.addGroup(group) - expectAppendMessage(Errors.NONE) + val capturedRecords = expectAppendMessage(Errors.NONE) EasyMock.replay(replicaManager) var maybeError: Option[Errors] = None @@ -677,6 +728,45 @@ class GroupMetadataManagerTest { groupMetadataManager.storeGroup(group, Map.empty, callback) assertEquals(Some(Errors.NONE), maybeError) + assertTrue(capturedRecords.hasCaptured) + val records = capturedRecords.getValue()(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)) + .records.asScala.toList + assertEquals(1, records.size) + + val record = records.head + val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) + assertTrue(groupMetadata.is(Empty)) + assertEquals(generation, groupMetadata.generationId) + assertEquals(Some(protocolType), groupMetadata.protocolType) + } + + @Test + def testStoreEmptySimpleGroup() { + val group = new GroupMetadata(groupId, initialState = Empty) + groupMetadataManager.addGroup(group) + + val capturedRecords = expectAppendMessage(Errors.NONE) + EasyMock.replay(replicaManager) + + var maybeError: Option[Errors] = None + def callback(error: Errors) { + maybeError = Some(error) + } + + groupMetadataManager.storeGroup(group, Map.empty, callback) + assertEquals(Some(Errors.NONE), maybeError) + assertTrue(capturedRecords.hasCaptured) + + assertTrue(capturedRecords.hasCaptured) + val records = capturedRecords.getValue()(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)) + .records.asScala.toList + assertEquals(1, records.size) + + val record = records.head + val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value) + assertTrue(groupMetadata.is(Empty)) + assertEquals(0, groupMetadata.generationId) + assertEquals(None, groupMetadata.protocolType) } @Test @@ -695,7 +785,7 @@ class GroupMetadataManagerTest { private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) { EasyMock.reset(replicaManager) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) expectAppendMessage(appendError) @@ -718,7 +808,7 @@ class GroupMetadataManagerTest { val clientId = "clientId" val clientHost = "localhost" - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, @@ -749,7 +839,7 @@ class GroupMetadataManagerTest { val clientId = "clientId" val clientHost = "localhost" - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, protocolType, List(("protocol", Array[Byte]()))) @@ -779,7 +869,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) @@ -821,7 +911,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) @@ -861,7 +951,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) @@ -900,7 +990,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) @@ -938,7 +1028,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) @@ -980,7 +1070,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset)) @@ -1018,7 +1108,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) // expire the offset after 1 millisecond @@ -1071,7 +1161,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) group.generationId = 5 @@ -1119,7 +1209,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) group.generationId = 5 @@ -1173,7 +1263,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) // expire the offset after 1 millisecond @@ -1244,7 +1334,7 @@ class GroupMetadataManagerTest { groupMetadataManager.addPartitionOwnership(groupPartitionId) - val group = new GroupMetadata(groupId) + val group = new GroupMetadata(groupId, initialState = Empty) groupMetadataManager.addGroup(group) val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout, @@ -1314,40 +1404,47 @@ class GroupMetadataManagerTest { capturedArgument } - private def expectAppendMessage(error: Errors) { - val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + private def expectAppendMessage(error: Errors): Capture[Map[TopicPartition, MemoryRecords]] = { + val capturedCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture() + val capturedRecords: Capture[Map[TopicPartition, MemoryRecords]] = EasyMock.newCapture() EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(), EasyMock.anyShort(), internalTopicsAllowed = EasyMock.eq(true), isFromClient = EasyMock.eq(false), - EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]], - EasyMock.capture(capturedArgument), + EasyMock.capture(capturedRecords), + EasyMock.capture(capturedCallback), EasyMock.anyObject().asInstanceOf[Option[ReentrantLock]], EasyMock.anyObject()) ).andAnswer(new IAnswer[Unit] { - override def answer = capturedArgument.getValue.apply( + override def answer = capturedCallback.getValue.apply( Map(groupTopicPartition -> new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP, 0L) ) )}) EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE)) + capturedRecords } - private def buildStableGroupRecordWithMember(memberId: String): SimpleRecord = { - val group = new GroupMetadata(groupId) - group.transitionTo(PreparingRebalance) - val memberProtocols = List(("roundrobin", Array.emptyByteArray)) - val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols) - group.add(member) - member.awaitingJoinCallback = _ => {} - group.initNextGeneration() - group.transitionTo(Stable) - + private def buildStableGroupRecordWithMember(generation: Int, + protocolType: String, + protocol: String, + memberId: String): SimpleRecord = { + val memberProtocols = List((protocol, Array.emptyByteArray)) + val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols) + val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, + leaderId = memberId, Seq(member)) val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId) val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte])) new SimpleRecord(groupMetadataKey, groupMetadataValue) } + private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = { + val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty) + val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId) + val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty) + new SimpleRecord(groupMetadataKey, groupMetadataValue) + } + private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition, startOffset: Long, records: MemoryRecords): Unit = { diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala index 2db6603..871a2d3 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala @@ -40,7 +40,7 @@ class GroupMetadataTest extends JUnitSuite { @Before def setUp() { - group = new GroupMetadata("groupId") + group = new GroupMetadata("groupId", initialState = Empty) } @Test @@ -271,25 +271,25 @@ class GroupMetadataTest extends JUnitSuite { group.add(member) assertEquals(0, group.generationId) - assertNull(group.protocol) + assertNull(group.protocolOrNull) group.initNextGeneration() assertEquals(1, group.generationId) - assertEquals("roundrobin", group.protocol) + assertEquals("roundrobin", group.protocolOrNull) } @Test def testInitNextGenerationEmptyGroup() { assertEquals(Empty, group.currentState) assertEquals(0, group.generationId) - assertNull(group.protocol) + assertNull(group.protocolOrNull) group.transitionTo(PreparingRebalance) group.initNextGeneration() assertEquals(1, group.generationId) - assertNull(group.protocol) + assertNull(group.protocolOrNull) } @Test -- To stop receiving notification emails like this one, please contact jgus@apache.org.