kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-1740: merge offset manager into consumer coordinator; reviewed by Onur Karaman and Jason Gustafson
Date Thu, 02 Jul 2015 18:42:01 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 14e0ce0a4 -> 3f8480ccf


KAFKA-1740: merge offset manager into consumer coordinator; reviewed by Onur Karaman and Jason Gustafson


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3f8480cc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3f8480cc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3f8480cc

Branch: refs/heads/trunk
Commit: 3f8480ccfb011eb43da774737597c597f703e11b
Parents: 14e0ce0
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Thu Jul 2 11:41:51 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Thu Jul 2 11:41:51 2015 -0700

----------------------------------------------------------------------
 .../clients/consumer/internals/Coordinator.java |  27 ++-
 .../apache/kafka/common/protocol/Errors.java    |   6 +-
 .../common/requests/OffsetCommitResponse.java   |   8 +-
 .../common/requests/OffsetFetchRequest.java     |   3 -
 .../common/requests/OffsetFetchResponse.java    |   5 +-
 .../consumer/internals/CoordinatorTest.java     |   7 -
 .../main/scala/kafka/admin/TopicCommand.scala   |   4 +-
 .../main/scala/kafka/cluster/Partition.scala    |  16 +-
 .../kafka/common/OffsetMetadataAndError.scala   |  14 +-
 core/src/main/scala/kafka/common/Topic.scala    |   4 +-
 .../kafka/coordinator/ConsumerCoordinator.scala | 170 +++++++++++++++++--
 .../kafka/coordinator/CoordinatorMetadata.scala |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  69 +++++---
 .../main/scala/kafka/server/KafkaServer.scala   |  26 +--
 .../main/scala/kafka/server/OffsetManager.scala |  52 ++----
 .../scala/kafka/server/ReplicaManager.scala     |  95 +++++++----
 .../integration/kafka/api/ConsumerTest.scala    |   7 +-
 .../kafka/api/IntegrationTestHarness.scala      |   9 +-
 .../unit/kafka/admin/TopicCommandTest.scala     |   8 +-
 .../unit/kafka/consumer/TopicFilterTest.scala   |   9 +-
 .../ConsumerCoordinatorResponseTest.scala       |   9 +-
 .../coordinator/CoordinatorMetadataTest.scala   |   2 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  10 +-
 23 files changed, 357 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 68b4cb1..c1c8172 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -290,9 +290,10 @@ public final class Coordinator {
                         // re-discover the coordinator and retry
                         coordinatorDead();
                         future.retryWithNewCoordinator();
-                    } else if (data.errorCode == Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) {
-                        // just ignore this partition
-                        log.debug("Unknown topic or partition for " + tp);
+                    } else if (data.errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+                            || data.errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
                     } else {
                         future.raise(new KafkaException("Unexpected error in fetch offset response: "
                                 + Errors.forCode(data.errorCode).exception().getMessage()));
@@ -499,13 +500,23 @@ public final class Coordinator {
                             || errorCode == Errors.NOT_COORDINATOR_FOR_CONSUMER.code()) {
                         coordinatorDead();
                         future.retryWithNewCoordinator();
-                    } else {
+                    } else if (errorCode == Errors.OFFSET_METADATA_TOO_LARGE.code()
+                            || errorCode == Errors.INVALID_COMMIT_OFFSET_SIZE.code()) {
                         // do not need to throw the exception but just log the error
-                        future.retryAfterBackoff();
                         log.error("Error committing partition {} at offset {}: {}",
-                            tp,
-                            offset,
-                            Errors.forCode(errorCode).exception().getMessage());
+                                tp,
+                                offset,
+                                Errors.forCode(errorCode).exception().getMessage());
+                    } else if (errorCode == Errors.UNKNOWN_CONSUMER_ID.code()
+                            || errorCode == Errors.ILLEGAL_GENERATION.code()) {
+                        // need to re-join group
+                        subscriptions.needReassignment();
+                    } else {
+                        // re-throw the exception as these should not happen
+                        log.error("Error committing partition {} at offset {}: {}",
+                                tp,
+                                offset,
+                                Errors.forCode(errorCode).exception().getMessage());
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 5b898c8..4c0ecc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -77,7 +77,11 @@ public enum Errors {
     UNKNOWN_CONSUMER_ID(25,
             new ApiException("The coordinator is not aware of this consumer.")),
     INVALID_SESSION_TIMEOUT(26,
-            new ApiException("The session timeout is not within an acceptable range."));
+            new ApiException("The session timeout is not within an acceptable range.")),
+    COMMITTING_PARTITIONS_NOT_ASSIGNED(27,
+            new ApiException("Some of the committing partitions are not assigned the committer")),
+    INVALID_COMMIT_OFFSET_SIZE(28,
+            new ApiException("The committing offset data size is not valid"));
 
     private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
     private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 70844d6..a163333 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -41,7 +41,13 @@ public class OffsetCommitResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     * TODO
+     * OFFSET_METADATA_TOO_LARGE (12)
+     * CONSUMER_COORDINATOR_NOT_AVAILABLE (15)
+     * NOT_COORDINATOR_FOR_CONSUMER (16)
+     * ILLEGAL_GENERATION (22)
+     * UNKNOWN_CONSUMER_ID (25)
+     * COMMITTING_PARTITIONS_NOT_ASSIGNED (27)
+     * INVALID_COMMIT_OFFSET_SIZE (28)
      */
 
     private final Map<TopicPartition, Short> responseData;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
index b5e8a0f..6ee7597 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
@@ -42,9 +42,6 @@ public class OffsetFetchRequest extends AbstractRequest {
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";
 
-    public static final int DEFAULT_GENERATION_ID = -1;
-    public static final String DEFAULT_CONSUMER_ID = "";
-
     private final String groupId;
     private final List<TopicPartition> partitions;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 512a0ef..3dc8521 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -47,10 +47,11 @@ public class OffsetFetchResponse extends AbstractRequestResponse {
     /**
      * Possible error code:
      *
-     *  UNKNOWN_TOPIC_OR_PARTITION (3)
+     *  UNKNOWN_TOPIC_OR_PARTITION (3)  <- only for request v0
      *  OFFSET_LOAD_IN_PROGRESS (14)
      *  NOT_COORDINATOR_FOR_CONSUMER (16)
-     *  NO_OFFSETS_FETCHABLE (23)
+     *  ILLEGAL_GENERATION (22)
+     *  UNKNOWN_CONSUMER_ID (25)
      */
 
     private final Map<TopicPartition, PartitionData> responseData;

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
index 613b192..d085fe5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
@@ -333,13 +333,6 @@ public class CoordinatorTest {
         assertTrue(result.isDone());
         assertTrue(result.value().isEmpty());
 
-        // fetch with offset topic unknown
-        client.prepareResponse(offsetFetchResponse(tp, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), "", 100L));
-        result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());
-        client.poll(0, time.milliseconds());
-        assertTrue(result.isDone());
-        assertTrue(result.value().isEmpty());
-
         // fetch with offset -1
         client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L));
         result = coordinator.fetchOffsets(Collections.singleton(tp), time.milliseconds());

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index dacbdd0..a2ecb96 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -27,8 +27,8 @@ import scala.collection._
 import scala.collection.JavaConversions._
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
-import kafka.server.OffsetManager
 import org.apache.kafka.common.utils.Utils
