kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject git commit: KAFKA-330 Delete topic followup - more tests and Joel's review comments
Date Sat, 08 Feb 2014 19:09:45 GMT
Updated Branches:
  refs/heads/trunk 36eae8f63 -> 1a7048d41


KAFKA-330 Delete topic followup - more tests and Joel's review comments


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1a7048d4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1a7048d4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1a7048d4

Branch: refs/heads/trunk
Commit: 1a7048d41bb56118d4f31ff972618ebe13007857
Parents: 36eae8f
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Sat Feb 8 11:09:39 2014 -0800
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Sat Feb 8 11:09:39 2014 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 35 ++++++----
 .../kafka/controller/KafkaController.scala      | 20 +++---
 .../controller/PartitionStateMachine.scala      | 41 ++++++++----
 .../kafka/controller/ReplicaStateMachine.scala  | 62 +++++++++---------
 .../kafka/controller/TopicDeletionManager.scala | 69 ++++++++++----------
 .../src/main/scala/kafka/server/KafkaApis.scala |  5 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      | 63 +++++++++++++++++-
 7 files changed, 193 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a1ee5a7..8ab8ab6 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -235,18 +235,29 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
                                          callback: (RequestOrResponse) => Unit = null) {
     val partitionList = controllerContext.partitionLeadershipInfo.keySet.dropWhile(
       p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
-    partitionList.foreach { partition =>
-      val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
-      leaderIsrAndControllerEpochOpt match {
-        case Some(leaderIsrAndControllerEpoch) =>
-          val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
-          val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+    if(partitionList.size > 0) {
+      partitionList.foreach { partition =>
+        val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
+        leaderIsrAndControllerEpochOpt match {
+          case Some(leaderIsrAndControllerEpoch) =>
+            val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
+            val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
+            brokerIds.filter(b => b >= 0).foreach { brokerId =>
+              updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
+              updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+            }
+          case None =>
+            info("Leader not assigned yet for partition %s. Skip sending udpate metadata request".format(partition))
+        }
+      }
+    } else {
+      if(controllerContext.partitionLeadershipInfo.keySet.size > 0) {
+        // last set of topics are being deleted
+        controllerContext.partitionLeadershipInfo.foreach { case(partition, leaderIsrAndControllerEpoch) =>
           brokerIds.filter(b => b >= 0).foreach { brokerId =>
-            updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
-            updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
+            updateMetadataRequestMap.put(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
           }
-        case None =>
-          info("Leader not assigned yet for partition %s. Skip sending udpate metadata request".format(partition))
+        }
       }
     }
   }
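
Note: the partition filter in this hunk's context lines uses Set#dropWhile, which removes
only the longest leading run of elements satisfying the predicate in the set's iteration
order. A minimal sketch, reusing the names from the hunk above, of an exclusion that covers
every partition of a deletion-queued topic regardless of iteration order:

    // Sketch only: filterNot excludes all matching partitions, whereas dropWhile
    // stops at the first partition whose topic is not queued for deletion and
    // keeps everything after it.
    val partitionList = controllerContext.partitionLeadershipInfo.keySet
      .filterNot(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
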
@@ -272,10 +283,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       val broker = m._1
       val partitionStateInfos = m._2.toMap
       val updateMetadataRequest = new UpdateMetadataRequest(controllerId, controllerEpoch, correlationId, clientId,
-                                                            partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
+        partitionStateInfos, controllerContext.liveOrShuttingDownBrokers)
       partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
         "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
-                                                                 correlationId, broker, p._1)))
+        correlationId, broker, p._1)))
       controller.sendRequest(broker, updateMetadataRequest, null)
     }
     updateMetadataRequestMap.clear()

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d812cb4..8acd076 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -433,7 +433,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     if(replicasForTopicsToBeDeleted.size > 0) {
       // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
       // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
-      // since topic deletion cannot be retried if at least one replica is in TopicDeletionStarted state
+      // since topic deletion cannot be retried while at least one replica is in TopicDeletionStarted state
       deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
     }
   }
@@ -443,6 +443,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * and partitions as input. It does the following -
    * 1. Registers partition change listener. This is not required until KAFKA-347
    * 2. Invokes the new partition callback
+   * 3. Sends a metadata request with the new topic to all brokers so they allow requests for that topic to be served
    */
   def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
     info("New topic creation callback for %s".format(newPartitions.mkString(",")))
@@ -581,8 +582,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               // first register ISR change listener
               watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
               controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
-              // halt topic deletion for the partitions being reassigned
-              deleteTopicManager.haltTopicDeletion(Set(topic))
+              // mark topic ineligible for deletion for the partitions being reassigned
+              deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
               onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
             } else {
               // some replica in RAR is not alive. Fail partition reassignment
@@ -605,7 +606,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
-      deleteTopicManager.haltTopicDeletion(partitions.map(_.topic))
+      deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
@@ -748,17 +749,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
   private def initializeTopicDeletion() {
     val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
-    val replicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter(r =>
-      r._2.foldLeft(false)((res,r) => res || !controllerContext.liveBrokerIds.contains(r)))
-    val topicsWithReplicasOnDeadBrokers = replicasOnDeadBrokers.map(_._1.topic).toSet
+    val topicsWithReplicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter { case(partition, replicas) =>
+      replicas.exists(r => !controllerContext.liveBrokerIds.contains(r)) }.keySet.map(_.topic)
     val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
     val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
-    val haltedTopicsForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
+    val topicsIneligibleForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
                                   topicsForWhichPreferredReplicaElectionIsInProgress
     info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
-    info("List of topics halted for deletion: %s".format(haltedTopicsForDeletion.mkString(",")))
+    info("List of topics ineligible for deletion: %s".format(topicsIneligibleForDeletion.mkString(",")))
     // initialize the topic deletion manager
-    deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, haltedTopicsForDeletion)
+    deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, topicsIneligibleForDeletion)
   }
 
   private def maybeTriggerPartitionReassignment() {
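
Note: initializeTopicDeletion above replaces a foldLeft-based predicate with the
equivalent but clearer exists. A self-contained sketch of the equivalence, using
hypothetical broker ids rather than anything from this commit:

    // Hypothetical inputs chosen so that one replica (broker 2) is dead:
    val replicas = Seq(1, 2, 3)
    val liveBrokerIds = Set(1, 3)
    // Old form: accumulate "some replica is dead" across the whole list.
    val viaFoldLeft = replicas.foldLeft(false)((res, r) => res || !liveBrokerIds.contains(r))
    // New form: same result, but short-circuits at the first dead replica.
    val viaExists = replicas.exists(r => !liveBrokerIds.contains(r))
    assert(viaFoldLeft == viaExists) // both true here
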

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 57c96b5..c69077e 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -51,6 +51,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
+  private var topicChangeListener: TopicChangeListener = null
+  private var deleteTopicsListener: DeleteTopicsListener = null
+  private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
 
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -167,8 +170,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           assignReplicasToPartitions(topic, partition)
           partitionState.put(topicAndPartition, NewPartition)
           val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from NotExists to New with assigned replicas %s"
-                                    .format(controllerId, controller.epoch, topicAndPartition, assignedReplicas))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
+                                            assignedReplicas))
           // post: partition has been assigned replicas
         case OnlinePartition =>
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
@@ -184,22 +188,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           }
           partitionState.put(topicAndPartition, OnlinePartition)
           val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
-                                    .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
            // post: partition has a leader
         case OfflinePartition =>
           // pre: partition should be in New or Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline"
-                                    .format(controllerId, controller.epoch, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
           partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
           // pre: partition should be in Offline state
           assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
-          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Offline to NotExists"
-                                    .format(controllerId, controller.epoch, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
+                                    .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
           partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
@@ -358,15 +362,22 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   private def registerTopicChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, new TopicChangeListener())
+    topicChangeListener = new TopicChangeListener()
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
   }
 
   def registerPartitionChangeListener(topic: String) = {
-    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
+    addPartitionsListener.put(topic, new AddPartitionsListener(topic))
+    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+  }
+
+  def deregisterPartitionChangeListener(topic: String) = {
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
   }
 
   private def registerDeleteTopicListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, new DeleteTopicsListener())
+    deleteTopicsListener = new DeleteTopicsListener()
+    zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
   }
 
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
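
Note: the new topicChangeListener, deleteTopicsListener and addPartitionsListener fields
exist so the listeners can later be unsubscribed. ZkClient removes a listener only when
handed the instance that was originally subscribed, so (assuming the listener classes do
not override equals) an anonymous listener can never be deregistered:

    // Failure mode the stored handles avoid (sketch):
    zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
    // A later attempt builds a *different* instance, so ZkClient removes nothing:
    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
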
@@ -438,21 +449,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         }
         debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
         val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
-        if(nonExistentTopics.size > 0)
+        if(nonExistentTopics.size > 0) {
           warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+          nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
+        }
         topicsToBeDeleted --= nonExistentTopics
         if(topicsToBeDeleted.size > 0) {
           info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
           // add topic to deletion list
           controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
-          // halt if other state changes are in progress
+          // mark topic ineligible for deletion if other state changes are in progress
           topicsToBeDeleted.foreach { topic =>
             val preferredReplicaElectionInProgress =
               controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
             val partitionReassignmentInProgress =
               controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
             if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
-              controller.deleteTopicManager.haltTopicDeletion(Set(topic))
+              controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 613aec6..5e016d5 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -40,7 +40,7 @@ import kafka.utils.Utils._
  * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
  * 5. ReplicaDeletionSuccessful: If replica responds with no error code in response to a delete replica request, it is
  *                        moved to this state. Valid previous state is ReplicaDeletionStarted
- * 6. ReplicaDeletionFailed: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 6. ReplicaDeletionIneligible: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
  * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
  *                        ReplicaDeletionSuccessful
  */
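
Note: the numbered states above imply a transition table for the deletion path; a compact
sketch of the deletion-related entries (the authoritative checks are the
assertValidPreviousStates calls later in this file):

    val validPreviousStates: Map[ReplicaState, List[ReplicaState]] = Map(
      ReplicaDeletionStarted    -> List(OfflineReplica),
      ReplicaDeletionSuccessful -> List(ReplicaDeletionStarted),
      ReplicaDeletionIneligible -> List(ReplicaDeletionStarted),
      NonExistentReplica        -> List(ReplicaDeletionSuccessful)
    )
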
@@ -115,7 +115,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
    *   partition to every live broker
    *
-   * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionFailed -> OfflineReplica
+   * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionIneligible -> OfflineReplica
    * --send StopReplicaRequest to the replica (w/o deletion)
    * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
    *   UpdateMetadata request for the partition to every live broker.
@@ -126,7 +126,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
    * -- mark the state of the replica in the state machine
    *
-   * ReplicaDeletionStarted -> ReplicaDeletionFailed
+   * ReplicaDeletionStarted -> ReplicaDeletionIneligible
    * -- mark the state of the replica in the state machine
    *
    * ReplicaDeletionSuccessful -> NonExistentReplica
@@ -146,8 +146,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                             "to %s failed because replica state machine has not started")
                                               .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
+    val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
     try {
-      replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
       targetState match {
         case NewReplica =>
@@ -165,45 +165,47 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
             case None => // new leader request will be sent to this replica when one gets elected
           }
           replicaState.put(partitionAndReplica, NewReplica)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
+                                            targetState))
         case ReplicaDeletionStarted =>
           assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
           replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
             callbacks.stopReplicaResponseCallback)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionStarted"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-        case ReplicaDeletionFailed =>
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
+        case ReplicaDeletionIneligible =>
           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
-          replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionFailed"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case ReplicaDeletionSuccessful =>
           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
           replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionSuccessful"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case NonExistentReplica =>
           assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
           replicaState.remove(partitionAndReplica)
-          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
-            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
         case OnlineReplica =>
           assertValidPreviousStates(partitionAndReplica,
-            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
+            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
           replicaState(partitionAndReplica) match {
             case NewReplica =>
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
               if(!currentAssignedReplicas.contains(replicaId))
                 controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
-              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
-                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+              stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                                        .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
+                                                targetState))
             case _ =>
               // check if the leader for this partition ever existed
               controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
@@ -211,8 +213,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                     replicaAssignment)
                   replicaState.put(partitionAndReplica, OnlineReplica)
-                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
-                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                    .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
                   // started a log for that partition and does not have a high watermark value for this partition
               }
@@ -220,7 +222,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put(partitionAndReplica, OnlineReplica)
         case OfflineReplica =>
           assertValidPreviousStates(partitionAndReplica,
-            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
+            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
           // send stop replica command to the replica so that it stops fetching from the leader
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
@@ -233,8 +235,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
                       topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                     replicaState.put(partitionAndReplica, OfflineReplica)
-                    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
-                      .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+                    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
+                      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
                     false
                   case None =>
                     true
@@ -250,8 +252,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
     catch {
       case t: Throwable =>
-        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] to %s failed"
-                                  .format(controllerId, controller.epoch, replicaId, topic, partition, targetState), t)
+        stateChangeLogger.error("Controller %d epoch %d initiated state change of replica %d for partition [%s,%d] from %s to %s failed"
+                                  .format(controllerId, controller.epoch, replicaId, topic, partition, currState, targetState), t)
     }
   }
 
@@ -273,7 +275,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   }
 
   def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
-    val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionFailed)
+    val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
     replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
   }
 
@@ -304,8 +306,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           case false =>
             // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
             // This is required during controller failover since during controller failover a broker can go down,
-            // so the replicas on that broker should be moved to ReplicaDeletionFailed to be on the safer side.
-            replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
+            // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
+            replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
         }
       }
     }
@@ -356,7 +358,7 @@ case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
 case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
 case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4}
 case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
-case object ReplicaDeletionFailed extends ReplicaState { val state: Byte = 6}
+case object ReplicaDeletionIneligible extends ReplicaState { val state: Byte = 6}
 case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 91a446d..58f1c42 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -30,8 +30,8 @@ import kafka.api.{StopReplicaResponse, RequestOrResponse}
  * 3. The controller has a background thread that handles topic deletion. The purpose of having this background thread
  *    is to accommodate the TTL feature, when we have it. This thread is signaled whenever deletion for a topic needs to
  *    be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the
- *    controller. In the future, it can be triggered based on the configured TTL for the topic. A topic's deletion will
- *    be halted in the following scenarios -
+ *    controller. In the future, it can be triggered based on the configured TTL for the topic. A topic will be ineligible
+ *    for deletion in the following scenarios -
  *    3.1 broker hosting one of the replicas for that topic goes down
  *    3.2 partition reassignment for partitions of that topic is in progress
  *    3.3 preferred replica election for partitions of that topic is in progress
@@ -62,17 +62,17 @@ import kafka.api.{StopReplicaResponse, RequestOrResponse}
  *    it marks the topic for deletion retry.
  * @param controller
  * @param initialTopicsToBeDeleted The topics that are queued up for deletion in zookeeper at the time of controller failover
- * @param initialHaltedTopicsForDeletion The topics for which deletion is halted due to any of the conditions mentioned in #3 above
+ * @param initialTopicsIneligibleForDeletion The topics ineligible for deletion due to any of the conditions mentioned in #3 above
  */
 class TopicDeletionManager(controller: KafkaController,
                            initialTopicsToBeDeleted: Set[String] = Set.empty,
-                           initialHaltedTopicsForDeletion: Set[String] = Set.empty) extends Logging {
+                           initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
   val controllerContext = controller.controllerContext
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
   var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
-  var haltedTopicsForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
-    (initialHaltedTopicsForDeletion & initialTopicsToBeDeleted)
+  var topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
+    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
   val deleteTopicsCond = controllerContext.controllerLock.newCondition()
   var deleteTopicStateChanged: Boolean = false
   var deleteTopicsThread: DeleteTopicsThread = null
@@ -92,7 +92,7 @@ class TopicDeletionManager(controller: KafkaController,
   def shutdown() {
     deleteTopicsThread.shutdown()
     topicsToBeDeleted.clear()
-    haltedTopicsForDeletion.clear()
+    topicsIneligibleForDeletion.clear()
   }
 
   /**
@@ -117,7 +117,7 @@ class TopicDeletionManager(controller: KafkaController,
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
     val topicsToResumeDeletion = topics & topicsToBeDeleted
     if(topicsToResumeDeletion.size > 0) {
-      haltedTopicsForDeletion --= topicsToResumeDeletion
+      topicsIneligibleForDeletion --= topicsToResumeDeletion
       resumeTopicDeletionThread()
     }
   }
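
Note: resumeDeletionForTopics is the counterpart to the markTopicIneligibleForDeletion
rename introduced in this commit; both intersect their argument with topicsToBeDeleted,
so each call is a no-op for topics that were never queued. A hedged sketch of the intended
pairing at a call site such as partition reassignment:

    // Sketch, assuming a reassignment code path:
    deleteTopicManager.markTopicIneligibleForDeletion(Set(topic)) // state change in flight
    // ... once the reassignment completes ...
    deleteTopicManager.resumeDeletionForTopics(Set(topic))        // deletion may proceed
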
@@ -125,8 +125,8 @@ class TopicDeletionManager(controller: KafkaController,
   /**
    * Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for
    * StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas
-   * are moved from ReplicaDeletionStarted to ReplicaDeletionFailed state. Also, the topic is added to the list of topics
-   * for which deletion is halted until further notice. The delete topic thread is notified so it can retry topic deletion
+   * are moved from ReplicaDeletionStarted to ReplicaDeletionIneligible state. Also, the topic is added to the list of topics
+   * ineligible for deletion until further notice. The delete topic thread is notified so it can retry topic deletion
    * if it has received a response for all replicas of a topic to be deleted
    * @param replicas Replicas for which deletion has failed
    */
@@ -136,8 +136,8 @@ class TopicDeletionManager(controller: KafkaController,
       val topics = replicasThatFailedToDelete.map(_.topic)
       debug("Deletion failed for replicas %s. Halting deletion for topics %s"
         .format(replicasThatFailedToDelete.mkString(","), topics))
-      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionFailed)
-      haltTopicDeletion(topics)
+      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+      markTopicIneligibleForDeletion(topics)
       resumeTopicDeletionThread()
     }
   }
@@ -147,17 +147,17 @@ class TopicDeletionManager(controller: KafkaController,
    * 1. replicas being down
    * 2. partition reassignment in progress for some partitions of the topic
    * 3. preferred replica election in progress for some partitions of the topic
-   * @param topics Topics for which deletion should be halted. No op if the topic is was not previously queued up for deletion
+   * @param topics Topics that should be marked ineligible for deletion. No op if the topic was not previously queued up for deletion
    */
-  def haltTopicDeletion(topics: Set[String]) {
+  def markTopicIneligibleForDeletion(topics: Set[String]) {
     val newTopicsToHaltDeletion = topicsToBeDeleted & topics
-    haltedTopicsForDeletion ++= newTopicsToHaltDeletion
+    topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
     if(newTopicsToHaltDeletion.size > 0)
       info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
   }
 
-  def isTopicDeletionHalted(topic: String): Boolean = {
-    haltedTopicsForDeletion.contains(topic)
+  def isTopicIneligibleForDeletion(topic: String): Boolean = {
+    topicsIneligibleForDeletion.contains(topic)
   }
 
   def isTopicDeletionInProgress(topic: String): Boolean = {
@@ -205,26 +205,29 @@ class TopicDeletionManager(controller: KafkaController,
    * Topic deletion can be retried if -
    * 1. Topic deletion is not already complete
    * 2. Topic deletion is currently not in progress for that topic
-   * 3. Topic deletion is currently halted for that topic
+   * 3. Topic is currently not marked ineligible for deletion
    * @param topic Topic
    * @return Whether or not deletion can be retried for the topic
    */
   private def isTopicEligibleForDeletion(topic: String): Boolean = {
-    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicDeletionHalted(topic))
+    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicIneligibleForDeletion(topic))
   }
 
   /**
    * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
-   * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionFailed to OfflineReplica state
+   * To ensure a successful retry, reset states for respective replicas from ReplicaDeletionIneligible to OfflineReplica state
    *@param topic Topic for which deletion should be retried
    */
   private def markTopicForDeletionRetry(topic: String) {
-    // reset replica states from ReplicaDeletionFailed to OfflineReplica
-    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+    // reset replica states from ReplicaDeletionIneligible to OfflineReplica
+    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
     controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
   }
 
   private def completeDeleteTopic(topic: String) {
+    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
+    // from firing before the new topic listener when a deleted topic gets auto created
+    partitionStateMachine.deregisterPartitionChangeListener(topic)
     val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
     // controller will remove this replica from the state machine as well as its partition assignment cache
     replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
@@ -245,6 +248,8 @@ class TopicDeletionManager(controller: KafkaController,
    */
   private def onTopicDeletion(topics: Set[String]) {
     info("Topic deletion callback for %s".format(topics.mkString(",")))
+    // send update metadata so that brokers stop serving data for topics to be deleted
+    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
     val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
     topics.foreach { topic =>
       onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
@@ -257,34 +262,32 @@ class TopicDeletionManager(controller: KafkaController,
    * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
    * is never retried. A topic is removed from the in progress list when
    * 1. Either the topic is successfully deleted OR
-   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionFailed state
+   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionIneligible state
    * If the topic is queued for deletion but deletion is not currently under progress, then deletion is retried for that topic
    * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
    * the replicas a StopReplicaRequest (delete=true)
    * This callback does the following things -
    * 1. Send metadata request to all brokers excluding the topics to be deleted
-   * 2. Move all dead replicas directly to ReplicaDeletionFailed state. Also halt the deletion of respective topics if
-   *    some replicas are dead since it won't complete successfully anyway
+   * 2. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
+   *    for deletion if some replicas are dead since it won't complete successfully anyway
    * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
    *@param replicasForTopicsToBeDeleted
    */
   private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
     replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
-      // send update metadata so that brokers stop serving data
-      controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
       var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
       val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
       val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
       val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
       // move dead replicas directly to failed state
-      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionFailed)
+      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
       // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
       replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
       debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
         new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
       if(deadReplicasForTopic.size > 0)
-        haltTopicDeletion(Set(topic))
+        markTopicIneligibleForDeletion(Set(topic))
     }
   }
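
Note: a hypothetical walk-through of the bucketing in startReplicaDeletion for one topic
assigned to brokers 0, 1 and 2, where broker 2 is down and broker 0's replica is already
deleted (plain Ints stand in for PartitionAndReplica):

    val assignedReplicas    = Set(0, 1, 2)
    val aliveReplicas       = Set(0, 1) // broker 2 is dead
    val successfullyDeleted = Set(0)    // already ReplicaDeletionSuccessful
    val deadReplicas    = assignedReplicas -- aliveReplicas    // Set(2) -> ReplicaDeletionIneligible
    val retryCandidates = aliveReplicas -- successfullyDeleted // Set(1) -> OfflineReplica, then ReplicaDeletionStarted
    // deadReplicas is non-empty, so the topic is also marked ineligible for deletion.
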
 
@@ -314,7 +317,7 @@ class TopicDeletionManager(controller: KafkaController,
       stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet
     val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
     inLock(controllerContext.controllerLock) {
-      // move all the failed replicas to ReplicaDeletionFailed
+      // move all the failed replicas to ReplicaDeletionIneligible
       failReplicaDeletion(replicasInError)
       if(replicasInError.size != stopReplicaResponse.responseMap.size) {
         // some replicas could have been successfully deleted
@@ -350,7 +353,7 @@ class TopicDeletionManager(controller: KafkaController,
               // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
               // TopicDeletionSuccessful. That means, there is at least one failed replica, which means topic deletion
               // should be retried
-              val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+              val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
               // mark topic for deletion retry
               markTopicForDeletionRetry(topic)
               info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
@@ -362,8 +365,8 @@ class TopicDeletionManager(controller: KafkaController,
             info("Deletion of topic %s (re)started".format(topic))
             // topic deletion will be kicked off
             onTopicDeletion(Set(topic))
-          } else if(isTopicDeletionHalted(topic)) {
-            info("Not retrying deletion of topic %s at this time since it is halted".format(topic))
+          } else if(isTopicIneligibleForDeletion(topic)) {
+            info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c56ad50..ae2df20 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -145,7 +145,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
       // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
       // currently being deleted by the controller
-      val topicsKnownToThisBroker = metadataCache.map{
+      val topicsKnownToThisBroker = metadataCache.map {
         case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
       val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
         case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
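
Note: the two sets computed here feed the pruning described by the comment above: any
topic the broker's metadata cache knows about that the controller's UpdateMetadata request
omits is one being deleted. A sketch of that follow-on step (the concrete removal code
sits outside this hunk):

    val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
    metadataCache.retain { case (topicAndPartition, _) => !deletedTopics.contains(topicAndPartition.topic) }
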
@@ -568,6 +568,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       partitionMetadataLock synchronized {
         uniqueTopics.map { topic =>
           if(metadataCache.keySet.map(_.topic).contains(topic)) {
+            debug("Topic %s exists in metadata cache on broker %d".format(topic, config.brokerId))
             val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
             val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
             val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
@@ -600,6 +601,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             }
             new TopicMetadata(topic, partitionMetadata)
           } else {
+            debug("Topic %s does not exist in metadata cache on broker %d".format(topic, config.brokerId))
             // topic doesn't exist, send appropriate error code
             new TopicMetadata(topic, Seq.empty[PartitionMetadata], ErrorMapping.UnknownTopicOrPartitionCode)
           }
@@ -621,6 +623,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             }
             topicsMetadata += new TopicMetadata(topicMetadata.topic, topicMetadata.partitionsMetadata, ErrorMapping.LeaderNotAvailableCode)
           } else {
+            debug("Auto create topic skipped for %s".format(topicMetadata.topic))
             topicsMetadata += topicMetadata
           }
         case _ =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/1a7048d4/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 974b057..dbe078c 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -296,9 +296,8 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   def testDeleteTopicDuringAddPartition() {
     val topic = "test"
     val servers = createTestTopicAndCluster(topic)
-    // add partitions to topic
-    val topicAndPartition = TopicAndPartition(topic, 0)
     val newPartition = TopicAndPartition(topic, 1)
+    // add partitions to topic
     AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
     // start topic deletion
     AdminUtils.deleteTopic(zkClient, topic)
@@ -366,6 +365,66 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     servers.foreach(_.shutdown())
   }
 
+  @Test
+  def testAutoCreateAfterDeleteTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    verifyTopicDeletion(topic, servers)
+    // test if first produce request after topic deletion auto creates the topic
+    val props = new Properties()
+    props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("producer.type", "sync")
+    props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "1")
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[String, String](producerConfig)
+    try{
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    } catch {
+      case e: FailedToSendMessageException => fail("Topic should have been auto created")
+      case oe: Throwable => fail("fails with exception", oe)
+    }
+    // test the topic path exists
+    assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // wait until leader is elected
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+    try {
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    } catch {
+      case e: FailedToSendMessageException => fail("Topic should have been auto created")
+      case oe: Throwable => fail("fails with exception", oe)
+    } finally {
+      producer.close()
+    }
+    servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testDeleteNonExistingTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, "test2")
+    // verify delete topic path for test2 is removed from zookeeper
+    verifyTopicDeletion("test2", servers)
+    // verify that topic test is untouched
+    assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+      res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
+    // test the topic path exists
+    assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // topic test should have a leader
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
+    servers.foreach(_.shutdown())
+
+  }
+
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)

