kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-2000) Delete consumer offsets from kafka once the topic is deleted
Date Sun, 25 Feb 2018 06:57:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-2000?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375965#comment-16375965 ]

ASF GitHub Bot commented on KAFKA-2000:
---------------------------------------

hachikuji closed pull request #704: KAFKA-2000: Delete topic should also delete consumer offsets.
URL: https://github.com/apache/kafka/pull/704
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 48818c3edff..56dc1139cf1 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -62,14 +62,14 @@ class GroupMetadataManager(val brokerId: Int,
   /* group metadata cache */
   private val groupsCache = new Pool[String, GroupMetadata]
 
-  /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE offsetExpireLock and the group lock if needed */
+  /* partitions of consumer groups that are being loaded, its lock should be always called BEFORE removeOffsetLock and the group lock if needed */
   private val loadingPartitions: mutable.Set[Int] = mutable.Set()
 
   /* partitions of consumer groups that are assigned, using the same loading partition lock */
   private val ownedPartitions: mutable.Set[Int] = mutable.Set()
 
-  /* lock for expiring stale offsets, it should be always called BEFORE the group lock if needed */
-  private val offsetExpireLock = new ReentrantReadWriteLock()
+  /* lock for removing stale/deleted topic's offsets, it should be always called BEFORE the group lock if needed */
+  private val removeOffsetLock = new ReentrantReadWriteLock()
 
   /* shutting down flag */
   private val shuttingDown = new AtomicBoolean(false)
@@ -163,6 +163,19 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
+  /**
+   * Removes all consumer group offsets stored for this topicPartition.
+   * @param topicPartition the topic partition whose committed offsets should be removed.
+   */
+  def removeOffsetsByTopicPartition(topicPartition: TopicPartition): Unit = {
+    val startMs = SystemTime.milliseconds
+    val numberOfDeletedOffsets = inWriteLock(removeOffsetLock) {
+      val offsetsToBeDeleted = offsetsCache.filter(_._1.topicPartition == TopicAndPartition(topicPartition.topic, topicPartition.partition))
+      deleteOffsets(offsetsToBeDeleted)
+    }
+    info(s"Removed ${numberOfDeletedOffsets} offsets in ${SystemTime.milliseconds - startMs} milliseconds for topicPartition ${topicPartition}")
+  }
+
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
                         responseCallback: Short => Unit): DelayedStore = {
@@ -365,7 +378,7 @@ class GroupMetadataManager(val brokerId: Int,
             var currOffset = log.logSegments.head.baseOffset
             val buffer = ByteBuffer.allocate(config.loadBufferSize)
            // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
-            inWriteLock(offsetExpireLock) {
+            inWriteLock(removeOffsetLock) {
               val loadedGroups = mutable.Map[String, GroupMetadata]()
               val removedGroups = mutable.Set[String]()
 
@@ -534,53 +547,62 @@ class GroupMetadataManager(val brokerId: Int,
     debug("Collecting expired offsets.")
     val startMs = SystemTime.milliseconds
 
-    val numExpiredOffsetsRemoved = inWriteLock(offsetExpireLock) {
+    val numExpiredOffsetsRemoved = inWriteLock(removeOffsetLock) {
       val expiredOffsets = offsetsCache.filter { case (groupTopicPartition, offsetAndMetadata) =>
         offsetAndMetadata.expireTimestamp < startMs
       }
 
-      debug("Found %d expired offsets.".format(expiredOffsets.size))
+      deleteOffsets(expiredOffsets)
+    }
+
+    info(s"Removed ${numExpiredOffsetsRemoved} expired offsets in ${SystemTime.milliseconds
- startMs} milliseconds.")
+  }
+
+  /**
+   * Deletes the provided offsets.
+   * @param offsetsToBeDeleted collection of offsets that need to be deleted.
+   * @return number of deleted offsets.
+   */
+  def deleteOffsets(offsetsToBeDeleted: scala.Iterable[(GroupTopicPartition, OffsetAndMetadata)]): Int = {
+    debug("Found %d offsets that need to be removed.".format(offsetsToBeDeleted.size))
 
-      // delete the expired offsets from the table and generate tombstone messages to remove them from the log
-      val tombstonesForPartition = expiredOffsets.map { case (groupTopicAndPartition, offsetAndMetadata) =>
-        val offsetsPartition = partitionFor(groupTopicAndPartition.group)
-        trace("Removing expired offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
+    // delete the offsets from the table and generate tombstone messages to remove them from the log
+    val tombstonesForPartition = offsetsToBeDeleted.map { case (groupTopicAndPartition, offsetAndMetadata) =>
+      val offsetsPartition = partitionFor(groupTopicAndPartition.group)
+      trace("Removing offset and metadata for %s: %s".format(groupTopicAndPartition, offsetAndMetadata))
 
-        offsetsCache.remove(groupTopicAndPartition)
+      offsetsCache.remove(groupTopicAndPartition)
 
-        val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
-          groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
+      val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group,
+        groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition)
 
-        (offsetsPartition, new Message(bytes = null, key = commitKey))
-      }.groupBy { case (partition, tombstone) => partition }
+      (offsetsPartition, new Message(bytes = null, key = commitKey))
+    }.groupBy { case (partition, tombstone) => partition }
 
-      // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
-      // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
-      tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
-        val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
-        partitionOpt.map { partition =>
-          val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
-          val messages = tombstones.map(_._2).toSeq
+    // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say,
+    // if we crash or leaders move) since the new leaders will get rid of expired offsets during their own purge cycles.
+    tombstonesForPartition.flatMap { case (offsetsPartition, tombstones) =>
+      val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+      partitionOpt.map { partition =>
+        val appendPartition = TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, offsetsPartition)
+        val messages = tombstones.map(_._2).toSeq
 
-          trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
+        trace("Marked %d offsets in %s for deletion.".format(messages.size, appendPartition))
 
-          try {
-            // do not need to require acks since even if the tombstone is lost,
-            // it will be appended again in the next purge cycle
-            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
-            tombstones.size
-          }
-          catch {
-            case t: Throwable =>
-              error("Failed to mark %d expired offsets for deletion in %s.".format(messages.size,
appendPartition), t)
-              // ignore and continue
-              0
-          }
+        try {
+          // do not need to require acks since even if the tombstone is lost,
+          // it will be appended again in the next purge cycle
+          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, messages: _*))
+          tombstones.size
         }
-      }.sum
-    }
-
-    info("Removed %d expired offsets in %d milliseconds.".format(numExpiredOffsetsRemoved,
SystemTime.milliseconds - startMs))
+        catch {
+          case t: Throwable =>
+            error("Failed to mark %d offsets for deletion in %s.".format(messages.size, appendPartition),
t)
+            // ignore and continue
+            0
+        }
+      }
+    }.sum
   }
 
   private def getHighWatermark(partitionId: Int): Long = {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f2e95332e8f..09cb06196ab 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -164,6 +164,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val response =
       if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
         val (result, error) = replicaManager.stopReplicas(stopReplicaRequest)
+        stopReplicaRequest.partitions.asScala.foreach(topicPartition => coordinator.groupManager.removeOffsetsByTopicPartition(topicPartition))
         new StopReplicaResponse(error, result.asInstanceOf[Map[TopicPartition, JShort]].asJava)
       } else {
         val result = stopReplicaRequest.partitions.asScala.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
index 1d5148b0e15..cffe852b6a8 100755
--- a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -17,9 +17,10 @@
 
 package kafka.server
 
+import kafka.admin.AdminUtils
 import kafka.api.{GroupCoordinatorRequest, OffsetCommitRequest, OffsetFetchRequest}
 import kafka.consumer.SimpleConsumer
-import kafka.common.{OffsetMetadata, OffsetMetadataAndError, OffsetAndMetadata, TopicAndPartition}
+import kafka.common._
 import kafka.utils._
 import kafka.utils.TestUtils._
 import kafka.zk.ZooKeeperTestHarness
@@ -49,7 +50,7 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
   @Before
   override def setUp() {
     super.setUp()
-    val config: Properties = createBrokerConfig(1, zkConnect)
+    val config: Properties = createBrokerConfig(1, zkConnect, enableDeleteTopic = true)
     config.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "1")
     config.setProperty(KafkaConfig.OffsetsRetentionCheckIntervalMsProp, retentionCheckInterval.toString)
     val logDirPath = config.getProperty("log.dir")
@@ -307,4 +308,33 @@ class OffsetCommitTest extends ZooKeeperTestHarness {
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION.code, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
     assertEquals(Errors.NONE.code, commitResponse.commitStatus.get(TopicAndPartition(topic2, 0)).get)
   }
+
+  @Test
+  def testOffsetsDeleteAfterTopicDeletion() {
+    // set up topic partition
+    val topic = "topic"
+    val topicPartition = TopicAndPartition(topic, 0)
+    createTopic(zkUtils, topic, servers = Seq(server), numPartitions = 1)
+
+    val fetchRequest = OffsetFetchRequest(group, Seq(TopicAndPartition(topic, 0)))
+
+    // v0 version commit request with commit timestamp set to -1
+    // should not expire
+    val commitRequest0 = OffsetCommitRequest(
+      groupId = group,
+      requestInfo = immutable.Map(topicPartition -> OffsetAndMetadata(1L, "metadata", -1L)),
+      versionId = 0
+    )
+    assertEquals(Errors.NONE.code(), simpleConsumer.commitOffsets(commitRequest0).commitStatus.get(topicPartition).get)
+
+    // start topic deletion
+    AdminUtils.deleteTopic(zkUtils, topic)
+    TestUtils.verifyTopicDeletion(zkUtils, topic, 1, Seq(server))
+    Thread.sleep(retentionCheckInterval * 2)
+
+    // check if offsets deleted
+    val offsetMetadataAndErrorMap = simpleConsumer.fetchOffsets(fetchRequest)
+    val offsetMetadataAndError = offsetMetadataAndErrorMap.requestInfo(topicPartition)
+    assertEquals(OffsetMetadataAndError.NoOffset, offsetMetadataAndError)
+  }
 }
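
For readers skimming the patch above, here is a minimal, self-contained Scala sketch of the core idea: when a topic's partitions are torn down (StopReplica with delete), every committed offset keyed by that topic-partition is dropped from the coordinator's offsets cache under a write lock. This is an illustration only; ToyGroupMetadataManager, OffsetCleanupDemo and commit are invented names, the case classes are simplified stand-ins for Kafka's internal types, and the tombstone append to the __consumer_offsets log performed by the real patch is omitted.

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Simplified stand-ins for Kafka's internal types (illustration only).
case class TopicAndPartition(topic: String, partition: Int)
case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition)
case class OffsetAndMetadata(offset: Long, metadata: String)

// Toy analogue of GroupMetadataManager: an in-memory offsets cache guarded by a
// read-write lock, mirroring removeOffsetLock in the patch.
class ToyGroupMetadataManager {
  private val offsetsCache = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
  private val removeOffsetLock = new ReentrantReadWriteLock()

  // Record a committed offset (locking elided; single-threaded demo).
  def commit(group: String, tp: TopicAndPartition, offset: Long): Unit =
    offsetsCache(GroupTopicPartition(group, tp)) = OffsetAndMetadata(offset, "")

  // Analogue of removeOffsetsByTopicPartition: drop every group's committed offset
  // for a partition of a topic that is being deleted. The real patch also appends
  // tombstone messages to the __consumer_offsets log; that part is omitted here.
  def removeOffsetsByTopicPartition(tp: TopicAndPartition): Int = {
    removeOffsetLock.writeLock().lock()
    try {
      val doomed = offsetsCache.keys.filter(_.topicPartition == tp).toSeq
      doomed.foreach(offsetsCache.remove)
      doomed.size
    } finally removeOffsetLock.writeLock().unlock()
  }
}

object OffsetCleanupDemo extends App {
  val mgr = new ToyGroupMetadataManager
  mgr.commit("group-a", TopicAndPartition("topic", 0), 42L)
  // In the patch, KafkaApis drives this call per partition of a StopReplica(delete=true) request.
  val removed = mgr.removeOffsetsByTopicPartition(TopicAndPartition("topic", 0))
  println(s"removed $removed offset entries")
}

In the actual change, KafkaApis.handleStopReplicaRequest invokes coordinator.groupManager.removeOffsetsByTopicPartition for each partition in the request, and the shared removeOffsetLock keeps this removal from racing with offset expiration and group loading, which use the same lock.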


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Delete consumer offsets from kafka once the topic is deleted
> ------------------------------------------------------------
>
>                 Key: KAFKA-2000
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2000
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Sriharsha Chintalapani
>            Assignee: Manikumar
>            Priority: Major
>              Labels: newbie++
>             Fix For: 0.10.2.0, 0.11.0.0
>
>         Attachments: KAFKA-2000.patch, KAFKA-2000_2015-05-03_10:39:11.patch
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