+import kafka.coordinator.ConsumerCoordinator
 
 
 object TopicCommand {
@@ -111,7 +111,7 @@ object TopicCommand {
         println("Updated config for topic \"%s\".".format(topic))
       }
       if(opts.options.has(opts.partitionsOpt)) {
-        if (topic == OffsetManager.OffsetsTopicName) {
+        if (topic == ConsumerCoordinator.OffsetsTopicName) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
         }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 0990938..2649090 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils.CoreUtils.{inReadLock,inWriteLock}
 import kafka.admin.AdminUtils
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, OffsetManager, LogReadResult, ReplicaManager}
+import kafka.server.{TopicPartitionOperationKey, LogOffsetMetadata, LogReadResult, ReplicaManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import kafka.message.ByteBufferMessageSet
@@ -160,8 +160,7 @@ class Partition(val topic: String,
    *  and setting the new leader and ISR
    */
   def makeLeader(controllerId: Int,
-                 partitionStateInfo: PartitionStateInfo, correlationId: Int,
-                 offsetManager: OffsetManager): Boolean = {
+                 partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -186,8 +185,6 @@ class Partition(val topic: String,
         if (r.brokerId != localBrokerId) r.updateLogReadResult(LogReadResult.UnknownLogReadResult))
       // we may need to increment high watermark since ISR could be down to 1
       maybeIncrementLeaderHW(newLeaderReplica)
-      if (topic == OffsetManager.OffsetsTopicName)
-        offsetManager.loadOffsetsFromLog(partitionId)
       true
     }
   }
@@ -198,7 +195,7 @@ class Partition(val topic: String,
    */
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo,
-                   correlationId: Int, offsetManager: OffsetManager): Boolean = {
+                   correlationId: Int): Boolean = {
     inWriteLock(leaderIsrUpdateLock) {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -215,13 +212,6 @@ class Partition(val topic: String,
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
 
-      leaderReplicaIdOpt.foreach { leaderReplica =>
-        if (topic == OffsetManager.OffsetsTopicName &&
-           /* if we are making a leader->follower transition */
-           leaderReplica == localBrokerId)
-          offsetManager.removeOffsetsFromCacheForPartition(partitionId)
-      }
-
       if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
         false
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 6b4242c..deb48b1 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -17,6 +17,8 @@
 
 package kafka.common
 
+import org.apache.kafka.common.protocol.Errors
+
 case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
   override def toString = "OffsetMetadata[%d,%s]"
     .format(offset,
@@ -51,7 +53,7 @@ object OffsetAndMetadata {
   def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
 }
 
-case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = ErrorMapping.NoError) {
+case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short = Errors.NONE.code) {
   def offset = offsetMetadata.offset
 
   def metadata = offsetMetadata.metadata
@@ -60,10 +62,12 @@ case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Short =
 }
 
 object OffsetMetadataAndError {
-  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NoError)
-  val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.OffsetsLoadInProgressCode)
-  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.UnknownTopicOrPartitionCode)
-  val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
+  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE.code)
+  val OffsetsLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.OFFSET_LOAD_IN_PROGRESS.code)
+  val UnknownConsumer = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_CONSUMER_ID.code)
+  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
+  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+  val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION.code)
 
   def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), ErrorMapping.NoError)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index ad75978..32595d6 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,7 +18,7 @@
 package kafka.common
 
 import util.matching.Regex
