kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6650) The controller should be able to handle a partially deleted topic
Date Tue, 17 Apr 2018 00:17:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16440195#comment-16440195 ]

ASF GitHub Bot commented on KAFKA-6650:
---------------------------------------

junrao closed pull request #4825: KAFKA-6650: Allowing transition to OfflineReplica state
for replicas without leadership info
URL: https://github.com/apache/kafka/pull/4825
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index a2d04e65ae6..5fafcc4fe3f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -202,8 +202,10 @@ class ReplicaStateMachine(config: KafkaConfig,
           controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition,
             deletePartition = false, (_, _) => ())
         }
-        val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
-        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
+        val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = validReplicas.partition
{ replica =>
+          controllerContext.partitionLeadershipInfo.contains(replica.topicPartition)
+        }
+        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasWithLeadershipInfo.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch)
=>
           if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
             val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_
== replicaId)
@@ -216,6 +218,11 @@ class ReplicaStateMachine(config: KafkaConfig,
           logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
           replicaState.put(replica, OfflineReplica)
         }
+
+        replicasWithoutLeadershipInfo.foreach { replica =>
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica),
OfflineReplica)
+          replicaState.put(replica, OfflineReplica)
+        }
       case ReplicaDeletionStarted =>
         validReplicas.foreach { replica =>
           logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica),
ReplicaDeletionStarted)
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 9b58fc7cc4b..a65128ad98a 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -1370,7 +1370,7 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure:
Boolean
    * @return true if path gets deleted successfully, false if root path doesn't exist
    * @throws KeeperException if there is an error while deleting the znodes
    */
-  private[zk] def deleteRecursive(path: String): Boolean = {
+  def deleteRecursive(path: String): Boolean = {
     val getChildrenResponse = retryRequestUntilConnected(GetChildrenRequest(path))
     getChildrenResponse.resultCode match {
       case Code.OK =>
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index ef455d4457e..4c033c421bb 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -17,7 +17,7 @@
 package kafka.admin
 
 import kafka.log.Log
-import kafka.zk.ZooKeeperTestHarness
+import kafka.zk.{TopicPartitionZNode, ZooKeeperTestHarness}
 import kafka.utils.TestUtils
 import kafka.server.{KafkaConfig, KafkaServer}
 import org.junit.Assert._
@@ -326,7 +326,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     brokerConfigs.head.setProperty("log.segment.bytes","100")
     brokerConfigs.head.setProperty("log.cleaner.dedupe.buffer.size","1048577")
 
-    servers = createTestTopicAndCluster(topic,brokerConfigs)
+    servers = createTestTopicAndCluster(topic, brokerConfigs, expectedReplicaAssignment)
 
     // for simplicity, we are validating cleaner offsets on a single broker
     val server = servers.head
@@ -363,18 +363,18 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     TestUtils.verifyTopicDeletion(zkClient, topic, 1, servers)
   }
 
-  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true):
Seq[KafkaServer] = {
+  private def createTestTopicAndCluster(topic: String, deleteTopicEnabled: Boolean = true,
replicaAssignment: Map[Int, List[Int]] = expectedReplicaAssignment): Seq[KafkaServer] = {
     val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, enableControlledShutdown
= false)
     brokerConfigs.foreach(_.setProperty("delete.topic.enable", deleteTopicEnabled.toString))
-    createTestTopicAndCluster(topic, brokerConfigs)
+    createTestTopicAndCluster(topic, brokerConfigs, replicaAssignment)
   }
 
-  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties]): Seq[KafkaServer]
= {
+  private def createTestTopicAndCluster(topic: String, brokerConfigs: Seq[Properties], replicaAssignment:
Map[Int, List[Int]]): Seq[KafkaServer] = {
     val topicPartition = new TopicPartition(topic, 0)
     // create brokers
     val servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
     // create the topic
-    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment)
+    adminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(topic, replicaAssignment)
     // wait until replica log is created on every broker
     TestUtils.waitUntilTrue(() => servers.forall(_.getLogManager().getLog(topicPartition).isDefined),
       "Replicas for topic test not created")
@@ -408,4 +408,35 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val leaderIdOpt = zkClient.getLeaderForPartition(new TopicPartition(topic, 0))
     assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
   }
+
+  @Test
+  def testDeletingPartiallyDeletedTopic() {
+    /**
+      * A previous controller could have deleted some partitions of a topic from ZK, but
not all partitions, and then crashed.
+      * In that case, the new controller should be able to handle the partially deleted topic,
and finish the deletion.
+      */
+
+    val replicaAssignment = Map(0 -> List(0, 1, 2), 1 -> List(0, 1, 2))
+    val topic = "test"
+    servers = createTestTopicAndCluster(topic, true, replicaAssignment)
+
+    /**
+      * shutdown all brokers in order to create a partially deleted topic on ZK
+      */
+    servers.foreach(_.shutdown())
+
+    /**
+      * delete the partition znode at /brokers/topics/test/partition/0
+      * to simulate the case that a previous controller crashed right after deleting the
partition znode
+      */
+    zkClient.deleteRecursive(TopicPartitionZNode.path(new TopicPartition(topic, 0)))
+    adminZkClient.deleteTopic(topic)
+
+    /**
+      * start up all brokers and verify that topic deletion eventually finishes.
+      */
+    servers.foreach(_.startup())
+    TestUtils.waitUntilTrue(() => servers.exists(_.kafkaController.isActive), "No controller
is elected")
+    TestUtils.verifyTopicDeletion(zkClient, topic, 2, servers)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 6a961a53157..14d2df2e8f8 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -119,7 +119,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     EasyMock.replay(mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
     EasyMock.verify(mockControllerBrokerRequestBatch)
-    assertEquals(NewReplica, replicaState(replica))
+    assertEquals(OfflineReplica, replicaState(replica))
   }
 
   @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> The controller should be able to handle a partially deleted topic
> -----------------------------------------------------------------
>
>                 Key: KAFKA-6650
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6650
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
>
> A previous controller could have deleted some partitions of a topic from ZK, but not
> all partitions, and then died.
> In that case, the new controller should be able to handle the partially deleted topic,
> and finish the deletion.
> In the current code base, if there is no leadership info for a replica's partition, the
> transition to OfflineReplica state for the replica will fail. Afterwards the transition to
> ReplicaDeletionStarted will fail as well since the only valid previous state for ReplicaDeletionStarted
> is OfflineReplica. Furthermore, it means the topic deletion will never finish.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message