kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/5] kafka git commit: KAFKA-5259; TransactionalId auth implies ProducerId auth
Date Wed, 24 May 2017 22:28:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 88200938f -> 38f6cae9e


http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
new file mode 100644
index 0000000..358e12c
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -0,0 +1,1492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.coordinator.group
+
+import kafka.common.OffsetAndMetadata
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
+import kafka.utils._
+import kafka.utils.timer.MockTimer
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.{JoinGroupRequest, OffsetCommitRequest, OffsetFetchResponse, TransactionResult}
+import org.easymock.{Capture, EasyMock, IAnswer}
+import java.util.concurrent.TimeUnit
+
+import org.apache.kafka.common.internals.Topic
+import org.junit.Assert._
+import org.junit.{After, Assert, Before, Test}
+import org.scalatest.junit.JUnitSuite
+
+import scala.collection._
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, Future, Promise, TimeoutException}
+
+class GroupCoordinatorTest extends JUnitSuite {
+  type JoinGroupCallback = JoinGroupResult => Unit
+  type SyncGroupCallbackParams = (Array[Byte], Errors)
+  type SyncGroupCallback = (Array[Byte], Errors) => Unit
+  type HeartbeatCallbackParams = Errors
+  type HeartbeatCallback = Errors => Unit
+  type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
+  type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
+  type LeaveGroupCallbackParams = Errors
+  type LeaveGroupCallback = Errors => Unit
+
+  val ClientId = "consumer-test"
+  val ClientHost = "localhost"
+  val ConsumerMinSessionTimeout = 10
+  val ConsumerMaxSessionTimeout = 1000
+  val DefaultRebalanceTimeout = 500
+  val DefaultSessionTimeout = 500
+  val GroupInitialRebalanceDelay = 50
+  var timer: MockTimer = null
+  var groupCoordinator: GroupCoordinator = null
+  var replicaManager: ReplicaManager = null
+  var scheduler: KafkaScheduler = null
+  var zkUtils: ZkUtils = null
+
+  private val groupId = "groupId"
+  private val protocolType = "consumer"
+  private val memberId = "memberId"
+  private val metadata = Array[Byte]()
+  private val protocols = List(("range", metadata))
+  private var groupPartitionId: Int = -1
+
+  // we use this string value since its hashCode % (number of partitions) is different from groupId's
+  private val otherGroupId = "otherGroup"
+
+  @Before
+  def setUp() {
+    // Builds a GroupCoordinator over nice-mocked ReplicaManager/ZkUtils and a MockTimer, then
+    // registers the group-topic partition that `groupId` maps to as owned by this coordinator.
+    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    props.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
+    props.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, GroupInitialRebalanceDelay.toString)
+
+    replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+
+    zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
+    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
+    EasyMock.replay(zkUtils)
+
+    timer = new MockTimer
+
+    val config = KafkaConfig.fromProps(props)
+
+    // both purgatories share the mock timer and are created without reaper threads (reaperEnabled = false)
+    val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
+    val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
+
+    groupCoordinator = GroupCoordinator(config, zkUtils, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+    groupCoordinator.startup(false)
+
+    // add the partition into the owned partition list
+    groupPartitionId = groupCoordinator.partitionFor(groupId)
+    groupCoordinator.groupManager.addPartitionOwnership(groupPartitionId)
+  }
+
+  @After
+  def tearDown() {
+    // null-safe: JUnit runs @After even when setUp failed before the fixtures were assigned
+    Option(replicaManager).foreach(EasyMock.reset(_))
+    Option(groupCoordinator).foreach(_.shutdown())
+  }
+
+  @Test
+  def testOffsetsRetentionMsIntegerOverflow() {
+    // Int.MaxValue retention minutes must come back as milliseconds computed in Long arithmetic
+    val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
+    props.setProperty(KafkaConfig.OffsetsRetentionMinutesProp, Integer.MAX_VALUE.toString)
+    val offsetConfig = GroupCoordinator.offsetConfig(KafkaConfig.fromProps(props))
+    assertEquals(Integer.MAX_VALUE * 60L * 1000L, offsetConfig.offsetsRetentionMs)
+  }
+
+  @Test
+  def testJoinGroupWrongCoordinator() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(otherGroupId, memberId, protocolType, protocols)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NOT_COORDINATOR, joinGroupError)
+  }
+
+  @Test
+  def testJoinGroupSessionTimeoutTooSmall() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMinSessionTimeout - 1)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
+  }
+
+  @Test
+  def testJoinGroupSessionTimeoutTooLarge() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols, sessionTimeout = ConsumerMaxSessionTimeout + 1)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.INVALID_SESSION_TIMEOUT, joinGroupError)
+  }
+
+  @Test
+  def testJoinGroupUnknownConsumerNewGroup() {
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, joinGroupError)
+  }
+
+  @Test
+  def testInvalidGroupId() {
+    val groupId = ""
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    assertEquals(Errors.INVALID_GROUP_ID, joinGroupResult.error)
+  }
+
+  @Test
+  def testValidJoinGroup() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+  }
+
+  @Test
+  def testJoinGroupInconsistentProtocolType() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, "connect", protocols), 1)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
+  }
+
+  @Test
+  def testJoinGroupInconsistentGroupProtocol() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val otherMemberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupFuture = sendJoinGroup(groupId, memberId, protocolType, List(("range", metadata)))
+
+    EasyMock.reset(replicaManager)
+    val otherJoinGroupResult = joinGroup(groupId, otherMemberId, protocolType, List(("roundrobin", metadata)))
+
+    val joinGroupResult = await(joinGroupFuture, 1)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+    assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL, otherJoinGroupResult.error)
+  }
+
+  @Test
+  def testJoinGroupUnknownConsumerExistingGroup() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = "memberId"
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinGroupResult = await(sendJoinGroup(groupId, otherMemberId, protocolType, protocols), 1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, otherJoinGroupResult.error)
+  }
+
+  @Test
+  def testHeartbeatWrongCoordinator() {
+
+    val heartbeatResult = heartbeat(otherGroupId, memberId, -1)
+    assertEquals(Errors.NOT_COORDINATOR, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatUnknownGroup() {
+
+    val heartbeatResult = heartbeat(groupId, memberId, -1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatUnknownConsumerExistingGroup() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val otherMemberId = "memberId"
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, otherMemberId, 1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatRebalanceInProgress() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatIllegalGeneration() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedMemberId = joinGroupResult.memberId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedMemberId, 2)
+    assertEquals(Errors.ILLEGAL_GENERATION, heartbeatResult)
+  }
+
+  @Test
+  def testValidHeartbeat() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE, heartbeatResult)
+  }
+
+  @Test
+  def testSessionTimeout() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    timer.advanceClock(DefaultSessionTimeout + 100)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+  }
+
+  @Test
+  def testHeartbeatMaintainsSession() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val sessionTimeout = 1000
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
+      rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupError)
+
+    timer.advanceClock(sessionTimeout / 2)
+
+    EasyMock.reset(replicaManager)
+    var heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE, heartbeatResult)
+
+    timer.advanceClock(sessionTimeout / 2 + 100)
+
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE, heartbeatResult)
+  }
+
+  @Test
+  def testCommitMaintainsSession() {
+    val sessionTimeout = 1000
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
+      rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncGroupError) = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncGroupError)
+
+    timer.advanceClock(sessionTimeout / 2)
+
+    EasyMock.reset(replicaManager)
+    val commitOffsetResult = commitOffsets(groupId, assignedConsumerId, generationId, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    timer.advanceClock(sessionTimeout / 2 + 100)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE, heartbeatResult)
+  }
+
+  @Test
+  def testSessionTimeoutDuringRebalance() {
+    // create a group with a single member
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      rebalanceTimeout = 2000, sessionTimeout = 1000)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    // now have a new member join to trigger a rebalance
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    timer.advanceClock(500)
+
+    EasyMock.reset(replicaManager)
+    var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
+
+    // letting the session expire should make the member fall out of the group
+    timer.advanceClock(1100)
+
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, heartbeatResult)
+
+    // and the rebalance should complete with only the new member
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+  }
+
+  @Test
+  def testRebalanceCompletesBeforeMemberJoins() {
+    // create a group with a single member
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols,
+      rebalanceTimeout = 1200, sessionTimeout = 1000)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    // now have a new member join to trigger a rebalance
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    // send a couple of heartbeats to keep the member alive while the rebalance finishes
+    timer.advanceClock(500)
+    EasyMock.reset(replicaManager)
+    var heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
+
+    timer.advanceClock(500)
+    EasyMock.reset(replicaManager)
+    heartbeatResult = heartbeat(groupId, firstMemberId, firstGenerationId)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatResult)
+
+    // now timeout the rebalance, which should kick the unjoined member out of the group
+    // and let the rebalance finish with only the new member
+    timer.advanceClock(500)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+  }
+
+  @Test
+  def testSyncGroupEmptyAssignment() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    val joinGroupError = joinGroupResult.error
+    assertEquals(Errors.NONE, joinGroupError)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map())
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+    assertTrue(syncGroupResult._1.isEmpty)
+
+    EasyMock.reset(replicaManager)
+    val heartbeatResult = heartbeat(groupId, assignedConsumerId, 1)
+    assertEquals(Errors.NONE, heartbeatResult)
+  }
+
+  @Test
+  def testSyncGroupNotCoordinator() {
+    val generation = 1
+
+    val syncGroupResult = syncGroupFollower(otherGroupId, generation, memberId)
+    assertEquals(Errors.NOT_COORDINATOR, syncGroupResult._2)
+  }
+
+  @Test
+  def testSyncGroupFromUnknownGroup() {
+    val generation = 1
+
+    val syncGroupResult = syncGroupFollower(groupId, generation, memberId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, syncGroupResult._2)
+  }
+
+  @Test
+  def testSyncGroupFromUnknownMember() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, generationId, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    val unknownMemberId = "blah"
+    val unknownMemberSyncResult = syncGroupFollower(groupId, generationId, unknownMemberId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, unknownMemberSyncResult._2)
+  }
+
+  @Test
+  def testSyncGroupFromIllegalGeneration() {
+    val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
+
+    val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
+    val assignedConsumerId = joinGroupResult.memberId
+    val generationId = joinGroupResult.generationId
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    // send the sync group with an invalid generation
+    val syncGroupResult = syncGroupLeader(groupId, generationId+1, assignedConsumerId, Map(assignedConsumerId -> Array[Byte]()))
+    assertEquals(Errors.ILLEGAL_GENERATION, syncGroupResult._2)
+  }
+
+  @Test
+  def testJoinGroupFromUnchangedFollowerDoesNotRebalance() {
+    // A follower re-joining with unchanged protocol metadata must stay in the same generation.
+    // Build a two-member group first: join and sync a single member (two members cannot join
+    // immediately), then let a new member join while the first member rejoins.
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+    assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    val nextGenerationId = joinResult.generationId
+
+    // this shouldn't cause a rebalance since protocol information hasn't changed
+    EasyMock.reset(replicaManager)
+    val followerJoinResult = await(sendJoinGroup(groupId, otherJoinResult.memberId, protocolType, protocols), 1)
+
+    assertEquals(Errors.NONE, followerJoinResult.error)
+    assertEquals(nextGenerationId, followerJoinResult.generationId)
+  }
+
+  @Test
+  def testJoinGroupFromUnchangedLeaderShouldRebalance() {
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    // join groups from the leader should force the group to rebalance, which allows the
+    // leader to push new assignments when local metadata changes
+
+    EasyMock.reset(replicaManager)
+    val secondJoinResult = await(sendJoinGroup(groupId, firstMemberId, protocolType, protocols), 1)
+
+    assertEquals(Errors.NONE, secondJoinResult.error)
+    assertNotEquals(firstGenerationId, secondJoinResult.generationId)
+  }
+
+  @Test
+  def testLeaderFailureInSyncGroup() {
+    // If the leader never sends SyncGroup, the follower's SyncGroup must fail with REBALANCE_IN_PROGRESS.
+    // Build a two-member group first: join and sync a single member (two members cannot join
+    // immediately), then let a new member join while the first member rejoins.
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+    assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    val nextGenerationId = joinResult.generationId
+
+    // with no leader SyncGroup, the follower's request should fail with an error indicating
+    // that it should rejoin
+    EasyMock.reset(replicaManager)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+
+    timer.advanceClock(DefaultSessionTimeout + 100)
+
+    val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, followerSyncResult._2)
+  }
+
+  @Test
+  def testSyncGroupFollowerAfterLeader() {
+    // A follower syncing after the leader must receive the assignment the leader supplied for it.
+    // Build a two-member group first: join and sync a single member (two members cannot join
+    // immediately), then let a new member join while the first member rejoins.
+
+    val firstJoinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = firstJoinResult.memberId
+    val firstGenerationId = firstJoinResult.generationId
+    assertEquals(firstMemberId, firstJoinResult.leaderId)
+    assertEquals(Errors.NONE, firstJoinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val firstSyncResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, firstSyncResult._2)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+    assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    val nextGenerationId = joinResult.generationId
+    val leaderId = firstMemberId
+    val leaderAssignment = Array[Byte](0)
+    val followerId = otherJoinResult.memberId
+    val followerAssignment = Array[Byte](1)
+
+    EasyMock.reset(replicaManager)
+    val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
+      Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
+    assertEquals(Errors.NONE, leaderSyncResult._2)
+    assertArrayEquals(leaderAssignment, leaderSyncResult._1)
+
+    EasyMock.reset(replicaManager)
+    val followerSyncResult = syncGroupFollower(groupId, nextGenerationId, otherJoinResult.memberId)
+    assertEquals(Errors.NONE, followerSyncResult._2)
+    assertArrayEquals(followerAssignment, followerSyncResult._1)
+  }
+
+  @Test
+  def testSyncGroupLeaderAfterFollower() {
+    // A follower SyncGroup sent before the leader's is expected to complete with the follower's
+    // assignment once the leader has synced. Build a two-member group first: join and sync a single
+    // member (two members cannot join immediately), then let a new member join while the first rejoins.
+
+    val joinGroupResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val firstMemberId = joinGroupResult.memberId
+    val firstGenerationId = joinGroupResult.generationId
+    assertEquals(firstMemberId, joinGroupResult.leaderId)
+    assertEquals(Errors.NONE, joinGroupResult.error)
+
+    EasyMock.reset(replicaManager)
+    val syncGroupResult = syncGroupLeader(groupId, firstGenerationId, firstMemberId, Map(firstMemberId -> Array[Byte]()))
+    val syncGroupError = syncGroupResult._2
+    assertEquals(Errors.NONE, syncGroupError)
+
+    EasyMock.reset(replicaManager)
+    val otherJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    EasyMock.reset(replicaManager)
+    val joinFuture = sendJoinGroup(groupId, firstMemberId, protocolType, protocols)
+
+    val joinResult = await(joinFuture, DefaultSessionTimeout+100)
+    val otherJoinResult = await(otherJoinFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, joinResult.error)
+    assertEquals(Errors.NONE, otherJoinResult.error)
+    assertEquals(joinResult.generationId, otherJoinResult.generationId)
+
+    val nextGenerationId = joinResult.generationId
+    val leaderId = joinResult.leaderId
+    val leaderAssignment = Array[Byte](0)
+    val followerId = otherJoinResult.memberId
+    val followerAssignment = Array[Byte](1)
+
+    assertEquals(firstMemberId, joinResult.leaderId)
+    assertEquals(firstMemberId, otherJoinResult.leaderId)
+
+    EasyMock.reset(replicaManager)
+    val followerSyncFuture = sendSyncGroupFollower(groupId, nextGenerationId, followerId)
+
+    EasyMock.reset(replicaManager)
+    val leaderSyncResult = syncGroupLeader(groupId, nextGenerationId, leaderId,
+      Map(leaderId -> leaderAssignment, followerId -> followerAssignment))
+    assertEquals(Errors.NONE, leaderSyncResult._2)
+    assertArrayEquals(leaderAssignment, leaderSyncResult._1)
+
+    val followerSyncResult = await(followerSyncFuture, DefaultSessionTimeout+100)
+    assertEquals(Errors.NONE, followerSyncResult._2)
+    assertArrayEquals(followerAssignment, followerSyncResult._1)
+  }
+
+  @Test
+  def testCommitOffsetFromUnknownGroup() {
+    // No group has been created here, so a commit carrying a concrete member id and
+    // generation is expected to be rejected with ILLEGAL_GENERATION.
+    val partition = new TopicPartition("topic", 0)
+
+    val commitErrors = commitOffsets(groupId, memberId, 1, immutable.Map(partition -> OffsetAndMetadata(0)))
+    assertEquals(Errors.ILLEGAL_GENERATION, commitErrors(partition))
+  }
+
+  @Test
+  def testCommitOffsetWithDefaultGeneration() {
+    // A commit that uses the default member id and default generation id is expected
+    // to succeed even though no member has joined the group.
+    val partition = new TopicPartition("topic", 0)
+    val toCommit = immutable.Map(partition -> OffsetAndMetadata(0))
+
+    val commitErrors = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, toCommit)
+    assertEquals(Errors.NONE, commitErrors(partition))
+  }
+
+  @Test
+  def testFetchOffsets() {
+    // A plain (non-transactional) commit must be visible to the fetch that follows it.
+    val partition = new TopicPartition("topic", 0)
+    val toCommit = immutable.Map(partition -> OffsetAndMetadata(0))
+
+    val commitErrors = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, toCommit)
+    assertEquals(Errors.NONE, commitErrors(partition))
+
+    val (fetchError, fetched) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(partition)))
+    assertEquals(Errors.NONE, fetchError)
+    assertEquals(Some(0), fetched.get(partition).map(_.offset))
+  }
+
+  @Test
+  def testBasicFetchTxnOffsets() {
+    // A transactional offset commit stays invisible to fetches until the COMMIT marker
+    // for the producer arrives on the group's offsets-topic partition.
+    val partition = new TopicPartition("topic", 0)
+    val producerId = 1000L
+    val producerEpoch : Short = 2
+
+    val commitErrors = commitTransactionalOffsets(groupId, producerId, producerEpoch,
+      immutable.Map(partition -> OffsetAndMetadata(0)))
+    assertEquals(Errors.NONE, commitErrors(partition))
+
+    // Before the marker: the fetch succeeds but the offset is not materialized yet.
+    val (pendingError, pendingData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(partition)))
+    assertEquals(Errors.NONE, pendingError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), pendingData.get(partition).map(_.offset))
+
+    // Deliver the commit marker.
+    val offsetsTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopicPartition), TransactionResult.COMMIT)
+
+    // After the marker: the committed offset is materialized.
+    val (committedError, committedData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(partition)))
+    assertEquals(Errors.NONE, committedError)
+    assertEquals(Some(0), committedData.get(partition).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsWithAbort() {
+    // A transactional offset commit followed by an ABORT marker must never become visible.
+    val partition = new TopicPartition("topic", 0)
+    val producerId = 1000L
+    val producerEpoch : Short = 2
+
+    val commitErrors = commitTransactionalOffsets(groupId, producerId, producerEpoch,
+      immutable.Map(partition -> OffsetAndMetadata(0)))
+    assertEquals(Errors.NONE, commitErrors(partition))
+
+    // While the transaction is open the offset is pending, so the fetch reports no offset.
+    val (pendingError, pendingData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(partition)))
+    assertEquals(Errors.NONE, pendingError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), pendingData.get(partition).map(_.offset))
+
+    // Deliver the abort marker; the pending commit should be discarded.
+    val offsetsTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopicPartition), TransactionResult.ABORT)
+
+    val (abortedError, abortedData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(partition)))
+    assertEquals(Errors.NONE, abortedError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), abortedData.get(partition).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsIgnoreSpuriousCommit() {
+    // Once a transaction's offset commit has been aborted, a later COMMIT marker for the
+    // same producer must not resurrect the discarded offset.
+    val tp = new TopicPartition("topic", 0)
+    val offset = OffsetAndMetadata(0)
+    val producerId = 1000L
+    val producerEpoch : Short = 2
+
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(tp -> offset))
+    assertEquals(Errors.NONE, commitOffsetResult(tp))
+
+    // The commit is still pending, so nothing is visible yet.
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, error)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), partitionData.get(tp).map(_.offset))
+
+    // Abort the transaction: the pending commit is dropped.
+    val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.ABORT)
+
+    val (secondReqError, secondReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, secondReqError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), secondReqPartitionData.get(tp).map(_.offset))
+
+    // Ignore spurious commit.
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetsTopic), TransactionResult.COMMIT)
+
+    // Check the error of the THIRD fetch (the original re-asserted the second fetch's error,
+    // leaving this response's error code unverified).
+    val (thirdReqError, thirdReqPartitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
+    assertEquals(Errors.NONE, thirdReqError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), thirdReqPartitionData.get(tp).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsOneProducerMultipleGroups() {
+    // One producer, two groups located on separate offsets topic partitions.
+    // Both groups have pending offset commits.
+    // Marker for only one partition is received. That commit should be materialized while the other should not.
+
+    val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
+    val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+    val producerId = 1000L
+    val producerEpoch: Short = 3
+
+    val groupIds = List(groupId, otherGroupId)
+    val offsetTopicPartitions = List(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)),
+      new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(otherGroupId)))
+
+    // Register ownership of the second group's offsets partition with the group manager.
+    groupCoordinator.groupManager.addPartitionOwnership(offsetTopicPartitions(1).partition)
+
+    // Fetches both topic partitions for each group in turn, asserts that every fetch succeeded,
+    // and returns the partition data per group (index 0 = groupId, index 1 = otherGroupId).
+    def fetchForAllGroups() = groupIds.map { id =>
+      val (error, partData) = groupCoordinator.handleFetchOffsets(id, Some(partitions))
+      assertEquals(Errors.NONE, error)
+      partData
+    }
+
+    // Ensure that the two groups map to different partitions.
+    assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
+
+    val firstCommitResult = commitTransactionalOffsets(groupId, producerId, producerEpoch, immutable.Map(partitions(0) -> offsets(0)))
+    assertEquals(Errors.NONE, firstCommitResult(partitions(0)))
+    val secondCommitResult = commitTransactionalOffsets(otherGroupId, producerId, producerEpoch, immutable.Map(partitions(1) -> offsets(1)))
+    assertEquals(Errors.NONE, secondCommitResult(partitions(1)))
+
+    // We got a commit for only one __consumer_offsets partition. We should only materialize its group offsets.
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), TransactionResult.COMMIT)
+    val afterFirstMarker = fetchForAllGroups()
+    assertEquals(2, afterFirstMarker.size)
+
+    // Exactly one offset commit should have been materialized.
+    assertEquals(Some(offsets(0).offset), afterFirstMarker(0).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), afterFirstMarker(0).get(partitions(1)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), afterFirstMarker(1).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), afterFirstMarker(1).get(partitions(1)).map(_.offset))
+
+    // Now we receive the other marker.
+    groupCoordinator.handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), TransactionResult.COMMIT)
+    val afterSecondMarker = fetchForAllGroups()
+    assertEquals(2, afterSecondMarker.size)
+
+    // Two offsets should have been materialized: one per group, each for the partition that
+    // group committed.
+    assertEquals(Some(offsets(0).offset), afterSecondMarker(0).get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), afterSecondMarker(0).get(partitions(1)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), afterSecondMarker(1).get(partitions(0)).map(_.offset))
+    assertEquals(Some(offsets(1).offset), afterSecondMarker(1).get(partitions(1)).map(_.offset))
+  }
+
+  @Test
+  def testFetchTxnOffsetsMultipleProducersOneGroup() {
+    // One group, two producers
+    // Different producers will commit offsets for different partitions.
+    // Each partition's offsets should be materialized when the corresponding producer's marker is received.
+
+    val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
+    val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+    val producerIds = List(1000L, 1005L)
+    val producerEpochs: Seq[Short] = List(3, 4)
+
+    val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId))
+
+    // producer0 commits the offsets for partition0
+    val firstCommitResult = commitTransactionalOffsets(groupId, producerIds(0), producerEpochs(0), immutable.Map(partitions(0) -> offsets(0)))
+    assertEquals(Errors.NONE, firstCommitResult(partitions(0)))
+
+    // producer1 commits the offsets for partition1
+    val secondCommitResult = commitTransactionalOffsets(groupId, producerIds(1), producerEpochs(1), immutable.Map(partitions(1) -> offsets(1)))
+    assertEquals(Errors.NONE, secondCommitResult(partitions(1)))
+
+    // producer0 commits its transaction.
+    groupCoordinator.handleTxnCompletion(producerIds(0), List(offsetTopicPartition), TransactionResult.COMMIT)
+
+    // The fetch result is a pair, so it is destructured directly; a match with a catch-all
+    // case would be unreachable.
+    val (firstFetchError, firstFetchData) = groupCoordinator.handleFetchOffsets(groupId, Some(partitions))
+    assertEquals(Errors.NONE, firstFetchError)
+
+    // We should only see the offset commit for producer0
+    assertEquals(Some(offsets(0).offset), firstFetchData.get(partitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), firstFetchData.get(partitions(1)).map(_.offset))
+
+    // producer1 now commits its transaction.
+    groupCoordinator.handleTxnCompletion(producerIds(1), List(offsetTopicPartition), TransactionResult.COMMIT)
+
+    val (secondFetchError, secondFetchData) = groupCoordinator.handleFetchOffsets(groupId, Some(partitions))
+    assertEquals(Errors.NONE, secondFetchError)
+
+    // We should now see the offset commits for both producers.
+    assertEquals(Some(offsets(0).offset), secondFetchData.get(partitions(0)).map(_.offset))
+    assertEquals(Some(offsets(1).offset), secondFetchData.get(partitions(1)).map(_.offset))
+  }
+
+  @Test
+  def testFetchOffsetForUnknownPartition(): Unit = {
+    // Nothing was committed for this partition: the fetch itself succeeds and the
+    // partition is reported with INVALID_OFFSET.
+    val partition = new TopicPartition("topic", 0)
+    val (fetchError, fetched) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(partition)))
+    assertEquals(Errors.NONE, fetchError)
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), fetched.get(partition).map(_.offset))
+  }
+
+  @Test
+  def testFetchOffsetNotCoordinatorForGroup(): Unit = {
+    // Fetching for otherGroupId is expected to fail with NOT_COORDINATOR and to carry
+    // no partition data.
+    val requested = Seq(new TopicPartition("topic", 0))
+    val (fetchError, fetched) = groupCoordinator.handleFetchOffsets(otherGroupId, Some(requested))
+    assertEquals(Errors.NOT_COORDINATOR, fetchError)
+    assertTrue(fetched.isEmpty)
+  }
+
+  @Test
+  def testFetchAllOffsets() {
+    // Fetching without a partition list returns everything committed for the group:
+    // an empty map before any commit, and all three offsets afterwards.
+    val committed = immutable.Map(
+      new TopicPartition("topic", 0) -> OffsetAndMetadata(15),
+      new TopicPartition("topic", 1) -> OffsetAndMetadata(16),
+      new TopicPartition("other-topic", 0) -> OffsetAndMetadata(17))
+
+    assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
+
+    val commitErrors = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, committed)
+    committed.keys.foreach(tp => assertEquals(Errors.NONE, commitErrors(tp)))
+
+    val (fetchError, fetched) = groupCoordinator.handleFetchOffsets(groupId)
+    assertEquals(Errors.NONE, fetchError)
+    assertEquals(3, fetched.size)
+    assertTrue(fetched.forall(_._2.error == Errors.NONE))
+    committed.foreach { case (tp, offsetAndMetadata) =>
+      assertEquals(Some(offsetAndMetadata.offset), fetched.get(tp).map(_.offset))
+    }
+  }
+
+  @Test
+  def testCommitOffsetInAwaitingSync() {
+    // The member has joined but no sync has been sent, so a commit with its member id
+    // and generation is expected to be answered with REBALANCE_IN_PROGRESS.
+    val partition = new TopicPartition("topic", 0)
+
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    assertEquals(Errors.NONE, joinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val commitErrors = commitOffsets(groupId, joinResult.memberId, joinResult.generationId,
+      immutable.Map(partition -> OffsetAndMetadata(0)))
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, commitErrors(partition))
+  }
+
+  @Test
+  def testHeartbeatDuringRebalanceCausesRebalanceInProgress() {
+    // Start a group with a single member.
+    val firstJoin = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    assertEquals(Errors.NONE, firstJoin.error)
+
+    // A second consumer asks to join, which triggers a rebalance; its response is not awaited.
+    EasyMock.reset(replicaManager)
+    sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    // While that rebalance is under way, the first member's heartbeat must report it.
+    EasyMock.reset(replicaManager)
+    val heartbeatError = heartbeat(groupId, firstJoin.memberId, firstJoin.generationId)
+    assertEquals(Errors.REBALANCE_IN_PROGRESS, heartbeatError)
+  }
+
+  @Test
+  def testGenerationIdIncrementsOnRebalance() {
+    // First join: the group starts at generation 1.
+    val firstJoin = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val assignedMemberId = firstJoin.memberId
+    assertEquals(1, firstJoin.generationId)
+    assertEquals(Errors.NONE, firstJoin.error)
+
+    // Complete the sync as leader with an empty assignment.
+    EasyMock.reset(replicaManager)
+    val (_, syncError) = syncGroupLeader(groupId, firstJoin.generationId, assignedMemberId,
+      Map(assignedMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncError)
+
+    // The same member rejoining must move the group to generation 2.
+    EasyMock.reset(replicaManager)
+    val rejoin = await(sendJoinGroup(groupId, assignedMemberId, protocolType, protocols), 1)
+    assertEquals(2, rejoin.generationId)
+    assertEquals(Errors.NONE, rejoin.error)
+  }
+
+  @Test
+  def testLeaveGroupWrongCoordinator() {
+    // Leaving otherGroupId is expected to be rejected with NOT_COORDINATOR.
+    val leaveError = leaveGroup(otherGroupId, JoinGroupRequest.UNKNOWN_MEMBER_ID)
+    assertEquals(Errors.NOT_COORDINATOR, leaveError)
+  }
+
+  @Test
+  def testLeaveGroupUnknownGroup() {
+    // No group has been created, so the leave request is answered with UNKNOWN_MEMBER_ID.
+    val leaveError = leaveGroup(groupId, memberId)
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveError)
+  }
+
+  @Test
+  def testLeaveGroupUnknownConsumerExistingGroup() {
+    // Create the group with one member ...
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    assertEquals(Errors.NONE, joinResult.error)
+
+    // ... then try to leave with a member id that never joined it.
+    EasyMock.reset(replicaManager)
+    val leaveError = leaveGroup(groupId, "consumerId")
+    assertEquals(Errors.UNKNOWN_MEMBER_ID, leaveError)
+  }
+
+  @Test
+  def testValidLeaveGroup() {
+    // A member that joined successfully can leave using the member id it was assigned.
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    assertEquals(Errors.NONE, joinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val leaveError = leaveGroup(groupId, joinResult.memberId)
+    assertEquals(Errors.NONE, leaveError)
+  }
+
+  @Test
+  def testListGroupsIncludesStableGroups() {
+    // Take the group through join and leader sync before listing.
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val assignedMemberId = joinResult.memberId
+    assertEquals(Errors.NONE, joinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncError) = syncGroupLeader(groupId, joinResult.generationId, assignedMemberId,
+      Map(assignedMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncError)
+
+    // The listing must contain exactly this one group.
+    val (listError, listedGroups) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.NONE, listError)
+    assertEquals(1, listedGroups.size)
+    assertEquals(GroupOverview("groupId", "consumer"), listedGroups.head)
+  }
+
+  @Test
+  def testListGroupsIncludesRebalancingGroups() {
+    // The member joins but never syncs; the group must still show up in the listing.
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    assertEquals(Errors.NONE, joinResult.error)
+
+    val (listError, listedGroups) = groupCoordinator.handleListGroups()
+    assertEquals(Errors.NONE, listError)
+    assertEquals(1, listedGroups.size)
+    assertEquals(GroupOverview("groupId", "consumer"), listedGroups.head)
+  }
+
+  @Test
+  def testDescribeGroupWrongCoordinator() {
+    // Describing otherGroupId is expected to fail with NOT_COORDINATOR; the summary is ignored.
+    EasyMock.reset(replicaManager)
+    val describeError = groupCoordinator.handleDescribeGroup(otherGroupId)._1
+    assertEquals(Errors.NOT_COORDINATOR, describeError)
+  }
+
+  @Test
+  def testDescribeGroupInactiveGroup() {
+    // A group that was never created is described successfully, using the DeadGroup summary.
+    EasyMock.reset(replicaManager)
+    val (describeError, groupSummary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, describeError)
+    assertEquals(GroupCoordinator.DeadGroup, groupSummary)
+  }
+
+  @Test
+  def testDescribeGroupStable() {
+    // Take the group through join and leader sync, then describe it.
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    val assignedMemberId = joinResult.memberId
+    assertEquals(Errors.NONE, joinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val (_, syncError) = syncGroupLeader(groupId, joinResult.generationId, assignedMemberId,
+      Map(assignedMemberId -> Array[Byte]()))
+    assertEquals(Errors.NONE, syncError)
+
+    // The summary must report the protocol type, the "range" protocol and the single member.
+    EasyMock.reset(replicaManager)
+    val (describeError, groupSummary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, describeError)
+    assertEquals(protocolType, groupSummary.protocolType)
+    assertEquals("range", groupSummary.protocol)
+    assertEquals(List(assignedMemberId), groupSummary.members.map(_.memberId))
+  }
+
+  @Test
+  def testDescribeGroupRebalancing() {
+    // The member joins but does not sync, leaving the group in the AwaitingSync state.
+    val joinResult = joinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+    assertEquals(Errors.NONE, joinResult.error)
+
+    EasyMock.reset(replicaManager)
+    val (describeError, groupSummary) = groupCoordinator.handleDescribeGroup(groupId)
+    assertEquals(Errors.NONE, describeError)
+    assertEquals(protocolType, groupSummary.protocolType)
+    assertEquals(GroupCoordinator.NoProtocol, groupSummary.protocol)
+    assertEquals(AwaitingSync.toString, groupSummary.state)
+
+    // The member is listed, but with neither metadata nor an assignment.
+    val describedMembers = groupSummary.members
+    assertTrue(describedMembers.map(_.memberId).contains(joinResult.memberId))
+    assertTrue(describedMembers.forall(_.metadata.isEmpty))
+    assertTrue(describedMembers.forall(_.assignment.isEmpty))
+  }
+
+  @Test
+  def shouldDelayInitialRebalanceByGroupInitialRebalanceDelayOnEmptyGroup() {
+    val halfDelay = GroupInitialRebalanceDelay / 2
+    val pendingJoin = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols)
+
+    // Halfway through the initial delay the join must still be outstanding.
+    timer.advanceClock(halfDelay)
+    verifyDelayedTaskNotCompleted(pendingJoin)
+
+    // After a further half delay plus one tick the join must have completed.
+    timer.advanceClock(halfDelay + 1)
+    val joinResult = await(pendingJoin, 1)
+    assertEquals(Errors.NONE, joinResult.error)
+  }
+
+  // Asserts that the given join future is still outstanding: waiting 1 ms for it must time out.
+  // If the future yields a result instead, the test is failed.
+  private def verifyDelayedTaskNotCompleted(firstJoinFuture: Future[JoinGroupResult]) = {
+    val timedOut =
+      try {
+        await(firstJoinFuture, 1)
+        false
+      } catch {
+        case _: TimeoutException => true
+      }
+    if (!timedOut)
+      Assert.fail("should have timed out as rebalance delay not expired")
+  }
+
+  @Test
+  def shouldResetRebalanceDelayWhenNewMemberJoinsGroupInInitialRebalance() {
+    // Both members use a rebalance timeout of three times the initial rebalance delay.
+    val rebalanceTimeout = GroupInitialRebalanceDelay * 3
+    val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+    EasyMock.reset(replicaManager)
+    // the second member joins one tick before the initial delay would expire
+    timer.advanceClock(GroupInitialRebalanceDelay - 1)
+    val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+    EasyMock.reset(replicaManager)
+    timer.advanceClock(2)
+
+    // advance past initial rebalance delay and make sure that tasks
+    // haven't been completed
+    timer.advanceClock(GroupInitialRebalanceDelay / 2 + 1)
+    verifyDelayedTaskNotCompleted(firstMemberJoinFuture)
+    verifyDelayedTaskNotCompleted(secondMemberJoinFuture)
+    // advance clock beyond updated delay and make sure the
+    // tasks have completed
+    timer.advanceClock(GroupInitialRebalanceDelay / 2)
+    val firstResult = await(firstMemberJoinFuture, 1)
+    val secondResult = await(secondMemberJoinFuture, 1)
+    assertEquals(Errors.NONE, firstResult.error)
+    assertEquals(Errors.NONE, secondResult.error)
+  }
+
+  @Test
+  def shouldDelayRebalanceUptoRebalanceTimeout() {
+    // All three members use a rebalance timeout of twice the initial rebalance delay.
+    val rebalanceTimeout = GroupInitialRebalanceDelay * 2
+    val firstMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+    EasyMock.reset(replicaManager)
+    val secondMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    EasyMock.reset(replicaManager)
+    // the third member joins after one full initial delay (plus a tick) has already elapsed
+    val thirdMemberJoinFuture = sendJoinGroup(groupId, JoinGroupRequest.UNKNOWN_MEMBER_ID, protocolType, protocols, rebalanceTimeout)
+    timer.advanceClock(GroupInitialRebalanceDelay)
+    EasyMock.reset(replicaManager)
+
+    // 2 * GroupInitialRebalanceDelay + 1 ticks have elapsed; none of the joins may have completed yet
+    verifyDelayedTaskNotCompleted(firstMemberJoinFuture)
+    verifyDelayedTaskNotCompleted(secondMemberJoinFuture)
+    verifyDelayedTaskNotCompleted(thirdMemberJoinFuture)
+
+    // advance clock beyond rebalanceTimeout
+    timer.advanceClock(1)
+
+    val firstResult = await(firstMemberJoinFuture, 1)
+    val secondResult = await(secondMemberJoinFuture, 1)
+    val thirdResult = await(thirdMemberJoinFuture, 1)
+    assertEquals(Errors.NONE, firstResult.error)
+    assertEquals(Errors.NONE, secondResult.error)
+    assertEquals(Errors.NONE, thirdResult.error)
+  }
+
+  // Returns a future together with the join-group callback that completes it: the future
+  // resolves with whatever JoinGroupResult the callback is invoked with.
+  private def setupJoinGroupCallback: (Future[JoinGroupResult], JoinGroupCallback) = {
+    val joinPromise = Promise[JoinGroupResult]
+    val completeJoin: JoinGroupCallback = result => joinPromise.success(result)
+    (joinPromise.future, completeJoin)
+  }
+
+  // Returns a future together with the sync-group callback that completes it: the callback's
+  // two arguments are delivered to the future as an (assignment, error) pair.
+  private def setupSyncGroupCallback: (Future[SyncGroupCallbackParams], SyncGroupCallback) = {
+    val syncPromise = Promise[SyncGroupCallbackParams]
+    val completeSync: SyncGroupCallback = (memberAssignment, syncError) =>
+      syncPromise.success(memberAssignment -> syncError)
+    (syncPromise.future, completeSync)
+  }
+
+  // Returns a future together with the heartbeat callback that completes it with the
+  // value the callback receives.
+  private def setupHeartbeatCallback: (Future[HeartbeatCallbackParams], HeartbeatCallback) = {
+    val heartbeatPromise = Promise[HeartbeatCallbackParams]
+    val completeHeartbeat: HeartbeatCallback = heartbeatPromise.success(_)
+    (heartbeatPromise.future, completeHeartbeat)
+  }
+
+  // Returns a future together with the commit-offsets callback that completes it with the
+  // value the callback receives.
+  private def setupCommitOffsetsCallback: (Future[CommitOffsetCallbackParams], CommitOffsetCallback) = {
+    val commitPromise = Promise[CommitOffsetCallbackParams]
+    val completeCommit: CommitOffsetCallback = commitPromise.success(_)
+    (commitPromise.future, completeCommit)
+  }
+
+  // Sends a join-group request to the coordinator without waiting for the response.
+  // The replica manager mock is switched to replay mode first; the returned future
+  // completes when the coordinator invokes the join callback.
+  private def sendJoinGroup(groupId: String,
+                            memberId: String,
+                            protocolType: String,
+                            protocols: List[(String, Array[Byte])],
+                            rebalanceTimeout: Int = DefaultRebalanceTimeout,
+                            sessionTimeout: Int = DefaultSessionTimeout): Future[JoinGroupResult] = {
+    val (joinFuture, onJoinResponse) = setupJoinGroupCallback
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleJoinGroup(
+      groupId, memberId, "clientId", "clientHost",
+      rebalanceTimeout, sessionTimeout,
+      protocolType, protocols, onJoinResponse)
+    joinFuture
+  }
+
+
+  // Sends a sync-group request carrying the leader's assignment map, without waiting for the response.
+  // The replica manager mock is primed so that appendRecords immediately invokes the captured
+  // response callback with a successful PartitionResponse for the group's offsets-topic partition.
+  private def sendSyncGroupLeader(groupId: String,
+                                  generation: Int,
+                                  leaderId: String,
+                                  assignment: Map[String, Array[Byte]]): Future[SyncGroupCallbackParams] = {
+    val (responseFuture, responseCallback) = setupSyncGroupCallback
+
+    // captures the response callback handed to appendRecords so the stubbed answer can invoke it
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+        )
+      )})
+    // any magic-value lookup reports message format V1
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleSyncGroup(groupId, generation, leaderId, assignment, responseCallback)
+    responseFuture
+  }
+
+  // Sends a follower's sync-group request (with an empty assignment map) without waiting for
+  // the response. No expectations are recorded on the replica manager mock before replay.
+  private def sendSyncGroupFollower(groupId: String,
+                                    generation: Int,
+                                    memberId: String): Future[SyncGroupCallbackParams] = {
+    val (syncFuture, onSyncResponse) = setupSyncGroupCallback
+    EasyMock.replay(replicaManager)
+
+    val noAssignment = Map.empty[String, Array[Byte]]
+    groupCoordinator.handleSyncGroup(groupId, generation, memberId, noAssignment, onSyncResponse)
+    syncFuture
+  }
+
+  // Sends a join-group request, advances the mock timer past the initial rebalance delay and
+  // blocks for the join result.
+  // Note: the optional parameters here are declared as (sessionTimeout, rebalanceTimeout), the
+  // reverse of sendJoinGroup's (rebalanceTimeout, sessionTimeout); the call below passes them
+  // in sendJoinGroup's order.
+  private def joinGroup(groupId: String,
+                        memberId: String,
+                        protocolType: String,
+                        protocols: List[(String, Array[Byte])],
+                        sessionTimeout: Int = DefaultSessionTimeout,
+                        rebalanceTimeout: Int = DefaultRebalanceTimeout): JoinGroupResult = {
+    val responseFuture = sendJoinGroup(groupId, memberId, protocolType, protocols, rebalanceTimeout, sessionTimeout)
+    timer.advanceClock(GroupInitialRebalanceDelay + 1)
+    // wait up to the rebalance timeout, plus some extra time in case of an unexpected delay
+    Await.result(responseFuture, Duration(rebalanceTimeout + 100, TimeUnit.MILLISECONDS))
+  }
+
+
+  // Blocking variant of sendSyncGroupFollower: waits up to sessionTimeout + 100 ms for the
+  // follower's sync response.
+  private def syncGroupFollower(groupId: String,
+                                generationId: Int,
+                                memberId: String,
+                                sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
+    val waitLimit = Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS)
+    Await.result(sendSyncGroupFollower(groupId, generationId, memberId), waitLimit)
+  }
+
+  // Blocking variant of sendSyncGroupLeader: waits up to sessionTimeout + 100 ms for the
+  // leader's sync response.
+  private def syncGroupLeader(groupId: String,
+                              generationId: Int,
+                              memberId: String,
+                              assignment: Map[String, Array[Byte]],
+                              sessionTimeout: Int = DefaultSessionTimeout): SyncGroupCallbackParams = {
+    val waitLimit = Duration(sessionTimeout + 100, TimeUnit.MILLISECONDS)
+    Await.result(sendSyncGroupLeader(groupId, generationId, memberId, assignment), waitLimit)
+  }
+
+  // Sends a heartbeat for the given member and generation and waits up to 40 ms for the
+  // coordinator's response.
+  private def heartbeat(groupId: String,
+                        consumerId: String,
+                        generationId: Int): HeartbeatCallbackParams = {
+    val (heartbeatFuture, onHeartbeatResponse) = setupHeartbeatCallback
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleHeartbeat(groupId, consumerId, generationId, onHeartbeatResponse)
+
+    val waitLimit = Duration(40, TimeUnit.MILLISECONDS)
+    Await.result(heartbeatFuture, waitLimit)
+  }
+
+  // Blocks on the future for at most `millis` milliseconds and returns its value;
+  // Await.result throws a TimeoutException if the future does not complete in time.
+  private def await[T](future: Future[T], millis: Long): T = {
+    val waitLimit = Duration(millis, TimeUnit.MILLISECONDS)
+    Await.result(future, waitLimit)
+  }
+
+  // Commits the given offsets on behalf of a consumer and waits up to 40 ms for the per-partition result.
+  // The replica manager mock is primed so that appendRecords immediately invokes the captured
+  // response callback with a successful PartitionResponse for the group's offsets-topic partition.
+  private def commitOffsets(groupId: String,
+                            consumerId: String,
+                            generationId: Int,
+                            offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+    val (responseFuture, responseCallback) = setupCommitOffsetsCallback
+
+    // captures the response callback handed to appendRecords so the stubbed answer can invoke it
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])
+    ).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+        )
+      )})
+    // any magic-value lookup reports message format V1
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleCommitOffsets(groupId, consumerId, generationId, offsets, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+
+  // Commits the given offsets as part of a producer's transaction and waits up to 40 ms for the
+  // per-partition result. Differences from commitOffsets visible here: the stubbed append response
+  // is keyed by the partition computed from the groupId argument (not the fixed groupPartitionId),
+  // magic-value lookups report V2, and the mock is reset before returning.
+  private def commitTransactionalOffsets(groupId: String,
+                                         producerId: Long,
+                                         producerEpoch: Short,
+                                         offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+    val (responseFuture, responseCallback) = setupCommitOffsetsCallback
+
+    // captures the response callback handed to appendRecords so the stubbed answer can invoke it
+    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+      EasyMock.anyShort(),
+      internalTopicsAllowed = EasyMock.eq(true),
+      isFromClient = EasyMock.eq(false),
+      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
+      EasyMock.capture(capturedArgument),
+      EasyMock.anyObject().asInstanceOf[Option[Object]])
+    ).andAnswer(new IAnswer[Unit] {
+      override def answer = capturedArgument.getValue.apply(
+        Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupCoordinator.partitionFor(groupId)) ->
+          new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP)
+        )
+      )})
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V2)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleTxnCommitOffsets(groupId, producerId, producerEpoch, offsets, responseCallback)
+    val result = Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+    // reset the mock so the caller can issue further requests without resetting it first
+    EasyMock.reset(replicaManager)
+    result
+  }
+
+  // Sends a leave-group request for the given member and waits up to 40 ms for the response.
+  private def leaveGroup(groupId: String, consumerId: String): LeaveGroupCallbackParams = {
+    // NOTE(review): the callback is built with setupHeartbeatCallback; this only type-checks if the
+    // heartbeat and leave-group callback/param types line up — confirm against their definitions.
+    val (responseFuture, responseCallback) = setupHeartbeatCallback
+
+    // the mock reports no partition for the group's offsets-topic partition, and V1 for any magic-value lookup
+    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId))).andReturn(None)
+    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(Some(RecordBatch.MAGIC_VALUE_V1)).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    groupCoordinator.handleLeaveGroup(groupId, consumerId, responseCallback)
+    Await.result(responseFuture, Duration(40, TimeUnit.MILLISECONDS))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 0e13f89..2db6603 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -455,7 +455,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
     assertTrue(group.hasOffsets)
     assertEquals(None, group.offset(partition))