-import kafka.server.OffsetManager
+import kafka.coordinator.ConsumerCoordinator
 
 
 object Topic {
@@ -26,7 +26,7 @@ object Topic {
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
-  val InternalTopics = Set(OffsetManager.OffsetsTopicName)
+  val InternalTopics = Set(ConsumerCoordinator.OffsetsTopicName)
 
   def validate(topic: String) {
     if (topic.length <= 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index a385adb..476973b 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -16,7 +16,9 @@
  */
 package kafka.coordinator
 
-import kafka.common.TopicAndPartition
+import kafka.common.{OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
+import kafka.message.UncompressedCodec
+import kafka.log.LogConfig
 import kafka.server._
 import kafka.utils._
 import org.apache.kafka.common.protocol.Errors
@@ -24,7 +26,11 @@ import org.apache.kafka.common.requests.JoinGroupRequest
 
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.Properties
+import scala.collection.{Map, Seq, immutable}
 
+case class GroupManagerConfig(consumerMinSessionTimeoutMs: Int,
+                              consumerMaxSessionTimeoutMs: Int)
 
 /**
  * ConsumerCoordinator handles consumer group and consumer offset management.
@@ -33,11 +39,13 @@ import java.util.concurrent.atomic.AtomicBoolean
  * consumer groups. Consumer groups are assigned to coordinators based on their
  * group names.
  */
-class ConsumerCoordinator(val config: KafkaConfig,
-                          val zkClient: ZkClient,
-                          val offsetManager: OffsetManager) extends Logging {
+class ConsumerCoordinator(val brokerId: Int,
+                          val groupConfig: GroupManagerConfig,
+                          val offsetConfig: OffsetManagerConfig,
+                          private val offsetManager: OffsetManager,
+                          zkClient: ZkClient) extends Logging {
 
-  this.logIdent = "[ConsumerCoordinator " + config.brokerId + "]: "
+  this.logIdent = "[ConsumerCoordinator " + brokerId + "]: "
 
   private val isActive = new AtomicBoolean(false)
 
@@ -45,6 +53,22 @@ class ConsumerCoordinator(val config: KafkaConfig,
   private var rebalancePurgatory: DelayedOperationPurgatory[DelayedRebalance] = null
   private var coordinatorMetadata: CoordinatorMetadata = null
 
+  def this(brokerId: Int,
+           groupConfig: GroupManagerConfig,
+           offsetConfig: OffsetManagerConfig,
+           replicaManager: ReplicaManager,
+           zkClient: ZkClient,
+           scheduler: KafkaScheduler) = this(brokerId, groupConfig, offsetConfig,
+    new OffsetManager(offsetConfig, replicaManager, zkClient, scheduler), zkClient)
+
+  def offsetsTopicConfigs: Properties = {
+    val props = new Properties
+    props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+    props.put(LogConfig.SegmentBytesProp, offsetConfig.offsetsTopicSegmentBytes.toString)
+    props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
+    props
+  }
+
   /**
    * NOTE: If a group lock and metadataLock are simultaneously needed,
    * be sure to acquire the group lock before metadataLock to prevent deadlock
@@ -55,9 +79,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
    */
   def startup() {
     info("Starting up.")
-    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
-    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", config.brokerId)
-    coordinatorMetadata = new CoordinatorMetadata(config, zkClient, maybePrepareRebalance)
+    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", brokerId)
+    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance]("Rebalance", brokerId)
+    coordinatorMetadata = new CoordinatorMetadata(brokerId, zkClient, maybePrepareRebalance)
     isActive.set(true)
     info("Startup complete.")
   }
@@ -69,6 +93,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
   def shutdown() {
     info("Shutting down.")
     isActive.set(false)
+    offsetManager.shutdown()
     coordinatorMetadata.shutdown()
     heartbeatPurgatory.shutdown()
     rebalancePurgatory.shutdown()
@@ -87,7 +112,8 @@ class ConsumerCoordinator(val config: KafkaConfig,
       responseCallback(Set.empty, consumerId, 0, Errors.NOT_COORDINATOR_FOR_CONSUMER.code)
     } else if (!PartitionAssignor.strategies.contains(partitionAssignmentStrategy)) {
       responseCallback(Set.empty, consumerId, 0, Errors.UNKNOWN_PARTITION_ASSIGNMENT_STRATEGY.code)
-    } else if (sessionTimeoutMs < config.consumerMinSessionTimeoutMs || sessionTimeoutMs > config.consumerMaxSessionTimeoutMs) {
+    } else if (sessionTimeoutMs < groupConfig.consumerMinSessionTimeoutMs ||
+               sessionTimeoutMs > groupConfig.consumerMaxSessionTimeoutMs) {
       responseCallback(Set.empty, consumerId, 0, Errors.INVALID_SESSION_TIMEOUT.code)
     } else {
       // only try to create the group if the group is not unknown AND
@@ -196,6 +222,75 @@ class ConsumerCoordinator(val config: KafkaConfig,
     }
   }
 
+  def handleCommitOffsets(groupId: String,
+                          consumerId: String,
+                          generationId: Int,
+                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+    if (!isActive.get) {
+      responseCallback(offsetMetadata.mapValues(_ => Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.code))
+    } else if (!isCoordinatorForGroup(groupId)) {
+      responseCallback(offsetMetadata.mapValues(_ => Errors.NOT_COORDINATOR_FOR_CONSUMER.code))
+    } else {
+      val group = coordinatorMetadata.getGroup(groupId)
+      if (group == null) {
+        // if the group does not exist, it means this group is not relying
+        // on Kafka for partition management, and hence never send join-group
+        // request to the coordinator before; in this case blindly commit the offsets
+        offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
+      } else {
+        group synchronized {
+          if (group.is(Dead)) {
+            responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
+          } else if (!group.has(consumerId)) {
+            responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_CONSUMER_ID.code))
+          } else if (generationId != group.generationId) {
+            responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION.code))
+          } else if (!offsetMetadata.keySet.subsetOf(group.get(consumerId).assignedTopicPartitions)) {
+            responseCallback(offsetMetadata.mapValues(_ => Errors.COMMITTING_PARTITIONS_NOT_ASSIGNED.code))
+          } else {
+            offsetManager.storeOffsets(groupId, consumerId, generationId, offsetMetadata, responseCallback)
+          }
+        }
+      }
+    }
+  }
+
+  def handleFetchOffsets(groupId: String,
+                         partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+    if (!isActive.get) {
+      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
+    } else if (!isCoordinatorForGroup(groupId)) {
+      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
+    } else {
+      val group = coordinatorMetadata.getGroup(groupId)
+      if (group == null) {
+        // if the group does not exist, it means this group is not relying
+        // on Kafka for partition management, and hence never send join-group
+        // request to the coordinator before; in this case blindly fetch the offsets
+        offsetManager.getOffsets(groupId, partitions)
+      } else {
+        group synchronized {
+          if (group.is(Dead)) {
+            partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownConsumer)}.toMap
+          } else {
+            offsetManager.getOffsets(groupId, partitions)
+          }
+        }
+      }
+    }
+  }
+
+  def handleGroupImmigration(offsetTopicPartitionId: Int) = {
+    // TODO we may need to add more logic in KAFKA-2017
+    offsetManager.loadOffsetsFromLog(offsetTopicPartitionId)
+  }
+
+  def handleGroupEmigration(offsetTopicPartitionId: Int) = {
+    // TODO we may need to add more logic in KAFKA-2017
+    offsetManager.removeOffsetsFromCacheForPartition(offsetTopicPartitionId)
+  }
+
   /**
    * Complete existing DelayedHeartbeats for the given consumer and schedule the next one
    */
@@ -246,8 +341,7 @@ class ConsumerCoordinator(val config: KafkaConfig,
 
   private def prepareRebalance(group: ConsumerGroupMetadata) {
     group.transitionTo(PreparingRebalance)
-    group.generationId += 1
-    info("Preparing to rebalance group %s generation %s".format(group.groupId, group.generationId))
+    info("Preparing to rebalance group %s with old generation %s".format(group.groupId, group.generationId))
 
     val rebalanceTimeout = group.rebalanceTimeout
     val delayedRebalance = new DelayedRebalance(this, group, rebalanceTimeout)
@@ -259,7 +353,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
     assert(group.notYetRejoinedConsumers == List.empty[ConsumerMetadata])
 
     group.transitionTo(Rebalancing)
-    info("Rebalancing group %s generation %s".format(group.groupId, group.generationId))
+    group.generationId += 1
+
+    info("Rebalancing group %s with new generation %s".format(group.groupId, group.generationId))
 
     val assignedPartitionsPerConsumer = reassignPartitions(group)
     trace("Rebalance for group %s generation %s has assigned partitions: %s"
@@ -275,8 +371,6 @@ class ConsumerCoordinator(val config: KafkaConfig,
     maybePrepareRebalance(group)
   }
 
-  private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
-
   private def reassignPartitions(group: ConsumerGroupMetadata) = {
     val assignor = PartitionAssignor.createInstance(group.partitionAssignmentStrategy)
     val topicsPerConsumer = group.topicsPerConsumer
@@ -345,8 +439,54 @@ class ConsumerCoordinator(val config: KafkaConfig,
     }
   }
 
-  def onCompleteHeartbeat() {}
+  def onCompleteHeartbeat() {
+    // TODO: add metrics for complete heartbeats
+  }
+
+  def partitionFor(group: String): Int = offsetManager.partitionFor(group)
 
   private def shouldKeepConsumerAlive(consumer: ConsumerMetadata, heartbeatDeadline: Long) =
     consumer.awaitingRebalanceCallback != null || consumer.latestHeartbeat + consumer.sessionTimeoutMs > heartbeatDeadline
+
+  private def isCoordinatorForGroup(groupId: String) = offsetManager.leaderIsLocal(offsetManager.partitionFor(groupId))
+}
+
+object ConsumerCoordinator {
+
+  val OffsetsTopicName = "__consumer_offsets"
+
+  def create(config: KafkaConfig,
+             zkClient: ZkClient,
+             replicaManager: ReplicaManager,
+             kafkaScheduler: KafkaScheduler): ConsumerCoordinator = {
+    val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+      loadBufferSize = config.offsetsLoadBufferSize,
+      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+    val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
+      consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
+
+    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, replicaManager, zkClient, kafkaScheduler)
+  }
+
+  def create(config: KafkaConfig,
+             zkClient: ZkClient,
+             offsetManager: OffsetManager): ConsumerCoordinator = {
+    val offsetConfig = OffsetManagerConfig(maxMetadataSize = config.offsetMetadataMaxSize,
+      loadBufferSize = config.offsetsLoadBufferSize,
+      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
+      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
+      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
+      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
+      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
+      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
+    val groupConfig = GroupManagerConfig(consumerMinSessionTimeoutMs = config.consumerMinSessionTimeoutMs,
+      consumerMaxSessionTimeoutMs = config.consumerMaxSessionTimeoutMs)
+
+    new ConsumerCoordinator(config.brokerId, groupConfig, offsetConfig, offsetManager, zkClient)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
index 0cd5605..2920320 100644
--- a/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
@@ -32,7 +32,7 @@ import scala.collection.mutable
  * It delegates all group logic to the callers.
  */
 @threadsafe
-private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
+private[coordinator] class CoordinatorMetadata(brokerId: Int,
                                                zkClient: ZkClient,
                                                maybePrepareRebalance: ConsumerGroupMetadata => Unit) {
 
@@ -179,7 +179,7 @@ private[coordinator] class CoordinatorMetadata(config: KafkaConfig,
    * Zookeeper listener to handle topic partition changes
    */
   class TopicPartitionChangeListener extends IZkDataListener with Logging {
-    this.logIdent = "[TopicPartitionChangeListener on Coordinator " + config.brokerId + "]: "
+    this.logIdent = "[TopicPartitionChangeListener on Coordinator " + brokerId + "]: "
 
     override def handleDataChange(dataPath: String, data: Object) {
       info("Handling data change for path: %s data: %s".format(dataPath, data))

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ad6f058..18f5b5b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -37,7 +37,6 @@ import org.I0Itec.zkclient.ZkClient
  */
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
-                val offsetManager: OffsetManager,
                 val coordinator: ConsumerCoordinator,
                 val controller: KafkaController,
                 val zkClient: ZkClient,
@@ -95,8 +94,23 @@ class KafkaApis(val requestChannel: RequestChannel,
     // stop serving data to clients for the topic being deleted
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     try {
-      val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager)
-      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error)
+      // call replica manager to handle updating partitions to become leader or follower
+      val result = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
+      val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, result.responseMap, result.errorCode)
+      // for each new leader or follower, call coordinator to handle
+      // consumer group migration
+      result.updatedLeaders.foreach { case partition =>
+        if (partition.topic == ConsumerCoordinator.OffsetsTopicName)
+          coordinator.handleGroupImmigration(partition.partitionId)
+      }
+      result.updatedFollowers.foreach { case partition =>
+        partition.leaderReplicaIdOpt.foreach { leaderReplica =>
+          if (partition.topic == ConsumerCoordinator.OffsetsTopicName &&
+              leaderReplica == brokerId)
+            coordinator.handleGroupEmigration(partition.partitionId)
+        }
+      }
+
       requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, leaderAndIsrResponse)))
     } catch {
       case e: KafkaStorageException =>
@@ -142,6 +156,12 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetCommitRequest(request: RequestChannel.Request) {
     val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
 
+    // filter non-exist topics
+    val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
+      !metadataCache.contains(topicAndPartition.topic)
+    }
+    val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys)
+
     // the callback for sending an offset commit response
     def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
       commitStatus.foreach { case (topicAndPartition, errorCode) =>
@@ -154,14 +174,14 @@ class KafkaApis(val requestChannel: RequestChannel,
             topicAndPartition, ErrorMapping.exceptionNameFor(errorCode)))
         }
       }
-
-      val response = OffsetCommitResponse(commitStatus, offsetCommitRequest.correlationId)
+      val combinedCommitStatus = commitStatus ++ invalidRequestsInfo.map(_._1 -> ErrorMapping.UnknownTopicOrPartitionCode)
+      val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
       requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
     }
 
     if (offsetCommitRequest.versionId == 0) {
       // for version 0 always store offsets to ZK
-      val responseInfo = offsetCommitRequest.requestInfo.map {
+      val responseInfo = filteredRequestInfo.map {
         case (topicAndPartition, metaAndError) => {
           val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
           try {
@@ -189,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val offsetRetention =
         if (offsetCommitRequest.versionId <= 1 ||
           offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
-          offsetManager.config.offsetsRetentionMs
+          coordinator.offsetConfig.offsetsRetentionMs
         } else {
           offsetCommitRequest.retentionMs
         }
@@ -203,7 +223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val currentTimestamp = SystemTime.milliseconds
       val defaultExpireTimestamp = offsetRetention + currentTimestamp
 
-      val offsetData = offsetCommitRequest.requestInfo.mapValues(offsetAndMetadata =>
+      val offsetData = filteredRequestInfo.mapValues(offsetAndMetadata =>
         offsetAndMetadata.copy(
           commitTimestamp = currentTimestamp,
           expireTimestamp = {
@@ -215,8 +235,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         )
       )
 
-      // call offset manager to store offsets
-      offsetManager.storeOffsets(
+      // call coordinator to handle commit offset
+      coordinator.handleCommitOffsets(
         offsetCommitRequest.groupId,
         offsetCommitRequest.consumerId,
         offsetCommitRequest.groupGenerationId,
@@ -422,9 +442,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     if (topics.size > 0 && topicResponses.size != topics.size) {
       val nonExistentTopics = topics -- topicResponses.map(_.topic).toSet
       val responsesForNonExistentTopics = nonExistentTopics.map { topic =>
-        if (topic == OffsetManager.OffsetsTopicName || config.autoCreateTopicsEnable) {
+        if (topic == ConsumerCoordinator.OffsetsTopicName || config.autoCreateTopicsEnable) {
           try {
-            if (topic == OffsetManager.OffsetsTopicName) {
+            if (topic == ConsumerCoordinator.OffsetsTopicName) {
               val aliveBrokers = metadataCache.getAliveBrokers
               val offsetsTopicReplicationFactor =
                 if (aliveBrokers.length > 0)
@@ -433,7 +453,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                   config.offsetsTopicReplicationFactor.toInt
               AdminUtils.createTopic(zkClient, topic, config.offsetsTopicPartitions,
                                      offsetsTopicReplicationFactor,
-                                     offsetManager.offsetsTopicConfig)
+                                     coordinator.offsetsTopicConfigs)
               info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
                 .format(topic, config.offsetsTopicPartitions, offsetsTopicReplicationFactor))
             }
@@ -496,26 +516,19 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
     } else {
-      // version 1 reads offsets from Kafka
-      val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
-        metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
-      )
-      val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-      val knownStatus =
-        if (knownTopicPartitions.size > 0)
-          offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
-        else
-          Map.empty[TopicAndPartition, OffsetMetadataAndError]
-      val status = unknownStatus ++ knownStatus
+      // version 1 reads offsets from Kafka;
+      val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, offsetFetchRequest.requestInfo).toMap
 
-      OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+      // Note that we do not need to filter the partitions in the
+      // metadata cache as the topic partitions will be filtered
+      // in coordinator's offset manager through the offset cache
+      OffsetFetchResponse(offsets, offsetFetchRequest.correlationId)
     }
 
     trace("Sending offset fetch response %s for correlation id %d to client %s."
           .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
 
     requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
-
   }
 
   /*
@@ -524,10 +537,10 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleConsumerMetadataRequest(request: RequestChannel.Request) {
     val consumerMetadataRequest = request.requestObj.asInstanceOf[ConsumerMetadataRequest]
 
-    val partition = offsetManager.partitionFor(consumerMetadataRequest.group)
+    val partition = coordinator.partitionFor(consumerMetadataRequest.group)
 
     // get metadata (and create the topic if necessary)
-    val offsetsTopicMetadata = getTopicMetadata(Set(OffsetManager.OffsetsTopicName), request.securityProtocol).head
+    val offsetsTopicMetadata = getTopicMetadata(Set(ConsumerCoordinator.OffsetsTopicName), request.securityProtocol).head
 
     val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, consumerMetadataRequest.correlationId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 52dc728..18917bc 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -41,7 +41,7 @@ import kafka.common.{ErrorMapping, InconsistentBrokerIdException, GenerateBroker
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.coordinator.ConsumerCoordinator
+import kafka.coordinator.{GroupManagerConfig, ConsumerCoordinator}
 
 /**
  * Represents the lifecycle of a single Kafka broker. Handles all functionality required
@@ -75,8 +75,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
 
   var logManager: LogManager = null
 
-  var offsetManager: OffsetManager = null
-
   var replicaManager: ReplicaManager = null
 
   var topicConfigManager: TopicConfigManager = null
@@ -157,19 +155,16 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)
           replicaManager.startup()
 
-          /* start offset manager */
-          offsetManager = createOffsetManager()
-
           /* start kafka controller */
           kafkaController = new KafkaController(config, zkClient, brokerState)
           kafkaController.startup()
 
           /* start kafka coordinator */
-          consumerCoordinator = new ConsumerCoordinator(config, zkClient, offsetManager)
+          consumerCoordinator = ConsumerCoordinator.create(config, zkClient, replicaManager, kafkaScheduler)
           consumerCoordinator.startup()
 
           /* start processing requests */
-          apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, consumerCoordinator,
+          apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator,
             kafkaController, zkClient, config.brokerId, config, metadataCache)
           requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
           brokerState.newState(RunningAsBroker)
@@ -349,8 +344,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
           CoreUtils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)
           CoreUtils.swallow(requestHandlerPool.shutdown())
-        if(offsetManager != null)
-          offsetManager.shutdown()
         CoreUtils.swallow(kafkaScheduler.shutdown())
         if(apis != null)
           CoreUtils.swallow(apis.close())
@@ -450,19 +443,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     logProps
   }
 
-  private def createOffsetManager(): OffsetManager = {
-    val offsetManagerConfig = OffsetManagerConfig(
-      maxMetadataSize = config.offsetMetadataMaxSize,
-      loadBufferSize = config.offsetsLoadBufferSize,
-      offsetsRetentionMs = config.offsetsRetentionMinutes * 60 * 1000L,
-      offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
-      offsetsTopicNumPartitions = config.offsetsTopicPartitions,
-      offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
-      offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
-      offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
-    new OffsetManager(offsetManagerConfig, replicaManager, zkClient, kafkaScheduler, metadataCache)
-  }
-
   /**
     * Generates new brokerId or reads from meta.properties based on following conditions
     * <ol>

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 5cca85c..47b6ce9 100755
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
 import org.apache.kafka.common.protocol.types.Type.STRING
 import org.apache.kafka.common.protocol.types.Type.INT32
@@ -25,19 +26,19 @@ import org.apache.kafka.common.utils.Utils
 
 import kafka.utils._
 import kafka.common._
-import kafka.log.{FileMessageSet, LogConfig}
+import kafka.log.FileMessageSet
 import kafka.message._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
 import kafka.api.ProducerResponseStatus
+import kafka.coordinator.ConsumerCoordinator
 
 import scala.Some
 import scala.collection._
 import java.io.PrintStream
 import java.util.concurrent.atomic.AtomicBoolean
 import java.nio.ByteBuffer
-import java.util.Properties
 import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
@@ -87,8 +88,7 @@ object OffsetManagerConfig {
 class OffsetManager(val config: OffsetManagerConfig,
                     replicaManager: ReplicaManager,
                     zkClient: ZkClient,
-                    scheduler: Scheduler,
-                    metadataCache: MetadataCache) extends Logging with KafkaMetricsGroup {
+                    scheduler: Scheduler) extends Logging with KafkaMetricsGroup {
 
   /* offsets and metadata cache */
   private val offsetsCache = new Pool[GroupTopicPartition, OffsetAndMetadata]
@@ -143,9 +143,9 @@ class OffsetManager(val config: OffsetManagerConfig,
       // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
       // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
       tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-        val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+        val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
         partitionOpt.map { partition =>
-          val appendPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+          val appendPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
           val messages = tombstones.map(_._2).toSeq
 
           trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
@@ -170,14 +170,6 @@ class OffsetManager(val config: OffsetManagerConfig,
   }
 
 
-  def offsetsTopicConfig: Properties = {
-    val props = new Properties
-    props.put(LogConfig.SegmentBytesProp, config.offsetsTopicSegmentBytes.toString)
-    props.put(LogConfig.CleanupPolicyProp, "compact")
-    props.put(LogConfig.CompressionTypeProp, "uncompressed")
-    props
-  }
-
   def partitionFor(group: String): Int = Utils.abs(group.hashCode) % config.offsetsTopicNumPartitions
 
   /**
@@ -214,22 +206,14 @@ class OffsetManager(val config: OffsetManagerConfig,
   /**
    * Store offsets by appending it to the replicated log and then inserting to cache
    */
-  // TODO: generation id and consumer id is needed by coordinator to do consumer checking in the future
   def storeOffsets(groupId: String,
                    consumerId: String,
                    generationId: Int,
                    offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
                    responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
-    // check if there are any non-existent topics
-    val nonExistentTopics = offsetMetadata.filter { case (topicAndPartition, offsetMetadata) =>
-      !metadataCache.contains(topicAndPartition.topic)
-    }
-
-    // first filter out partitions with offset metadata size exceeding limit or
-    // if its a non existing topic
-    // TODO: in the future we may want to only support atomic commit and hence fail the whole commit
+    // first filter out partitions with offset metadata size exceeding limit
     val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
-      validateOffsetMetadataLength(offsetAndMetadata.metadata) || nonExistentTopics.contains(topicAndPartition)
+      validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
     // construct the message set to append
@@ -240,7 +224,7 @@ class OffsetManager(val config: OffsetManagerConfig,
       )
     }.toSeq
 
-    val offsetTopicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, partitionFor(groupId))
+    val offsetTopicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, partitionFor(groupId))
 
     val offsetsAndMetadataMessageSet = Map(offsetTopicPartition ->
       new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages:_*))
@@ -271,6 +255,10 @@ class OffsetManager(val config: OffsetManagerConfig,
             ErrorMapping.ConsumerCoordinatorNotAvailableCode
           else if (status.error == ErrorMapping.NotLeaderForPartitionCode)
             ErrorMapping.NotCoordinatorForConsumerCode
+          else if (status.error == ErrorMapping.MessageSizeTooLargeCode
+                || status.error == ErrorMapping.MessageSetSizeTooLargeCode
+                || status.error == ErrorMapping.InvalidFetchSizeCode)
+            Errors.INVALID_COMMIT_OFFSET_SIZE.code
           else
             status.error
         }
@@ -278,9 +266,7 @@ class OffsetManager(val config: OffsetManagerConfig,
 
       // compute the final error codes for the commit response
       val commitStatus = offsetMetadata.map { case (topicAndPartition, offsetAndMetadata) =>
-        if (nonExistentTopics.contains(topicAndPartition))
-          (topicAndPartition, ErrorMapping.UnknownTopicOrPartitionCode)
-        else if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
+        if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
           (topicAndPartition, responseCode)
         else
           (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
@@ -338,7 +324,7 @@ class OffsetManager(val config: OffsetManagerConfig,
         debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
         topicPartitions.map { topicAndPartition =>
           val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
-          (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotOffsetManagerForGroup)
+          (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
         }.toMap
       }
     }
@@ -349,7 +335,7 @@ class OffsetManager(val config: OffsetManagerConfig,
    */
   def loadOffsetsFromLog(offsetsPartition: Int) {
 
-    val topicPartition = TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)
+    val topicPartition = TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)
 
     loadingPartitions synchronized {
       if (loadingPartitions.contains(offsetsPartition)) {
@@ -421,7 +407,7 @@ class OffsetManager(val config: OffsetManagerConfig,
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId)
+    val partitionOpt = replicaManager.getPartition(ConsumerCoordinator.OffsetsTopicName, partitionId)
 
     val hw = partitionOpt.map { partition =>
       partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
@@ -449,7 +435,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     }
 
     if (numRemoved > 0) info("Removed %d cached offsets for %s on follower transition."
-                             .format(numRemoved, TopicAndPartition(OffsetManager.OffsetsTopicName, offsetsPartition)))
+                             .format(numRemoved, TopicAndPartition(ConsumerCoordinator.OffsetsTopicName, offsetsPartition)))
 
   }
 
@@ -461,8 +447,6 @@ class OffsetManager(val config: OffsetManagerConfig,
 
 object OffsetManager {
 
-  val OffsetsTopicName = "__consumer_offsets"
-
   private case class KeyAndValueSchemas(keySchema: Schema, valueSchema: Schema)
 
   private val CURRENT_OFFSET_SCHEMA_VERSION = 1.toShort

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 59c9bc3..795220e 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -23,19 +23,19 @@ import kafka.cluster.{BrokerEndPoint, Partition, Replica}
 import kafka.log.{LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
-import kafka.common.TopicAndPartition
 import kafka.message.{ByteBufferMessageSet, MessageSet}
+import kafka.api.ProducerResponseStatus
+import kafka.common.TopicAndPartition
+import kafka.api.PartitionFetchInfo
+
+import org.apache.kafka.common.protocol.Errors
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
 import java.util.concurrent.TimeUnit
-import org.apache.kafka.common.protocol.Errors
 
-import scala.Predef._
+import scala.Some
 import scala.collection._
-import scala.collection.mutable.HashMap
-import scala.collection.Map
-import scala.collection.Set
 
 import org.I0Itec.zkclient.ZkClient
 import com.yammer.metrics.core.Gauge
@@ -84,6 +84,17 @@ object LogReadResult {
                                            false)
 }
 
+case class BecomeLeaderOrFollowerResult(responseMap: collection.Map[(String, Int), Short],
+                                        updatedLeaders: Set[Partition],
+                                        updatedFollowers: Set[Partition],
+                                        errorCode: Short) {
+
+  override def toString = {
+    "updated leaders: [%s], updated followers: [%s], update results: [%s], global error: [%d]"
+      .format(updatedLeaders, updatedFollowers, responseMap, errorCode)
+  }
+}
+
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
 }
@@ -393,10 +404,10 @@ class ReplicaManager(val config: KafkaConfig,
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(utpe)))
           case nle: NotLeaderForPartitionException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(nle)))
-          case mtl: MessageSizeTooLargeException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtl)))
-          case mstl: MessageSetSizeTooLargeException =>
-            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstl)))
+          case mtle: MessageSizeTooLargeException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mtle)))
+          case mstle: MessageSetSizeTooLargeException =>
+            (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(mstle)))
           case imse : InvalidMessageSizeException =>
             (topicAndPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(imse)))
           case t: Throwable =>
