kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [04/10] kafka git commit: KAFKA-5059: Implement Transactional Coordinator
Date Wed, 26 Apr 2017 21:11:07 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
deleted file mode 100644
index b1f68bb..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ /dev/null
@@ -1,814 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.coordinator
-
-import java.nio.ByteBuffer
-
-import kafka.api.ApiVersion
-import kafka.cluster.Partition
-import kafka.common.{OffsetAndMetadata, Topic}
-import kafka.log.{Log, LogAppendInfo}
-import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
-import kafka.utils.TestUtils.fail
-import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
-import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.OffsetFetchResponse
-import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
-import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
-import org.junit.{Before, Test}
-
-import scala.collection.JavaConverters._
-import scala.collection._
-
-class GroupMetadataManagerTest {
-
-  var time: MockTime = null
-  var replicaManager: ReplicaManager = null
-  var groupMetadataManager: GroupMetadataManager = null
-  var scheduler: KafkaScheduler = null
-  var zkUtils: ZkUtils = null
-  var partition: Partition = null
-
-  val groupId = "foo"
-  val groupPartitionId = 0
-  val protocolType = "protocolType"
-  val rebalanceTimeout = 60000
-  val sessionTimeout = 10000
-
-  @Before
-  def setUp() {
-    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(nodeId = 0, zkConnect = ""))
-
-    val offsetConfig = OffsetConfig(maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-
-    // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
-    zkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-    EasyMock.expect(zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName)).andReturn(Some(2))
-    EasyMock.replay(zkUtils)
-
-    time = new MockTime
-    replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-    groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkUtils, time)
-    partition = EasyMock.niceMock(classOf[Partition])
-  }
-
-  @Test
-  def testLoadOffsetsWithoutGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
-    val startOffset = 15L
-
-    val committedOffsets = Map(
-      new TopicPartition("foo", 0) -> 23L,
-      new TopicPartition("foo", 1) -> 455L,
-      new TopicPartition("bar", 0) -> 8992L
-    )
-
-    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, offsetCommitRecords: _*)
-    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
-
-    EasyMock.replay(replicaManager)
-    
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
-
-    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
-    assertEquals(groupId, group.groupId)
-    assertEquals(Empty, group.currentState)
-    assertEquals(committedOffsets.size, group.allOffsets.size)
-    committedOffsets.foreach { case (topicPartition, offset) =>
-      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
-    }
-  }
-
-  @Test
-  def testLoadOffsetsWithTombstones() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
-    val startOffset = 15L
-
-    val tombstonePartition = new TopicPartition("foo", 1)
-    val committedOffsets = Map(
-      new TopicPartition("foo", 0) -> 23L,
-      tombstonePartition -> 455L,
-      new TopicPartition("bar", 0) -> 8992L
-    )
-
-    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val tombstone = new SimpleRecord(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(tombstone): _*)
-
-    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
-
-    EasyMock.replay(replicaManager)
-
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
-
-    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
-    assertEquals(groupId, group.groupId)
-    assertEquals(Empty, group.currentState)
-    assertEquals(committedOffsets.size - 1, group.allOffsets.size)
-    committedOffsets.foreach { case (topicPartition, offset) =>
-      if (topicPartition == tombstonePartition)
-        assertEquals(None, group.offset(topicPartition))
-      else
-        assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
-    }
-  }
-
-  @Test
-  def testLoadOffsetsAndGroup() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
-    val startOffset = 15L
-    val committedOffsets = Map(
-      new TopicPartition("foo", 0) -> 23L,
-      new TopicPartition("foo", 1) -> 455L,
-      new TopicPartition("bar", 0) -> 8992L
-    )
-
-    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
-
-    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
-
-    EasyMock.replay(replicaManager)
-
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
-
-    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
-    assertEquals(groupId, group.groupId)
-    assertEquals(Stable, group.currentState)
-    assertEquals(memberId, group.leaderId)
-    assertEquals(Set(memberId), group.allMembers)
-    assertEquals(committedOffsets.size, group.allOffsets.size)
-    committedOffsets.foreach { case (topicPartition, offset) =>
-      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
-    }
-  }
-
-  @Test
-  def testLoadGroupWithTombstone() {
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
-    val startOffset = 15L
-
-    val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
-    val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      Seq(groupMetadataRecord, groupMetadataTombstone): _*)
-
-    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
-
-    EasyMock.replay(replicaManager)
-
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
-
-    assertEquals(None, groupMetadataManager.getGroup(groupId))
-  }
-
-  @Test
-  def testOffsetWriteAfterGroupRemoved(): Unit = {
-    // this test case checks the following scenario:
-    // 1. the group exists at some point in time, but is later removed (because all members left)
-    // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
-
-    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
-    val startOffset = 15L
-
-    val committedOffsets = Map(
-      new TopicPartition("foo", 0) -> 23L,
-      new TopicPartition("foo", 1) -> 455L,
-      new TopicPartition("bar", 0) -> 8992L
-    )
-    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
-    val memberId = "98098230493"
-    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
-    val groupMetadataTombstone = new SimpleRecord(GroupMetadataManager.groupMetadataKey(groupId), null)
-    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
-      Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
-
-    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
-
-    EasyMock.replay(replicaManager)
-
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
-
-    val group = groupMetadataManager.getGroup(groupId).getOrElse(TestUtils.fail("Group was not loaded into the cache"))
-    assertEquals(groupId, group.groupId)
-    assertEquals(Empty, group.currentState)
-    assertEquals(committedOffsets.size, group.allOffsets.size)
-    committedOffsets.foreach { case (topicPartition, offset) =>
-      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
-    }
-  }
-
-  @Test
-  def testAddGroup() {
-    val group = new GroupMetadata("foo")
-    assertEquals(group, groupMetadataManager.addGroup(group))
-    assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo")))
-  }
-
-  @Test
-  def testStoreEmptyGroup() {
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    expectAppendMessage(Errors.NONE)
-    EasyMock.replay(replicaManager)
-
-    var maybeError: Option[Errors] = None
-    def callback(error: Errors) {
-      maybeError = Some(error)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
-    groupMetadataManager.store(delayedStore)
-    assertEquals(Some(Errors.NONE), maybeError)
-  }
-
-  @Test
-  def testStoreGroupErrorMapping() {
-    assertStoreGroupErrorMapping(Errors.NONE, Errors.NONE)
-    assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
-    assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN)
-    assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN)
-    assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN)
-    assertStoreGroupErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
-  }
-
-  private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
-    EasyMock.reset(replicaManager)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    expectAppendMessage(appendError)
-    EasyMock.replay(replicaManager)
-
-    var maybeError: Option[Errors] = None
-    def callback(error: Errors) {
-      maybeError = Some(error)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map.empty, callback).get
-    groupMetadataManager.store(delayedStore)
-    assertEquals(Some(expectedError), maybeError)
-
-    EasyMock.verify(replicaManager)
-  }
-
-  @Test
-  def testStoreNonEmptyGroup() {
-    val memberId = "memberId"
-    val clientId = "clientId"
-    val clientHost = "localhost"
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
-      protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
-    group.transitionTo(PreparingRebalance)
-    group.initNextGeneration()
-
-    expectAppendMessage(Errors.NONE)
-    EasyMock.replay(replicaManager)
-
-    var maybeError: Option[Errors] = None
-    def callback(error: Errors) {
-      maybeError = Some(error)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback).get
-    groupMetadataManager.store(delayedStore)
-    assertEquals(Some(Errors.NONE), maybeError)
-  }
-
-  @Test
-  def testStoreNonEmptyGroupWhenCoordinatorHasMoved() {
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
-    val memberId = "memberId"
-    val clientId = "clientId"
-    val clientHost = "localhost"
-
-    val group = new GroupMetadata(groupId)
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
-      protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
-    group.transitionTo(PreparingRebalance)
-    group.initNextGeneration()
-
-    EasyMock.replay(replicaManager)
-
-    var maybeError: Option[Errors] = None
-    def callback(error: Errors) {
-      maybeError = Some(error)
-    }
-
-    groupMetadataManager.prepareStoreGroup(group, Map(memberId -> Array[Byte]()), callback)
-    assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
-    EasyMock.verify(replicaManager)
-  }
-
-  @Test
-  def testCommitOffset() {
-    val memberId = ""
-    val generationId = -1
-    val topicPartition = new TopicPartition("foo", 0)
-    val offset = 37
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
-
-    expectAppendMessage(Errors.NONE)
-    EasyMock.replay(replicaManager)
-
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]) {
-      commitErrors = Some(errors)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
-    assertTrue(group.hasOffsets)
-
-    groupMetadataManager.store(delayedStore)
-
-    assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NONE), maybeError)
-    assertTrue(group.hasOffsets)
-
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition)))
-    val maybePartitionResponse = cachedOffsets.get(topicPartition)
-    assertFalse(maybePartitionResponse.isEmpty)
-
-    val partitionResponse = maybePartitionResponse.get
-    assertEquals(Errors.NONE, partitionResponse.error)
-    assertEquals(offset, partitionResponse.offset)
-  }
-
-  @Test
-  def testCommitOffsetWhenCoordinatorHasMoved() {
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andReturn(None)
-    val memberId = ""
-    val generationId = -1
-    val topicPartition = new TopicPartition("foo", 0)
-    val offset = 37
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
-
-    EasyMock.replay(replicaManager)
-
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]) {
-      commitErrors = Some(errors)
-    }
-
-    groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
-
-    assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
-    EasyMock.verify(replicaManager)
-  }
-
-  @Test
-  def testCommitOffsetFailure() {
-    assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
-    assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
-    assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
-    assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)
-    assertCommitOffsetErrorMapping(Errors.CORRUPT_MESSAGE, Errors.CORRUPT_MESSAGE)
-  }
-
-  private def assertCommitOffsetErrorMapping(appendError: Errors, expectedError: Errors): Unit = {
-    EasyMock.reset(replicaManager)
-
-    val memberId = ""
-    val generationId = -1
-    val topicPartition = new TopicPartition("foo", 0)
-    val offset = 37
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
-
-    expectAppendMessage(appendError)
-    EasyMock.replay(replicaManager)
-
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]) {
-      commitErrors = Some(errors)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
-    assertTrue(group.hasOffsets)
-
-    groupMetadataManager.store(delayedStore)
-
-    assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
-    assertEquals(Some(expectedError), maybeError)
-    assertFalse(group.hasOffsets)
-
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition).map(_.offset))
-
-    EasyMock.verify(replicaManager)
-  }
-
-  @Test
-  def testExpireOffset() {
-    val memberId = ""
-    val generationId = -1
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
-    val offset = 37
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    // expire the offset after 1 millisecond
-    val startMs = time.milliseconds
-    val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
-      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
-
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    expectAppendMessage(Errors.NONE)
-    EasyMock.replay(replicaManager)
-
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]) {
-      commitErrors = Some(errors)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
-    assertTrue(group.hasOffsets)
-
-    groupMetadataManager.store(delayedStore)
-    assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
-
-    // expire only one of the offsets
-    time.sleep(2)
-
-    EasyMock.reset(partition)
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
-    EasyMock.replay(partition)
-
-    groupMetadataManager.cleanupGroupMetadata()
-
-    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(Some(offset), group.offset(topicPartition2).map(_.offset))
-
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
-  }
-
-  @Test
-  def testGroupMetadataRemoval() {
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-    group.generationId = 5
-
-    // expect the group metadata tombstone
-    EasyMock.reset(partition)
-    val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
-
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
-    EasyMock.replay(replicaManager, partition)
-
-    groupMetadataManager.cleanupGroupMetadata()
-
-    assertTrue(recordsCapture.hasCaptured)
-
-    val records = recordsCapture.getValue.records.asScala.toList
-    recordsCapture.getValue.batches.asScala.foreach { batch =>
-      assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
-      assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
-    }
-    assertEquals(1, records.size)
-
-    val metadataTombstone = records.head
-    assertTrue(metadataTombstone.hasKey)
-    assertFalse(metadataTombstone.hasValue)
-    assertTrue(metadataTombstone.timestamp > 0)
-
-    val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
-    assertEquals(groupId, groupKey.key)
-
-    // the full group should be gone since all offsets were removed
-    assertEquals(None, groupMetadataManager.getGroup(groupId))
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
-  }
-
-  @Test
-  def testGroupMetadataRemovalWithLogAppendTime() {
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-    group.generationId = 5
-
-    // expect the group metadata tombstone
-    EasyMock.reset(partition)
-    val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
-
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
-    EasyMock.replay(replicaManager, partition)
-
-    groupMetadataManager.cleanupGroupMetadata()
-
-    assertTrue(recordsCapture.hasCaptured)
-
-    val records = recordsCapture.getValue.records.asScala.toList
-    recordsCapture.getValue.batches.asScala.foreach { batch =>
-      assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, batch.magic)
-      // Use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
-      assertEquals(TimestampType.CREATE_TIME, batch.timestampType)
-    }
-    assertEquals(1, records.size)
-
-    val metadataTombstone = records.head
-    assertTrue(metadataTombstone.hasKey)
-    assertFalse(metadataTombstone.hasValue)
-    assertTrue(metadataTombstone.timestamp > 0)
-
-    val groupKey = GroupMetadataManager.readMessageKey(metadataTombstone.key).asInstanceOf[GroupMetadataKey]
-    assertEquals(groupId, groupKey.key)
-
-    // the full group should be gone since all offsets were removed
-    assertEquals(None, groupMetadataManager.getGroup(groupId))
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
-  }
-
-  @Test
-  def testExpireGroupWithOffsetsOnly() {
-    // verify that the group is removed properly, but no tombstone is written if
-    // this is a group which is only using kafka for offset storage
-
-    val memberId = ""
-    val generationId = -1
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
-    val offset = 37
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    // expire the offset after 1 millisecond
-    val startMs = time.milliseconds
-    val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
-      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
-
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    expectAppendMessage(Errors.NONE)
-    EasyMock.replay(replicaManager)
-
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]) {
-      commitErrors = Some(errors)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback).get
-    assertTrue(group.hasOffsets)
-
-    groupMetadataManager.store(delayedStore)
-    assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
-
-    // expire all of the offsets
-    time.sleep(4)
-
-    // expect the offset tombstone
-    EasyMock.reset(partition)
-    val recordsCapture: Capture[MemoryRecords] = EasyMock.newCapture()
-
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.capture(recordsCapture), EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
-    EasyMock.replay(partition)
-
-    groupMetadataManager.cleanupGroupMetadata()
-
-    assertTrue(recordsCapture.hasCaptured)
-
-    // verify the tombstones are correct and only for the expired offsets
-    val records = recordsCapture.getValue.records.asScala.toList
-    assertEquals(2, records.size)
-    records.foreach { message =>
-      assertTrue(message.hasKey)
-      assertFalse(message.hasValue)
-      val offsetKey = GroupMetadataManager.readMessageKey(message.key).asInstanceOf[OffsetKey]
-      assertEquals(groupId, offsetKey.key.group)
-      assertEquals("foo", offsetKey.key.topicPartition.topic)
-    }
-
-    // the full group should be gone since all offsets were removed
-    assertEquals(None, groupMetadataManager.getGroup(groupId))
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
-  }
-
-  @Test
-  def testExpireOffsetsWithActiveGroup() {
-    val memberId = "memberId"
-    val clientId = "clientId"
-    val clientHost = "localhost"
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
-    val offset = 37
-
-    groupMetadataManager.addPartitionOwnership(groupPartitionId)
-
-    val group = new GroupMetadata(groupId)
-    groupMetadataManager.addGroup(group)
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
-      protocolType, List(("protocol", Array[Byte]())))
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
-    group.transitionTo(PreparingRebalance)
-    group.initNextGeneration()
-
-    // expire the offset after 1 millisecond
-    val startMs = time.milliseconds
-    val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
-      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
-
-    EasyMock.expect(replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId))).andStubReturn(Some(partition))
-    expectAppendMessage(Errors.NONE)
-    EasyMock.replay(replicaManager)
-
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]) {
-      commitErrors = Some(errors)
-    }
-
-    val delayedStore = groupMetadataManager.prepareStoreOffsets(group, memberId, group.generationId, offsets, callback).get
-    assertTrue(group.hasOffsets)
-
-    groupMetadataManager.store(delayedStore)
-    assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
-
-    // expire all of the offsets
-    time.sleep(4)
-
-    // expect the offset tombstone
-    EasyMock.reset(partition)
-    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]), EasyMock.anyInt()))
-      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
-    EasyMock.replay(partition)
-
-    groupMetadataManager.cleanupGroupMetadata()
-
-    // group should still be there, but the offsets should be gone
-    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(None, group.offset(topicPartition2))
-
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
-  }
-
-  private def expectAppendMessage(error: Errors) {
-    val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
-    EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
-      EasyMock.anyShort(),
-      EasyMock.anyBoolean(),
-      EasyMock.anyObject().asInstanceOf[Map[TopicPartition, MemoryRecords]],
-      EasyMock.capture(capturedArgument))).andAnswer(new IAnswer[Unit] {
-      override def answer = capturedArgument.getValue.apply(
-        Map(new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId) ->
-          new PartitionResponse(error, 0L, RecordBatch.NO_TIMESTAMP)
-        )
-      )})
-    EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-  }
-
-  private def buildStableGroupRecordWithMember(memberId: String): SimpleRecord = {
-    val group = new GroupMetadata(groupId)
-    group.transitionTo(PreparingRebalance)
-    val memberProtocols = List(("roundrobin", Array.emptyByteArray))
-    val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols)
-    group.add(member)
-    member.awaitingJoinCallback = _ => {}
-    group.initNextGeneration()
-    group.transitionTo(Stable)
-
-    val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
-    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
-    new SimpleRecord(groupMetadataKey, groupMetadataValue)
-  }
-
-  private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
-                                      startOffset: Long,
-                                      records: MemoryRecords): Unit = {
-    val endOffset = startOffset + records.records.asScala.size
-    val logMock =  EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
-
-    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
-    EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
-    EasyMock.expect(replicaManager.getHighWatermark(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
-    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true)))
-      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
-    EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
-      .andReturn(records.buffer)
-
-    EasyMock.replay(logMock, fileRecordsMock)
-  }
-
-  private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
-                                           groupId: String = groupId): Seq[SimpleRecord] = {
-    committedOffsets.map { case (topicPartition, offset) =>
-      val offsetAndMetadata = OffsetAndMetadata(offset)
-      val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
-      val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
-      new SimpleRecord(offsetCommitKey, offsetCommitValue)
-    }.toSeq
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
deleted file mode 100644
index 3db7818..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ /dev/null
@@ -1,360 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.coordinator
-
-import kafka.common.OffsetAndMetadata
-import org.apache.kafka.common.TopicPartition
-import org.junit.Assert._
-import org.junit.{Before, Test}
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test group state transitions and other GroupMetadata functionality
- */
-class GroupMetadataTest extends JUnitSuite {
-  private val protocolType = "consumer"
-  private val groupId = "groupId"
-  private val clientId = "clientId"
-  private val clientHost = "clientHost"
-  private val rebalanceTimeoutMs = 60000
-  private val sessionTimeoutMs = 10000
-
-  private var group: GroupMetadata = null
-
-  @Before
-  def setUp() {
-    group = new GroupMetadata("groupId")
-  }
-
-  @Test
-  def testCanRebalanceWhenStable() {
-    assertTrue(group.canRebalance)
-  }
-
-  @Test
-  def testCanRebalanceWhenAwaitingSync(){
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
-    assertTrue(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenPreparingRebalance() {
-    group.transitionTo(PreparingRebalance)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testCannotRebalanceWhenDead() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Empty)
-    group.transitionTo(Dead)
-    assertFalse(group.canRebalance)
-  }
-
-  @Test
-  def testStableToPreparingRebalanceTransition() {
-    group.transitionTo(PreparingRebalance)
-    assertState(group, PreparingRebalance)
-  }
-
-  @Test
-  def testStableToDeadTransition() {
-    group.transitionTo(Dead)
-    assertState(group, Dead)
-  }
-
-  @Test
-  def testAwaitingSyncToPreparingRebalanceTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
-    group.transitionTo(PreparingRebalance)
-    assertState(group, PreparingRebalance)
-  }
-
-  @Test
-  def testPreparingRebalanceToDeadTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    assertState(group, Dead)
-  }
-
-  @Test
-  def testPreparingRebalanceToEmptyTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Empty)
-    assertState(group, Empty)
-  }
-
-  @Test
-  def testEmptyToDeadTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Empty)
-    group.transitionTo(Dead)
-    assertState(group, Dead)
-  }
-
-  @Test
-  def testAwaitingSyncToStableTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
-    group.transitionTo(Stable)
-    assertState(group, Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToStableIllegalTransition() {
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testStableToAwaitingSyncIllegalTransition() {
-    group.transitionTo(AwaitingSync)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testPreparingRebalanceToPreparingRebalanceIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testPreparingRebalanceToStableIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testAwaitingSyncToAwaitingSyncIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(AwaitingSync)
-    group.transitionTo(AwaitingSync)
-  }
-
-  def testDeadToDeadIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Dead)
-    assertState(group, Dead)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToStableIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(Stable)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToPreparingRebalanceIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(PreparingRebalance)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testDeadToAwaitingSyncIllegalTransition() {
-    group.transitionTo(PreparingRebalance)
-    group.transitionTo(Dead)
-    group.transitionTo(AwaitingSync)
-  }
-
-  @Test
-  def testSelectProtocol() {
-    val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
-
-    group.add(member)
-    assertEquals("range", group.selectProtocol)
-
-    val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
-      sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
-
-    group.add(otherMember)
-    // now could be either range or robin since there is no majority preference
-    assertTrue(Set("range", "roundrobin")(group.selectProtocol))
-
-    val lastMemberId = "lastMemberId"
-    val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
-      sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
-
-    group.add(lastMember)
-    // now we should prefer 'roundrobin'
-    assertEquals("roundrobin", group.selectProtocol)
-  }
-
-  @Test(expected = classOf[IllegalStateException])
-  def testSelectProtocolRaisesIfNoMembers() {
-    group.selectProtocol
-    fail()
-  }
-
-  @Test
-  def testSelectProtocolChoosesCompatibleProtocol() {
-    val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
-
-    val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
-      sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
-
-    group.add(member)
-    group.add(otherMember)
-    assertEquals("roundrobin", group.selectProtocol)
-  }
-
-  @Test
-  def testSupportsProtocols() {
-    // by default, the group supports everything
-    assertTrue(group.supportsProtocols(Set("roundrobin", "range")))
-
-    val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
-      sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
-
-    group.add(member)
-    assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
-    assertTrue(group.supportsProtocols(Set("range", "foo")))
-    assertFalse(group.supportsProtocols(Set("foo", "bar")))
-
-    val otherMemberId = "otherMemberId"
-    val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
-      sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
-
-    group.add(otherMember)
-
-    assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
-    assertFalse(group.supportsProtocols(Set("range", "foo")))
-  }
-
-  @Test
-  def testInitNextGeneration() {
-    val memberId = "memberId"
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, List(("roundrobin", Array.empty[Byte])))
-
-    group.transitionTo(PreparingRebalance)
-    member.awaitingJoinCallback = _ => ()
-    group.add(member)
-
-    assertEquals(0, group.generationId)
-    assertNull(group.protocol)
-
-    group.initNextGeneration()
-
-    assertEquals(1, group.generationId)
-    assertEquals("roundrobin", group.protocol)
-  }
-
-  @Test
-  def testInitNextGenerationEmptyGroup() {
-    assertEquals(Empty, group.currentState)
-    assertEquals(0, group.generationId)
-    assertNull(group.protocol)
-
-    group.transitionTo(PreparingRebalance)
-    group.initNextGeneration()
-
-    assertEquals(1, group.generationId)
-    assertNull(group.protocol)
-  }
-
-  @Test
-  def testOffsetCommit(): Unit = {
-    val partition = new TopicPartition("foo", 0)
-    val offset = OffsetAndMetadata(37)
-
-    group.prepareOffsetCommit(Map(partition -> offset))
-    assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-
-    group.completePendingOffsetWrite(partition, offset)
-    assertTrue(group.hasOffsets)
-    assertEquals(Some(offset), group.offset(partition))
-  }
-
-  @Test
-  def testOffsetCommitFailure(): Unit = {
-    val partition = new TopicPartition("foo", 0)
-    val offset = OffsetAndMetadata(37)
-
-    group.prepareOffsetCommit(Map(partition -> offset))
-    assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-
-    group.failPendingOffsetWrite(partition, offset)
-    assertFalse(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-  }
-
-  @Test
-  def testOffsetCommitFailureWithAnotherPending(): Unit = {
-    val partition = new TopicPartition("foo", 0)
-    val firstOffset = OffsetAndMetadata(37)
-    val secondOffset = OffsetAndMetadata(57)
-
-    group.prepareOffsetCommit(Map(partition -> firstOffset))
-    assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-
-    group.prepareOffsetCommit(Map(partition -> secondOffset))
-    assertTrue(group.hasOffsets)
-
-    group.failPendingOffsetWrite(partition, firstOffset)
-    assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-
-    group.completePendingOffsetWrite(partition, secondOffset)
-    assertTrue(group.hasOffsets)
-    assertEquals(Some(secondOffset), group.offset(partition))
-  }
-
-  @Test
-  def testOffsetCommitWithAnotherPending(): Unit = {
-    val partition = new TopicPartition("foo", 0)
-    val firstOffset = OffsetAndMetadata(37)
-    val secondOffset = OffsetAndMetadata(57)
-
-    group.prepareOffsetCommit(Map(partition -> firstOffset))
-    assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-
-    group.prepareOffsetCommit(Map(partition -> secondOffset))
-    assertTrue(group.hasOffsets)
-
-    group.completePendingOffsetWrite(partition, firstOffset)
-    assertTrue(group.hasOffsets)
-    assertEquals(Some(firstOffset), group.offset(partition))
-
-    group.completePendingOffsetWrite(partition, secondOffset)
-    assertTrue(group.hasOffsets)
-    assertEquals(Some(secondOffset), group.offset(partition))
-  }
-
-  private def assertState(group: GroupMetadata, targetState: GroupState) {
-    val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead)
-    val otherStates = states - targetState
-    otherStates.foreach { otherState =>
-      assertFalse(group.is(otherState))
-    }
-    assertTrue(group.is(targetState))
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
deleted file mode 100644
index 257dde7..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/MemberMetadataTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.coordinator
-
-import java.util
-
-import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnitSuite
-
-class MemberMetadataTest extends JUnitSuite {
-  val groupId = "groupId"
-  val clientId = "clientId"
-  val clientHost = "clientHost"
-  val memberId = "memberId"
-  val protocolType = "consumer"
-  val rebalanceTimeoutMs = 60000
-  val sessionTimeoutMs = 10000
-
-
-  @Test
-  def testMatchesSupportedProtocols {
-    val protocols = List(("range", Array.empty[Byte]))
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, protocols)
-    assertTrue(member.matches(protocols))
-    assertFalse(member.matches(List(("range", Array[Byte](0)))))
-    assertFalse(member.matches(List(("roundrobin", Array.empty[Byte]))))
-    assertFalse(member.matches(List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))))
-  }
-
-  @Test
-  def testVoteForPreferredProtocol {
-    val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, protocols)
-    assertEquals("range", member.vote(Set("range", "roundrobin")))
-    assertEquals("roundrobin", member.vote(Set("blah", "roundrobin")))
-  }
-
-  @Test
-  def testMetadata {
-    val protocols = List(("range", Array[Byte](0)), ("roundrobin", Array[Byte](1)))
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, protocols)
-    assertTrue(util.Arrays.equals(Array[Byte](0), member.metadata("range")))
-    assertTrue(util.Arrays.equals(Array[Byte](1), member.metadata("roundrobin")))
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testMetadataRaisesOnUnsupportedProtocol {
-    val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, protocols)
-    member.metadata("blah")
-    fail()
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
-  def testVoteRaisesOnNoSupportedProtocols {
-    val protocols = List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte]))
-
-    val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
-      protocolType, protocols)
-    member.vote(Set("blah"))
-    fail()
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
deleted file mode 100644
index da9ec47..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/ProducerIdManagerTest.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.coordinator
-
-import kafka.common.KafkaException
-import kafka.utils.ZkUtils
-import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{After, Test}
-import org.junit.Assert._
-
-class ProducerIdManagerTest {
-
-  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-
-  @After
-  def tearDown(): Unit = {
-    EasyMock.reset(zkUtils)
-  }
-
-  @Test
-  def testGetPID() {
-    var zkVersion: Int = -1
-    var data: String = null
-    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
-      .andAnswer(new IAnswer[(Option[String], Int)] {
-        override def answer(): (Option[String], Int) = {
-          if (zkVersion == -1) {
-            (None.asInstanceOf[Option[String]], 0)
-          } else {
-            (Some(data), zkVersion)
-          }
-        }
-      })
-      .anyTimes()
-
-    val capturedVersion: Capture[Int] = EasyMock.newCapture()
-    val capturedData: Capture[String] = EasyMock.newCapture()
-    EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
-      EasyMock.capture(capturedData),
-      EasyMock.capture(capturedVersion),
-      EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
-      .andAnswer(new IAnswer[(Boolean, Int)] {
-        override def answer(): (Boolean, Int) = {
-          zkVersion = capturedVersion.getValue + 1
-          data = capturedData.getValue
-
-          (true, zkVersion)
-        }
-      })
-      .anyTimes()
-
-    EasyMock.replay(zkUtils)
-
-    val manager1: ProducerIdManager = new ProducerIdManager(0, zkUtils)
-    val manager2: ProducerIdManager = new ProducerIdManager(1, zkUtils)
-
-    val pid1 = manager1.nextPid()
-    val pid2 = manager2.nextPid()
-
-    assertEquals(0, pid1)
-    assertEquals(ProducerIdManager.PidBlockSize, pid2)
-
-    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid1 + i, manager1.nextPid())
-    }
-
-    for (i <- 1 until ProducerIdManager.PidBlockSize.asInstanceOf[Int]) {
-      assertEquals(pid2 + i, manager2.nextPid())
-    }
-
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize, manager1.nextPid())
-    assertEquals(pid2 + ProducerIdManager.PidBlockSize * 2, manager2.nextPid())
-  }
-
-  @Test(expected = classOf[KafkaException])
-  def testExceedPIDLimit() {
-    EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
-      .andAnswer(new IAnswer[(Option[String], Int)] {
-        override def answer(): (Option[String], Int) = {
-          (Some(ProducerIdManager.generatePidBlockJson(ProducerIdBlock(0,
-            Long.MaxValue - ProducerIdManager.PidBlockSize,
-            Long.MaxValue))), 0)
-        }
-      })
-      .anyTimes()
-    EasyMock.replay(zkUtils)
-    new ProducerIdManager(0, zkUtils)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/f69d9415/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
deleted file mode 100644
index f8ef5dc..0000000
--- a/core/src/test/scala/unit/kafka/coordinator/TransactionCoordinatorTest.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  *    http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-package kafka.coordinator
-
-import kafka.utils.ZkUtils
-import org.apache.kafka.common.protocol.Errors
-import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
-
-class TransactionCoordinatorTest {
-
-  val zkUtils: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
-
-  var zkVersion: Int = -1
-  var data: String = null
-  val capturedVersion: Capture[Int] = EasyMock.newCapture()
-  val capturedData: Capture[String] = EasyMock.newCapture()
-  EasyMock.expect(zkUtils.readDataAndVersionMaybeNull(EasyMock.anyString()))
-    .andAnswer(new IAnswer[(Option[String], Int)] {
-      override def answer(): (Option[String], Int) = {
-        if (zkVersion == -1) {
-          (None.asInstanceOf[Option[String]], 0)
-        } else {
-          (Some(data), zkVersion)
-        }
-      }
-    })
-    .anyTimes()
-
-  EasyMock.expect(zkUtils.conditionalUpdatePersistentPath(EasyMock.anyString(),
-    EasyMock.capture(capturedData),
-    EasyMock.capture(capturedVersion),
-    EasyMock.anyObject().asInstanceOf[Option[(ZkUtils, String, String) => (Boolean,Int)]]))
-    .andAnswer(new IAnswer[(Boolean, Int)] {
-      override def answer(): (Boolean, Int) = {
-        zkVersion = capturedVersion.getValue + 1
-        data = capturedData.getValue
-
-        (true, zkVersion)
-      }
-    })
-    .anyTimes()
-
-  EasyMock.replay(zkUtils)
-
-  val pidManager: ProducerIdManager = new ProducerIdManager(0, zkUtils)
-  val coordinator: TransactionCoordinator = new TransactionCoordinator(0, pidManager)
-
-  var result: InitPidResult = null
-
-  @Before
-  def setUp(): Unit = {
-    coordinator.startup()
-  }
-
-  @After
-  def tearDown(): Unit = {
-    EasyMock.reset(zkUtils)
-    coordinator.shutdown()
-  }
-
-  @Test
-  def testHandleInitPid() = {
-    coordinator.handleInitPid("", initPidMockCallback)
-    assertEquals(InitPidResult(0L, 0, Errors.NONE), result)
-
-    coordinator.handleInitPid("", initPidMockCallback)
-    assertEquals(InitPidResult(1L, 0, Errors.NONE), result)
-
-    coordinator.handleInitPid(null, initPidMockCallback)
-    assertEquals(InitPidResult(2L, 0, Errors.NONE), result)
-  }
-
-  def initPidMockCallback(ret: InitPidResult): Unit = {
-    result = ret
-  }
-}


Mime
View raw message