-    group.failPendingTxnOffsetCommit(producerId, partition, txnOffsetCommit)
+    group.failPendingTxnOffsetCommit(producerId, partition)
     assertFalse(group.hasOffsets)
     assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 7e50049..fa2e55b 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -250,19 +250,20 @@ class RequestQuotaTest extends BaseRequestTest {
           new OffsetsForLeaderEpochRequest.Builder().add(tp, 0)
 
         case ApiKeys.ADD_PARTITIONS_TO_TXN =>
-          new AddPartitionsToTxnRequest.Builder("txn1", 1, 0, List(tp).asJava)
+          new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava)
 
         case ApiKeys.ADD_OFFSETS_TO_TXN =>
-          new AddOffsetsToTxnRequest.Builder("txn1", 1, 0, "test-txn-group")
+          new AddOffsetsToTxnRequest.Builder("test-transactional-id", 1, 0, "test-txn-group")
 
         case ApiKeys.END_TXN =>
-          new EndTxnRequest.Builder("txn1", 1, 0, TransactionResult.forId(false))
+          new EndTxnRequest.Builder("test-transactional-id", 1, 0, TransactionResult.forId(false))
 
         case ApiKeys.WRITE_TXN_MARKERS =>
           new WriteTxnMarkersRequest.Builder(List.empty.asJava)
 
         case ApiKeys.TXN_OFFSET_COMMIT =>