@@ -416,7 +427,7 @@ class ReplicaManager(val config: KafkaConfig,
   def fetchMessages(timeout: Long,
                     replicaId: Int,
                     fetchMinBytes: Int,
-                    fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
+                    fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo],
                     responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
 
     val isFromFollower = replicaId >= 0
@@ -544,30 +555,29 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest,
-                             offsetManager: OffsetManager): (collection.Map[(String, Int), Short], Short) = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): BecomeLeaderOrFollowerResult = {
     leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
       stateChangeLogger.trace("Broker %d received LeaderAndIsr request %s correlation id %d from controller %d epoch %d for partition [%s,%d]"
                                 .format(localBrokerId, stateInfo, leaderAndISRRequest.correlationId,
                                         leaderAndISRRequest.controllerId, leaderAndISRRequest.controllerEpoch, topic, partition))
     }
     replicaStateChangeLock synchronized {
-      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
-      if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
+      val responseMap = new mutable.HashMap[(String, Int), Short]
+      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
         leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
         stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
           "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
           leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
         }
-        (responseMap, ErrorMapping.StaleControllerEpochCode)
+        BecomeLeaderOrFollowerResult(responseMap, Set.empty[Partition], Set.empty[Partition], ErrorMapping.StaleControllerEpochCode)
       } else {
         val controllerId = leaderAndISRRequest.controllerId
         val correlationId = leaderAndISRRequest.correlationId
         controllerEpoch = leaderAndISRRequest.controllerEpoch
 
         // First check partition's leader epoch
-        val partitionState = new HashMap[Partition, PartitionStateInfo]()
-        leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
+        val partitionState = new mutable.HashMap[Partition, PartitionStateInfo]()
+        leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partitionId), partitionStateInfo) =>
           val partition = getOrCreatePartition(topic, partitionId)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
           // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
