kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-4704; Coordinator cache loading fails if groupId is reused for offset storage after group is removed
Date Fri, 27 Jan 2017 20:30:09 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9213de8f1 -> 57f0cb299


KAFKA-4704; Coordinator cache loading fails if groupId is reused for offset storage after group is removed

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2455 from hachikuji/KAFKA-4704


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/57f0cb29
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/57f0cb29
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/57f0cb29

Branch: refs/heads/trunk
Commit: 57f0cb29973bb40e76ed6deb64b934b4b2e0592a
Parents: 9213de8
Author: Jason Gustafson <jason@confluent.io>
Authored: Fri Jan 27 20:30:00 2017 +0000
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jan 27 20:30:00 2017 +0000

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    |   2 +-
 .../scala/kafka/coordinator/GroupMetadata.scala |   6 +-
 .../coordinator/GroupMetadataManager.scala      | 220 ++++++++++---------
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  44 ++--
 .../scala/kafka/server/ReplicaManager.scala     |  12 +-
 .../coordinator/GroupMetadataManagerTest.scala  | 215 ++++++++++++++++--
 .../kafka/coordinator/GroupMetadataTest.scala   |  16 +-
 .../unit/kafka/producer/AsyncProducerTest.scala |   3 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |   5 +
 10 files changed, 358 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
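
For readers skimming the per-file diffs below: the heart of the fix is the consistency check that runs after the offsets topic partition has been scanned. Previously, any groupId that had a tombstone in the log but was already present in the coordinator's cache raised an IllegalStateException, even when the cache entry existed only because later offset commits reused the same groupId. The following self-contained Scala sketch mirrors that logic using plain collections; the object name and sample values are illustrative, not Kafka internals.

// Standalone sketch of the load-time check changed by this patch (illustrative only).
object LoadCheckSketch {
  def main(args: Array[String]): Unit = {
    // State accumulated while scanning the offsets topic partition:
    val loadedGroups  = Set.empty[String]               // no surviving group metadata record
    val removedGroups = Set("my-group")                 // a group tombstone was seen
    val loadedOffsets = Map("my-group" -> Seq("foo-0")) // offsets committed later under the same groupId

    // Offsets whose groupId has no surviving group metadata ("empty" groups).
    val emptyGroupOffsets = loadedOffsets.filter { case (group, _) => !loadedGroups.contains(group) }

    // Loading the empty group's offsets puts the group into the cache before the check below runs.
    val groupMetadataCache = Set("my-group")

    removedGroups.foreach { groupId =>
      // Old check: groupMetadataCache.contains(groupId) alone -> spurious IllegalStateException.
      // New check also tolerates a groupId that is now used only for offset storage.
      if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
        throw new IllegalStateException(s"Unexpected unload of active group $groupId")
    }
    println("load completed: reused groupId treated as an offsets-only (empty) group")
  }
}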


http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index c7edcef..36b0c86 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -621,7 +621,7 @@ class GroupCoordinator(val brokerId: Int,
     val member = new MemberMetadata(memberId, group.groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, protocols)
     member.awaitingJoinCallback = callback
-    group.add(member.memberId, member)
+    group.add(member)
     maybePrepareRebalance(group)
     member
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
index 0e49995..4ea5bdd 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadata.scala
@@ -158,7 +158,7 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou
   def has(memberId: String) = members.contains(memberId)
   def get(memberId: String) = members(memberId)
 
-  def add(memberId: String, member: MemberMetadata) {
+  def add(member: MemberMetadata) {
     if (members.isEmpty)
       this.protocolType = Some(member.protocolType)
 
@@ -167,8 +167,8 @@ private[coordinator] class GroupMetadata(val groupId: String, initialState: Grou
     assert(supportsProtocols(member.protocols))
 
     if (leaderId == null)
-      leaderId = memberId
-    members.put(memberId, member)
+      leaderId = member.memberId
+    members.put(member.memberId, member)
   }
 
   def remove(memberId: String) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 45ed77b..c66ce74 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -235,7 +235,7 @@ class GroupMetadataManager(val brokerId: Int,
       case Some((magicValue, timestampType, timestamp)) =>
        val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
           Record.create(magicValue, timestampType, timestamp,
-            GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition.topic, topicPartition.partition),
+            GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition),
             GroupMetadataManager.offsetCommitValue(offsetAndMetadata))
         }.toSeq
 
@@ -367,115 +367,128 @@ class GroupMetadataManager(val brokerId: Int,
   /**
    * Asynchronously read the partition from the offsets topic and populate the cache
    */
-  def loadGroupsForPartition(offsetsPartition: Int,
-                             onGroupLoaded: GroupMetadata => Unit) {
+  def loadGroupsForPartition(offsetsPartition: Int, onGroupLoaded: GroupMetadata => Unit) {
     val topicPartition = new TopicPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-    scheduler.schedule(topicPartition.toString, loadGroupsAndOffsets)
 
-    def loadGroupsAndOffsets() {
-      info("Loading offsets and group metadata from " + topicPartition)
+    def doLoadGroupsAndOffsets() {
+      info(s"Loading offsets and group metadata from $topicPartition")
 
       inLock(partitionLock) {
         if (loadingPartitions.contains(offsetsPartition)) {
-          info("Offset load from %s already in progress.".format(topicPartition))
+          info(s"Offset load from $topicPartition already in progress.")
           return
         } else {
           loadingPartitions.add(offsetsPartition)
         }
       }
 
-      val startMs = time.milliseconds()
       try {
-        replicaManager.logManager.getLog(topicPartition) match {
-          case Some(log) =>
-            var currOffset = log.logSegments.head.baseOffset
-            val buffer = ByteBuffer.allocate(config.loadBufferSize)
-            // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
-            val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
-            val removedOffsets = mutable.Set[GroupTopicPartition]()
-            val loadedGroups = mutable.Map[String, GroupMetadata]()
-            val removedGroups = mutable.Set[String]()
-
-            while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
-              buffer.clear()
-              val fileRecords = log.read(currOffset, config.loadBufferSize, minOneMessage = true).records.asInstanceOf[FileRecords]
-              fileRecords.readInto(buffer, 0)
-
-              MemoryRecords.readableRecords(buffer).deepEntries.asScala.foreach { entry =>
-                val record = entry.record
-
-                require(record.hasKey, "Offset entry key should not be null")
-                val baseKey = GroupMetadataManager.readMessageKey(record.key)
-
-                if (baseKey.isInstanceOf[OffsetKey]) {
-                  // load offset
-                  val key = baseKey.key.asInstanceOf[GroupTopicPartition]
-                  if (record.hasNullValue) {
-                    loadedOffsets.remove(key)
-                    removedOffsets.add(key)
-                  } else {
-                    val value = GroupMetadataManager.readOffsetMessageValue(record.value)
-                    loadedOffsets.put(key, value)
-                    removedOffsets.remove(key)
-                  }
+        loadGroupsAndOffsets(topicPartition, onGroupLoaded)
+      } catch {
+        case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
+      } finally {
+        inLock(partitionLock) {
+          ownedPartitions.add(offsetsPartition)
+          loadingPartitions.remove(offsetsPartition)
+        }
+      }
+    }
+
+    scheduler.schedule(topicPartition.toString, doLoadGroupsAndOffsets)
+  }
+
+  private[coordinator] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit) {
+    def highWaterMark = replicaManager.getHighWatermark(topicPartition).getOrElse(-1L)
+
+    val startMs = time.milliseconds()
+    replicaManager.getLog(topicPartition) match {
+      case None =>
+        warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
+
+      case Some(log) =>
+        var currOffset = log.logStartOffset
+        val buffer = ByteBuffer.allocate(config.loadBufferSize)
+        // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
+        val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
+        val removedOffsets = mutable.Set[GroupTopicPartition]()
+        val loadedGroups = mutable.Map[String, GroupMetadata]()
+        val removedGroups = mutable.Set[String]()
+
+        while (currOffset < highWaterMark && !shuttingDown.get()) {
+          buffer.clear()
+          val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
+            .records.asInstanceOf[FileRecords]
+          val bufferRead = fileRecords.readInto(buffer, 0)
+
+          MemoryRecords.readableRecords(bufferRead).deepEntries.asScala.foreach { entry =>
+            val record = entry.record
+            require(record.hasKey, "Group metadata/offset entry key should not be null")
+
+            GroupMetadataManager.readMessageKey(record.key) match {
+              case offsetKey: OffsetKey =>
+                // load offset
+                val key = offsetKey.key
+                if (record.hasNullValue) {
+                  loadedOffsets.remove(key)
+                  removedOffsets.add(key)
                 } else {
-                  // load group metadata
-                  val groupId = baseKey.key.asInstanceOf[String]
-                  val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
-                  if (groupMetadata != null) {
-                    trace(s"Loaded group metadata for group ${groupMetadata.groupId} with generation ${groupMetadata.generationId}")
-                    removedGroups.remove(groupId)
-                    loadedGroups.put(groupId, groupMetadata)
-                  } else {
-                    loadedGroups.remove(groupId)
-                    removedGroups.add(groupId)
-                  }
+                  val value = GroupMetadataManager.readOffsetMessageValue(record.value)
+                  loadedOffsets.put(key, value)
+                  removedOffsets.remove(key)
                 }
 
-                currOffset = entry.nextOffset
-              }
-            }
-
-            val (groupOffsets, noGroupOffsets) = loadedOffsets
-              .groupBy(_._1.group)
-              .mapValues(_.map{ case (groupTopicPartition, offsetAndMetadata) => (groupTopicPartition.topicPartition, offsetAndMetadata)})
-              .partition(value => loadedGroups.contains(value._1))
+              case groupMetadataKey: GroupMetadataKey =>
+                // load group metadata
+                val groupId = groupMetadataKey.key
+                val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+                if (groupMetadata != null) {
+                  trace(s"Loaded group metadata for group $groupId with generation ${groupMetadata.generationId}")
+                  removedGroups.remove(groupId)
+                  loadedGroups.put(groupId, groupMetadata)
+                } else {
+                  loadedGroups.remove(groupId)
+                  removedGroups.add(groupId)
+                }
 
-            loadedGroups.values.foreach { group =>
-              val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
-              loadGroup(group, offsets)
-              onGroupLoaded(group)
+              case unknownKey =>
+                throw new IllegalStateException(s"Unexpected message key $unknownKey while loading offsets and group metadata")
             }
 
-            noGroupOffsets.foreach { case (groupId, offsets) =>
-              val group = new GroupMetadata(groupId)
-              loadGroup(group, offsets)
-              onGroupLoaded(group)
-            }
+            currOffset = entry.nextOffset
+          }
+        }
 
-            removedGroups.foreach { groupId =>
-              if (groupMetadataCache.contains(groupId))
-                throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
-                  s"loading partition $topicPartition")
-            }
+        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+          .groupBy(_._1.group)
+          .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)} )
+          .partition { case (group, _) => loadedGroups.contains(group) }
 
-            if (!shuttingDown.get())
-              info("Finished loading offsets from %s in %d milliseconds."
-                .format(topicPartition, time.milliseconds() - startMs))
-          case None =>
-            warn("No log found for " + topicPartition)
+        loadedGroups.values.foreach { group =>
+          val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
+          loadGroup(group, offsets)
+          onGroupLoaded(group)
         }
-      }
-      catch {
-        case t: Throwable =>
-          error("Error in loading offsets from " + topicPartition, t)
-      }
-      finally {
-        inLock(partitionLock) {
-          ownedPartitions.add(offsetsPartition)
-          loadingPartitions.remove(offsetsPartition)
+
+        // load groups which store offsets in kafka, but which have no active members and thus no group
+        // metadata stored in the log
+        emptyGroupOffsets.foreach { case (groupId, offsets) =>
+          val group = new GroupMetadata(groupId)
+          loadGroup(group, offsets)
+          onGroupLoaded(group)
         }
-      }
+
+        removedGroups.foreach { groupId =>
+          // if the cache already contains a group which should be removed, raise an error. Note that it
+          // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
+          // offset storage (i.e. by "simple" consumers)
+          if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
+            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+              s"loading partition $topicPartition")
+        }
+
+        if (!shuttingDown.get())
+          info("Finished loading offsets from %s in %d milliseconds."
+            .format(topicPartition, time.milliseconds() - startMs))
     }
   }
 
@@ -568,7 +581,7 @@ class GroupMetadataManager(val brokerId: Int,
           partitionOpt.foreach { partition =>
            val tombstones = removedOffsets.map { case (topicPartition, offsetAndMetadata) =>
              trace(s"Removing expired/deleted offset and metadata for $groupId, $topicPartition: $offsetAndMetadata")
-              val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition.topic, topicPartition.partition)
+              val commitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
               Record.create(magicValue, timestampType, timestamp, commitKey, null)
             }.toBuffer
             trace(s"Marked ${removedOffsets.size} offsets in $appendPartition for deletion.")
@@ -606,16 +619,6 @@ class GroupMetadataManager(val brokerId: Int,
     info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
   }
 
-  private def getHighWatermark(partitionId: Int): Long = {
-    val partitionOpt = replicaManager.getPartition(new TopicPartition(Topic.GroupMetadataTopicName, partitionId))
-
-    val hw = partitionOpt.map { partition =>
-      partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset).getOrElse(-1L)
-    }.getOrElse(-1L)
-
-    hw
-  }
-
   /*
    * Check if the offset metadata length is valid
    */
@@ -816,11 +819,12 @@ object GroupMetadataManager {
    *
    * @return key for offset commit message
    */
-  private def offsetCommitKey(group: String, topic: String, partition: Int, versionId: Short = 0): Array[Byte] = {
+  private[coordinator] def offsetCommitKey(group: String, topicPartition: TopicPartition,
+                                           versionId: Short = 0): Array[Byte] = {
     val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
     key.set(OFFSET_KEY_GROUP_FIELD, group)
-    key.set(OFFSET_KEY_TOPIC_FIELD, topic)
-    key.set(OFFSET_KEY_PARTITION_FIELD, partition)
+    key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
+    key.set(OFFSET_KEY_PARTITION_FIELD, topicPartition.partition)
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + key.sizeOf)
     byteBuffer.putShort(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
@@ -833,7 +837,7 @@ object GroupMetadataManager {
    *
    * @return key bytes for group metadata message
    */
-  def groupMetadataKey(group: String): Array[Byte] = {
+  private[coordinator] def groupMetadataKey(group: String): Array[Byte] = {
     val key = new Struct(CURRENT_GROUP_KEY_SCHEMA)
     key.set(GROUP_KEY_GROUP_FIELD, group)
 
@@ -849,7 +853,7 @@ object GroupMetadataManager {
    * @param offsetAndMetadata consumer's current offset and metadata
    * @return payload for offset commit message
    */
-  private def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
+  private[coordinator] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
     // generate commit value with schema version 1
     val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
     value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
@@ -871,9 +875,9 @@ object GroupMetadataManager {
    * @param version the version of the value message to use
    * @return payload for offset commit message
    */
-  def groupMetadataValue(groupMetadata: GroupMetadata,
-                         assignment: Map[String, Array[Byte]],
-                         version: Short = 0): Array[Byte] = {
+  private[coordinator] def groupMetadataValue(groupMetadata: GroupMetadata,
+                                              assignment: Map[String, Array[Byte]],
+                                              version: Short = 0): Array[Byte] = {
     val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
 
     value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@@ -1013,7 +1017,7 @@ object GroupMetadataManager {
 
           member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
 
-          group.add(memberId, member)
+          group.add(member)
         }
 
         group

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 8dea5ca..25bb83d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -316,7 +316,7 @@ class Log(@volatile var dir: File,
   /**
    * Check if we have the "clean shutdown" file
    */
-  private def hasCleanShutdownFile() = new File(dir.getParentFile, CleanShutdownFile).exists()
+  private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
 
   /**
    * The number of segments in the log.

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 8cd9b34..761edf9 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -349,13 +349,7 @@ class LogManager(val logDirs: Array[File],
   /**
    * Get the log if it exists, otherwise return None
    */
-  def getLog(topicPartition: TopicPartition): Option[Log] = {
-    val log = logs.get(topicPartition)
-    if (log == null)
-      None
-    else
-      Some(log)
-  }
+  def getLog(topicPartition: TopicPartition): Option[Log] = Option(logs.get(topicPartition))
 
   /**
    * Create a log for the given topic and the given partition
@@ -363,28 +357,20 @@ class LogManager(val logDirs: Array[File],
    */
   def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
     logCreationOrDeletionLock synchronized {
-      var log = logs.get(topicPartition)
-      
-      // check if the log has already been created in another thread
-      if(log != null)
-        return log
-      
-      // if not, create it
-      val dataDir = nextLogDir()
-      val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
-      dir.mkdirs()
-      log = new Log(dir, 
-                    config,
-                    recoveryPoint = 0L,
-                    scheduler,
-                    time)
-      logs.put(topicPartition, log)
-      info("Created log for partition [%s,%d] in %s with properties {%s}."
-           .format(topicPartition.topic,
-                   topicPartition.partition,
-                   dataDir.getAbsolutePath,
-                   config.originals.asScala.mkString(", ")))
-      log
+      // create the log if it has not already been created in another thread
+      getLog(topicPartition).getOrElse {
+        val dataDir = nextLogDir()
+        val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
+        dir.mkdirs()
+        val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
+        logs.put(topicPartition, log)
+        info("Created log for partition [%s,%d] in %s with properties {%s}."
+          .format(topicPartition.topic,
+            topicPartition.partition,
+            dataDir.getAbsolutePath,
+            config.originals.asScala.mkString(", ")))
+        log
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index dd8fc03..1aa88a2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -25,7 +25,7 @@ import kafka.api._
 import kafka.cluster.{Partition, Replica}
 import kafka.common._
 import kafka.controller.KafkaController
-import kafka.log.{LogAppendInfo, LogManager}
+import kafka.log.{Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils._
@@ -189,6 +189,8 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  def getLog(topicPartition: TopicPartition): Option[Log] = logManager.getLog(topicPartition)
+
   /**
    * Try to complete some delayed produce requests with the request key;
    * this can be triggered when:
@@ -926,10 +928,16 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  private def getLeaderPartitions() : List[Partition] = {
+  private def getLeaderPartitions(): List[Partition] = {
     allPartitions.values.filter(_.leaderReplicaIfLocal.isDefined).toList
   }
 
+  def getHighWatermark(topicPartition: TopicPartition): Option[Long] = {
+    getPartition(topicPartition).flatMap { partition =>
+      partition.leaderReplicaIfLocal.map(_.highWatermark.messageOffset)
+    }
+  }
+
   // Flushes the highwatermark value for all partitions to the highwatermark file
   def checkpointHighWatermarks() {
     val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
index 9478122..30dfc63 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataManagerTest.scala
@@ -17,20 +17,23 @@
 
 package kafka.coordinator
 
+import java.nio.ByteBuffer
+
 import kafka.api.ApiVersion
 import kafka.cluster.Partition
 import kafka.common.{OffsetAndMetadata, Topic}
-import kafka.log.LogAppendInfo
-import kafka.server.{KafkaConfig, ReplicaManager}
+import kafka.log.{Log, LogAppendInfo}
+import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils, ZkUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.record.{MemoryRecords, Record, TimestampType}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, TimestampType}
 import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.{After, Before, Test}
-import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import kafka.utils.TestUtils.fail
 
 import scala.collection._
 import JavaConverters._
@@ -79,10 +82,153 @@ class GroupMetadataManagerTest {
     partition = EasyMock.niceMock(classOf[Partition])
   }
 
-  @After
-  def tearDown() {
-    EasyMock.reset(replicaManager)
-    EasyMock.reset(partition)
+  @Test
+  def testLoadOffsetsWithoutGroup() {
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val startOffset = 15L
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords: _*)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+    
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
+  def testLoadOffsetsWithTombstones() {
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val startOffset = 15L
+
+    val tombstonePartition = new TopicPartition("foo", 1)
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      tombstonePartition -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val tombstone = Record.create(GroupMetadataManager.offsetCommitKey(groupId, tombstonePartition), null)
+    val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(tombstone): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(committedOffsets.size - 1, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      if (topicPartition == tombstonePartition)
+        assertEquals(None, group.offset(topicPartition))
+      else
+        assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
+  def testLoadOffsetsAndGroup() {
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val startOffset = 15L
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val memberId = "98098230493"
+    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+    val records = MemoryRecords.withRecords(startOffset, offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Stable, group.currentState)
+    assertEquals(memberId, group.leaderId)
+    assertEquals(Set(memberId), group.allMembers)
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  @Test
+  def testLoadGroupWithTombstone() {
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val startOffset = 15L
+
+    val memberId = "98098230493"
+    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+    val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
+    val records = MemoryRecords.withRecords(startOffset, Seq(groupMetadataRecord, groupMetadataTombstone): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+  }
+
+  @Test
+  def testOffsetWriteAfterGroupRemoved(): Unit = {
+    // this test case checks the following scenario:
+    // 1. the group exists at some point in time, but is later removed (because all members left)
+    // 2. a "simple" consumer (i.e. not a consumer group) then uses the same groupId to commit some offsets
+
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GroupMetadataTopicName, groupPartitionId)
+    val startOffset = 15L
+
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+    val memberId = "98098230493"
+    val groupMetadataRecord = buildStableGroupRecordWithMember(memberId)
+    val groupMetadataTombstone = Record.create(GroupMetadataManager.groupMetadataKey(groupId), null)
+    val records = MemoryRecords.withRecords(startOffset,
+      Seq(groupMetadataRecord, groupMetadataTombstone) ++ offsetCommitRecords: _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(TestUtils.fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
   }
 
   @Test
@@ -156,7 +302,7 @@ class GroupMetadataManagerTest {
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
     member.awaitingJoinCallback = _ => ()
-    group.add(memberId, member)
+    group.add(member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
@@ -185,7 +331,7 @@ class GroupMetadataManagerTest {
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
     member.awaitingJoinCallback = _ => ()
-    group.add(memberId, member)
+    group.add(member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
@@ -264,7 +410,7 @@ class GroupMetadataManagerTest {
       commitErrors = Some(errors)
     }
 
-    val delayedStoreOpt = groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
+    groupMetadataManager.prepareStoreOffsets(group, memberId, generationId, offsets, callback)
 
     assertFalse(commitErrors.isEmpty)
     val maybeError = commitErrors.get.get(topicPartition)
@@ -557,7 +703,7 @@ class GroupMetadataManagerTest {
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
     member.awaitingJoinCallback = _ => ()
-    group.add(memberId, member)
+    group.add(member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
@@ -620,4 +766,47 @@ class GroupMetadataManagerTest {
       .andStubReturn(Some(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME))
   }
 
+  private def buildStableGroupRecordWithMember(memberId: String): Record = {
+    val group = new GroupMetadata(groupId)
+    group.transitionTo(PreparingRebalance)
+    val memberProtocols = List(("roundrobin", Array.emptyByteArray))
+    val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, "consumer", memberProtocols)
+    group.add(member)
+    member.awaitingJoinCallback = _ => {}
+    group.initNextGeneration()
+    group.transitionTo(Stable)
+
+    val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
+    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
+    Record.create(groupMetadataKey, groupMetadataValue)
+  }
+
+  private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
+                                      startOffset: Long,
+                                      records: MemoryRecords): Unit = {
+    val endOffset = startOffset + records.deepEntries.asScala.size
+    val logMock =  EasyMock.mock(classOf[Log])
+    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
+    EasyMock.expect(replicaManager.getHighWatermark(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true)))
+      .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
+    EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
+      .andReturn(records.buffer)
+
+    EasyMock.replay(logMock, fileRecordsMock)
+  }
+
+  private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
+                                           groupId: String = groupId): Seq[Record] = {
+    committedOffsets.map { case (topicPartition, offset) =>
+      val offsetAndMetadata = OffsetAndMetadata(offset)
+      val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
+      val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+      Record.create(offsetCommitKey, offsetCommitValue)
+    }.toSeq
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
index bf695bf..3db7818 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupMetadataTest.scala
@@ -180,14 +180,14 @@ class GroupMetadataTest extends JUnitSuite {
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs,
       protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
-    group.add(memberId, member)
+    group.add(member)
     assertEquals("range", group.selectProtocol)
 
     val otherMemberId = "otherMemberId"
     val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
-    group.add(otherMemberId, otherMember)
+    group.add(otherMember)
     // now could be either range or robin since there is no majority preference
     assertTrue(Set("range", "roundrobin")(group.selectProtocol))
 
@@ -195,7 +195,7 @@ class GroupMetadataTest extends JUnitSuite {
     val lastMember = new MemberMetadata(lastMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("range", Array.empty[Byte])))
 
-    group.add(lastMemberId, lastMember)
+    group.add(lastMember)
     // now we should prefer 'roundrobin'
     assertEquals("roundrobin", group.selectProtocol)
   }
@@ -216,8 +216,8 @@ class GroupMetadataTest extends JUnitSuite {
     val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
-    group.add(memberId, member)
-    group.add(otherMemberId, otherMember)
+    group.add(member)
+    group.add(otherMember)
     assertEquals("roundrobin", group.selectProtocol)
   }
 
@@ -230,7 +230,7 @@ class GroupMetadataTest extends JUnitSuite {
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("range", Array.empty[Byte]), ("roundrobin", Array.empty[Byte])))
 
-    group.add(memberId, member)
+    group.add(member)
     assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
     assertTrue(group.supportsProtocols(Set("range", "foo")))
     assertFalse(group.supportsProtocols(Set("foo", "bar")))
@@ -239,7 +239,7 @@ class GroupMetadataTest extends JUnitSuite {
     val otherMember = new MemberMetadata(otherMemberId, groupId, clientId, clientHost, rebalanceTimeoutMs,
       sessionTimeoutMs, protocolType, List(("roundrobin", Array.empty[Byte]), ("blah", Array.empty[Byte])))
 
-    group.add(otherMemberId, otherMember)
+    group.add(otherMember)
 
     assertTrue(group.supportsProtocols(Set("roundrobin", "foo")))
     assertFalse(group.supportsProtocols(Set("range", "foo")))
@@ -253,7 +253,7 @@ class GroupMetadataTest extends JUnitSuite {
 
     group.transitionTo(PreparingRebalance)
     member.awaitingJoinCallback = _ => ()
-    group.add(memberId, member)
+    group.add(member)
 
     assertEquals(0, group.generationId)
     assertNull(group.protocol)

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
index 7a00f2a..6358bdc 100755
--- a/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
+++ b/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.LinkedBlockingQueue
 
 import org.apache.kafka.common.protocol.{Errors, SecurityProtocol}
-import org.junit.Assert._
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
@@ -36,7 +36,6 @@ import kafka.utils.TestUtils._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import kafka.utils._
-import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.Time
 
 @deprecated("This test has been deprecated and it will be removed in a future release.", "0.10.0.0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/57f0cb29/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f132f9e..743756e 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -289,6 +289,11 @@ object TestUtils extends Logging {
   }
 
   /**
+   * Fail a test case explicitly. Return Nothing so that we are not constrained by the return type.
+   */
+  def fail(msg: String): Nothing = throw new AssertionError(msg)
+
+  /**
    * Wrap a single record log buffer.
    */
   def singletonRecords(value: Array[Byte],
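
A brief usage note on the new TestUtils.fail helper above (standalone illustration, not part of the patch): because fail is declared to return Nothing, it can supply the failure branch of getOrElse without widening the expression's type, which is how the new GroupMetadataManagerTest cases use it.

// Minimal sketch assuming only the fail signature added in this patch; names are illustrative.
object FailUsageSketch {
  def fail(msg: String): Nothing = throw new AssertionError(msg)

  def main(args: Array[String]): Unit = {
    val maybeGroupId: Option[String] = Some("my-group")
    // If fail returned Unit, this expression would widen to Any; with Nothing it stays a String.
    val groupId: String = maybeGroupId.getOrElse(fail("Group was not loaded into the cache"))
    println(groupId)
  }
}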