-          new TxnOffsetCommitRequest.Builder("test-txn-group", 2, 0, Map.empty.asJava)
+          new TxnOffsetCommitRequest.Builder("test-transactional-id", "test-txn-group", 2, 0,
+            Map.empty[TopicPartition, TxnOffsetCommitRequest.CommittedOffset].asJava)
 
         case ApiKeys.DESCRIBE_ACLS =>
           new DescribeAclsRequest.Builder(AclBindingFilter.ANY)

http://git-wip-us.apache.org/repos/asf/kafka/blob/38f6cae9/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 70c340b..054a4ff 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1340,21 +1340,19 @@ object TestUtils extends Logging {
   }
 
   // Seeds the given topic with records with keys and values in the range [0..numRecords)
-  def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Int = {
+  def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Unit = {
     val props = new Properties()
     props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
-    var recordsWritten = 0
-    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
     try {
       for (i <- 0 until numRecords) {
         producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString)))
-        recordsWritten += 1
       }
       producer.flush()
     } finally {
       producer.close()
     }
-    recordsWritten
   }
 
   private def asString(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8)
@@ -1404,7 +1402,7 @@ object TestUtils extends Logging {
     offsetsToCommit.toMap
   }
 
-  def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+  def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
     TestUtils.waitUntilTrue(() => {
       records ++= consumer.poll(50)


Mime
View raw message