@@ -591,14 +601,19 @@ class ReplicaManager(val config: KafkaConfig,
           }
         }
 
-        val partitionsTobeLeader = partitionState
-          .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+        val partitionsTobeLeader = partitionState.filter { case (partition, partitionStateInfo) =>
+          partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId
+        }
         val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
 
-        if (!partitionsTobeLeader.isEmpty)
-          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap, offsetManager)
-        if (!partitionsToBeFollower.isEmpty)
-          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap, offsetManager)
+        val partitionsBecomeLeader = if (!partitionsTobeLeader.isEmpty)
+          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
+        else
+          Set.empty[Partition]
+        val partitionsBecomeFollower = if (!partitionsToBeFollower.isEmpty)
+          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+        else
+          Set.empty[Partition]
 
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing there by avoiding weird race conditions
@@ -607,7 +622,7 @@ class ReplicaManager(val config: KafkaConfig,
           hwThreadInitialized = true
         }
         replicaFetcherManager.shutdownIdleFetcherThreads()
-        (responseMap, ErrorMapping.NoError)
+        BecomeLeaderOrFollowerResult(responseMap, partitionsBecomeLeader, partitionsBecomeFollower, ErrorMapping.NoError)
       }
     }
   }
@@ -623,10 +638,11 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    *  TODO: the above may need to be fixed later
    */
-  private def makeLeaders(controllerId: Int, epoch: Int,
+  private def makeLeaders(controllerId: Int,
+                          epoch: Int,
                           partitionState: Map[Partition, PartitionStateInfo],
-                          correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
-                          offsetManager: OffsetManager) = {
+                          correlationId: Int,
+                          responseMap: mutable.Map[(String, Int), Short]): Set[Partition] = {
     partitionState.foreach(state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-leader transition for partition %s")
@@ -645,7 +661,7 @@ class ReplicaManager(val config: KafkaConfig,
       }
       // Update the partition information to be the leader
       partitionState.foreach{ case (partition, partitionStateInfo) =>
-        partition.makeLeader(controllerId, partitionStateInfo, correlationId, offsetManager)}
+        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
 
     } catch {
       case e: Throwable =>
@@ -664,6 +680,8 @@ class ReplicaManager(val config: KafkaConfig,
         "for the become-leader transition for partition %s")
         .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
     }
+
+    partitionState.keySet
   }
 
   /*
@@ -682,9 +700,12 @@ class ReplicaManager(val config: KafkaConfig,
    * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
    * the error message will be set on each partition since we do not know which partition caused it
    */
-  private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
-                            leaders: Set[BrokerEndPoint], correlationId: Int, responseMap: mutable.Map[(String, Int), Short],
-                            offsetManager: OffsetManager) {
+  private def makeFollowers(controllerId: Int,
+                            epoch: Int,
+                            partitionState: Map[Partition, PartitionStateInfo],
+                            leaders: Set[BrokerEndPoint],
+                            correlationId: Int,
+                            responseMap: mutable.Map[(String, Int), Short]) : Set[Partition] = {
     partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")
@@ -694,18 +715,18 @@ class ReplicaManager(val config: KafkaConfig,
     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
-    try {
+    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
 
-      var partitionsToMakeFollower: Set[Partition] = Set()
+    try {
 
-      // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
+      // TODO: Delete leaders from LeaderAndIsrRequest
       partitionState.foreach{ case (partition, partitionStateInfo) =>
         val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
         val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
         leaders.find(_.id == newLeaderBrokerId) match {
           // Only change partition state when the leader is available
           case Some(leaderBroker) =>
-            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId, offsetManager))
+            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
               partitionsToMakeFollower += partition
             else
               stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
@@ -775,6 +796,8 @@ class ReplicaManager(val config: KafkaConfig,
         "for the become-follower transition for partition %s")
         .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
     }
+
+    partitionsToMakeFollower
   }
 
   private def maybeShrinkIsr(): Unit = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
index 17b17b9..92ffb91 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerTest.scala
@@ -25,12 +25,13 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.clients.consumer.NoOffsetForPartitionException
 
 import kafka.utils.{TestUtils, Logging}
-import kafka.server.{KafkaConfig, OffsetManager}
+import kafka.server.KafkaConfig
 
 import java.util.ArrayList
 import org.junit.Assert._
 
 import scala.collection.JavaConversions._
+import kafka.coordinator.ConsumerCoordinator
 
 
 /**
@@ -158,9 +159,9 @@ class ConsumerTest extends IntegrationTestHarness with Logging {
       consumer0.poll(50)
     
     // get metadata for the topic
-    var parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+    var parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
     while(parts == null)
-      parts = consumer0.partitionsFor(OffsetManager.OffsetsTopicName)
+      parts = consumer0.partitionsFor(ConsumerCoordinator.OffsetsTopicName)
     assertEquals(1, parts.size)
     assertNotNull(parts(0).leader())
     

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 07b1ff4..afcc349 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -26,6 +26,7 @@ import org.apache.kafka.clients.producer.KafkaProducer
 import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.integration.KafkaServerTestHarness
 import scala.collection.mutable.Buffer
+import kafka.coordinator.ConsumerCoordinator
 
 /**
  * A helper class for writing integration tests that involve producers, consumers, and servers
@@ -63,11 +64,11 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
       consumers += new KafkaConsumer(consumerConfig)
 
     // create the consumer offset topic
-    TestUtils.createTopic(zkClient, OffsetManager.OffsetsTopicName,
-      serverConfig.getProperty("offsets.topic.num.partitions").toInt,
-      serverConfig.getProperty("offsets.topic.replication.factor").toInt,
+    TestUtils.createTopic(zkClient, ConsumerCoordinator.OffsetsTopicName,
+      serverConfig.getProperty(KafkaConfig.OffsetsTopicPartitionsProp).toInt,
+      serverConfig.getProperty(KafkaConfig.OffsetsTopicReplicationFactorProp).toInt,
       servers,
-      servers(0).offsetManager.offsetsTopicConfig)
+      servers(0).consumerCoordinator.offsetsTopicConfigs)
   }
   
   override def tearDown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
index c7136f2..dcd6988 100644
--- a/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
@@ -22,9 +22,9 @@ import org.scalatest.junit.JUnit3Suite
 import kafka.utils.Logging
 import kafka.utils.TestUtils
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{OffsetManager, KafkaConfig}
 import kafka.admin.TopicCommand.TopicCommandOptions
 import kafka.utils.ZkUtils
+import kafka.coordinator.ConsumerCoordinator
 
 class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
 
@@ -87,12 +87,12 @@ class TopicCommandTest extends JUnit3Suite with ZooKeeperTestHarness with Loggin
     // create the offset topic
     val createOffsetTopicOpts = new TopicCommandOptions(Array("--partitions", numPartitionsOriginal.toString,
       "--replication-factor", "1",
-      "--topic", OffsetManager.OffsetsTopicName))
+      "--topic", ConsumerCoordinator.OffsetsTopicName))
     TopicCommand.createTopic(zkClient, createOffsetTopicOpts)
 
     // try to delete the OffsetManager.OffsetsTopicName and make sure it doesn't
-    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", OffsetManager.OffsetsTopicName))
-    val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(OffsetManager.OffsetsTopicName)
+    val deleteOffsetTopicOpts = new TopicCommandOptions(Array("--topic", ConsumerCoordinator.OffsetsTopicName))
+    val deleteOffsetTopicPath = ZkUtils.getDeleteTopicPath(ConsumerCoordinator.OffsetsTopicName)
     assertFalse("Delete path for topic shouldn't exist before deletion.", zkClient.exists(deleteOffsetTopicPath))
     intercept[AdminOperationException] {
         TopicCommand.deleteTopic(zkClient, deleteOffsetTopicOpts)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
index 4f124af..4b326d0 100644
--- a/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala
@@ -22,6 +22,7 @@ import junit.framework.Assert._
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
 import kafka.server.OffsetManager
+import kafka.coordinator.ConsumerCoordinator
 
 
 class TopicFilterTest extends JUnitSuite {
@@ -37,8 +38,8 @@ class TopicFilterTest extends JUnitSuite {
 
     val topicFilter2 = new Whitelist(".+")
     assertTrue(topicFilter2.isTopicAllowed("alltopics", excludeInternalTopics = true))
-    assertFalse(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter2.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter2.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
 
     val topicFilter3 = new Whitelist("white_listed-topic.+")
     assertTrue(topicFilter3.isTopicAllowed("white_listed-topic1", excludeInternalTopics = true))
@@ -57,8 +58,8 @@ class TopicFilterTest extends JUnitSuite {
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = true))
     assertFalse(topicFilter1.isTopicAllowed("black1", excludeInternalTopics = false))
 
-    assertFalse(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = true))
-    assertTrue(topicFilter1.isTopicAllowed(OffsetManager.OffsetsTopicName, excludeInternalTopics = false))
+    assertFalse(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = true))
+    assertTrue(topicFilter1.isTopicAllowed(ConsumerCoordinator.OffsetsTopicName, excludeInternalTopics = false))
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
index a44fbd6..3cd726d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala
@@ -22,8 +22,8 @@ import java.util.concurrent.TimeUnit
 
 import junit.framework.Assert._
 import kafka.common.TopicAndPartition
-import kafka.server.{KafkaConfig, OffsetManager}
-import kafka.utils.TestUtils
+import kafka.server.{OffsetManager, ReplicaManager, KafkaConfig}
+import kafka.utils.{KafkaScheduler, TestUtils}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
 import org.easymock.EasyMock
@@ -45,8 +45,8 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
   val ConsumerMinSessionTimeout = 10
   val ConsumerMaxSessionTimeout = 30
   val DefaultSessionTimeout = 20
-  var offsetManager: OffsetManager = null
   var consumerCoordinator: ConsumerCoordinator = null
+  var offsetManager : OffsetManager = null
 
   @Before
   def setUp() {
@@ -54,12 +54,13 @@ class ConsumerCoordinatorResponseTest extends JUnitSuite {
     props.setProperty(KafkaConfig.ConsumerMinSessionTimeoutMsProp, ConsumerMinSessionTimeout.toString)
     props.setProperty(KafkaConfig.ConsumerMaxSessionTimeoutMsProp, ConsumerMaxSessionTimeout.toString)
     offsetManager = EasyMock.createStrictMock(classOf[OffsetManager])
-    consumerCoordinator = new ConsumerCoordinator(KafkaConfig.fromProps(props), null, offsetManager)
+    consumerCoordinator = ConsumerCoordinator.create(KafkaConfig.fromProps(props), null, offsetManager)
     consumerCoordinator.startup()
   }
 
   @After
   def tearDown() {
+    EasyMock.reset(offsetManager)
     consumerCoordinator.shutdown()
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
index 08854c5..2cbf6e2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
@@ -40,7 +40,7 @@ class CoordinatorMetadataTest extends JUnitSuite {
   def setUp() {
     val props = TestUtils.createBrokerConfig(nodeId = 0, zkConnect = "")
     zkClient = EasyMock.createStrictMock(classOf[ZkClient])
-    coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props), zkClient, null)
+    coordinatorMetadata = new CoordinatorMetadata(KafkaConfig.fromProps(props).brokerId, zkClient, null)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/3f8480cc/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 528525b..39a6852 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -120,7 +120,7 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     val fetchRequest2 = OffsetFetchRequest(group, Seq(unknownTopicAndPartition))
     val fetchResponse2 = simpleConsumer.fetchOffsets(fetchRequest2)
 
-    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
+    assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse2.requestInfo.get(unknownTopicAndPartition).get)
     assertEquals(1, fetchResponse2.requestInfo.size)
   }
 
@@ -166,14 +166,14 @@ class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
-    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
+    assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get)
 
     assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
     assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get)
 
-    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
-    assertEquals(OffsetMetadataAndError.UnknownTopicOrPartition, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get.error)
+    assertEquals(OffsetMetadataAndError.NoOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic5, 0)).get)
 
     assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
     assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)


Mime
View raw message