kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [1/2] kafka git commit: MINOR: Eliminate unnecessary Topic(And)Partition allocations in Controller
Date Tue, 07 Nov 2017 09:55:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 58138126c -> 3735a6ca8


http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index e41007b..2156a67 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -17,12 +17,12 @@
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
-import kafka.common.{StateChangeFailedException, TopicAndPartition}
-import kafka.controller.Callbacks.CallbackBuilder
+import kafka.common.StateChangeFailedException
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 
 import scala.collection.mutable
@@ -82,7 +82,7 @@ class ReplicaStateMachine(config: KafkaConfig,
   private def initializeReplicaState() {
     controllerContext.partitionReplicaAssignment.foreach { case (partition, replicas) =>
       replicas.foreach { replicaId =>
-        val partitionAndReplica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+        val partitionAndReplica = PartitionAndReplica(partition, replicaId)
         if (controllerContext.isReplicaOnline(replicaId, partition))
           replicaState.put(partitionAndReplica, OnlineReplica)
         else
@@ -95,12 +95,12 @@ class ReplicaStateMachine(config: KafkaConfig,
   }
 
   def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState,
-                         callbacks: Callbacks = (new CallbackBuilder).build): Unit = {
+                         callbacks: Callbacks = new Callbacks()): Unit = {
     if (replicas.nonEmpty) {
       try {
         controllerBrokerRequestBatch.newBatch()
         replicas.groupBy(_.replica).map { case (replicaId, replicas) =>
-          val partitions = replicas.map(_.topicAndPartition)
+          val partitions = replicas.map(_.topicPartition)
           doHandleStateChanges(replicaId, partitions, targetState, callbacks)
         }
         controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch)
@@ -145,16 +145,16 @@ class ReplicaStateMachine(config: KafkaConfig,
    * @param partitions The partitions on this replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicAndPartition], targetState: ReplicaState,
+  private def doHandleStateChanges(replicaId: Int, partitions: Seq[TopicPartition], targetState: ReplicaState,
                                    callbacks: Callbacks): Unit = {
-    val replicas = partitions.map(partition => PartitionAndReplica(partition.topic, partition.partition, replicaId))
+    val replicas = partitions.map(partition => PartitionAndReplica(partition, replicaId))
     replicas.foreach(replica => replicaState.getOrElseUpdate(replica, NonExistentReplica))
     val (validReplicas, invalidReplicas) = replicas.partition(replica => isValidTransition(replica, targetState))
     invalidReplicas.foreach(replica => logInvalidTransition(replica, targetState))
     targetState match {
       case NewReplica =>
         validReplicas.foreach { replica =>
-          val partition = replica.topicAndPartition
+          val partition = replica.topicPartition
           controllerContext.partitionLeadershipInfo.get(partition) match {
             case Some(leaderIsrAndControllerEpoch) =>
               if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
@@ -162,10 +162,9 @@ class ReplicaStateMachine(config: KafkaConfig,
                 logFailedStateChange(replica, replicaState(replica), OfflineReplica, exception)
               } else {
                 controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
-                  replica.topic,
-                  replica.partition,
+                  replica.topicPartition,
                   leaderIsrAndControllerEpoch,
-                  controllerContext.partitionReplicaAssignment(replica.topicAndPartition),
+                  controllerContext.partitionReplicaAssignment(replica.topicPartition),
                   isNew = true)
                 logSuccessfulTransition(replicaId, partition, replicaState(replica), NewReplica)
                 replicaState.put(replica, NewReplica)
@@ -177,7 +176,7 @@ class ReplicaStateMachine(config: KafkaConfig,
         }
       case OnlineReplica =>
         validReplicas.foreach { replica =>
-          val partition = replica.topicAndPartition
+          val partition = replica.topicPartition
           replicaState(replica) match {
             case NewReplica =>
               val assignment = controllerContext.partitionReplicaAssignment(partition)
@@ -188,8 +187,7 @@ class ReplicaStateMachine(config: KafkaConfig,
               controllerContext.partitionLeadershipInfo.get(partition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
                   controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(replicaId),
-                    replica.topic,
-                    replica.partition,
+                    replica.topicPartition,
                     leaderIsrAndControllerEpoch,
                     controllerContext.partitionReplicaAssignment(partition), isNew = false)
                 case None =>
@@ -200,48 +198,47 @@ class ReplicaStateMachine(config: KafkaConfig,
         }
       case OfflineReplica =>
         validReplicas.foreach { replica =>
-          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topic, replica.partition, deletePartition = false, null)
+          controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId), replica.topicPartition,
+            deletePartition = false, (_, _) => ())
         }
-        val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicAndPartition))
-        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicAndPartition))
+        val replicasToRemoveFromIsr = validReplicas.filter(replica => controllerContext.partitionLeadershipInfo.contains(replica.topicPartition))
+        val updatedLeaderIsrAndControllerEpochs = removeReplicasFromIsr(replicaId, replicasToRemoveFromIsr.map(_.topicPartition))
         updatedLeaderIsrAndControllerEpochs.foreach { case (partition, leaderIsrAndControllerEpoch) =>
           if (!topicDeletionManager.isPartitionToBeDeleted(partition)) {
             val recipients = controllerContext.partitionReplicaAssignment(partition).filterNot(_ == replicaId)
             controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipients,
-              partition.topic,
-              partition.partition,
+              partition,
               leaderIsrAndControllerEpoch,
               controllerContext.partitionReplicaAssignment(partition), isNew = false)
           }
-          val replica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+          val replica = PartitionAndReplica(partition, replicaId)
           logSuccessfulTransition(replicaId, partition, replicaState(replica), OfflineReplica)
           replicaState.put(replica, OfflineReplica)
         }
       case ReplicaDeletionStarted =>
         validReplicas.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionStarted)
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionStarted)
           replicaState.put(replica, ReplicaDeletionStarted)
           controllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(replicaId),
-            replica.topic,
-            replica.partition,
+            replica.topicPartition,
             deletePartition = true,
             callbacks.stopReplicaResponseCallback)
         }
       case ReplicaDeletionIneligible =>
         validReplicas.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionIneligible)
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionIneligible)
           replicaState.put(replica, ReplicaDeletionIneligible)
         }
       case ReplicaDeletionSuccessful =>
         validReplicas.foreach { replica =>
-          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), ReplicaDeletionSuccessful)
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), ReplicaDeletionSuccessful)
           replicaState.put(replica, ReplicaDeletionSuccessful)
         }
       case NonExistentReplica =>
         validReplicas.foreach { replica =>
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicAndPartition)
-          controllerContext.partitionReplicaAssignment.put(replica.topicAndPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
-          logSuccessfulTransition(replicaId, replica.topicAndPartition, replicaState(replica), NonExistentReplica)
+          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(replica.topicPartition)
+          controllerContext.partitionReplicaAssignment.put(replica.topicPartition, currentAssignedReplicas.filterNot(_ == replica.replica))
+          logSuccessfulTransition(replicaId, replica.topicPartition, replicaState(replica), NonExistentReplica)
           replicaState.remove(replica)
         }
     }
@@ -254,16 +251,16 @@ class ReplicaStateMachine(config: KafkaConfig,
    * @param partitions The partitions from which we're trying to remove the replica from isr
    * @return The updated LeaderIsrAndControllerEpochs of all partitions for which we successfully removed the replica from isr.
    */
-  private def removeReplicasFromIsr(replicaId: Int, partitions: Seq[TopicAndPartition]):
-  Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
-    var results = Map.empty[TopicAndPartition, LeaderIsrAndControllerEpoch]
+  private def removeReplicasFromIsr(replicaId: Int, partitions: Seq[TopicPartition]):
+  Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
+    var results = Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]
     var remaining = partitions
     while (remaining.nonEmpty) {
       val (successfulRemovals, removalsToRetry, failedRemovals) = doRemoveReplicasFromIsr(replicaId, remaining)
       results ++= successfulRemovals
       remaining = removalsToRetry
       failedRemovals.foreach { case (partition, e) =>
-        val replica = PartitionAndReplica(partition.topic, partition.partition, replicaId)
+        val replica = PartitionAndReplica(partition, replicaId)
         logFailedStateChange(replica, replicaState(replica), OfflineReplica, e)
       }
     }
@@ -282,10 +279,10 @@ class ReplicaStateMachine(config: KafkaConfig,
    *         the partition leader updated partition state while the controller attempted to update partition state.
    *         3. Exceptions corresponding to failed removals that should not be retried.
    */
-  private def doRemoveReplicasFromIsr(replicaId: Int, partitions: Seq[TopicAndPartition]):
-  (Map[TopicAndPartition, LeaderIsrAndControllerEpoch],
-    Seq[TopicAndPartition],
-    Map[TopicAndPartition, Exception]) = {
+  private def doRemoveReplicasFromIsr(replicaId: Int, partitions: Seq[TopicPartition]):
+  (Map[TopicPartition, LeaderIsrAndControllerEpoch],
+    Seq[TopicPartition],
+    Map[TopicPartition, Exception]) = {
     val (leaderAndIsrs, partitionsWithNoLeaderAndIsrInZk, failedStateReads) = getTopicPartitionStatesFromZk(partitions)
     val (leaderAndIsrsWithReplica, leaderAndIsrsWithoutReplica) = leaderAndIsrs.partition { case (partition, leaderAndIsr) => leaderAndIsr.isr.contains(replicaId) }
     val adjustedLeaderAndIsrs = leaderAndIsrsWithReplica.mapValues { leaderAndIsr =>
@@ -318,13 +315,13 @@ class ReplicaStateMachine(config: KafkaConfig,
    *         didn't finish partition initialization.
    *         3. Exceptions corresponding to failed zookeeper lookups or states whose controller epoch exceeds our current epoch.
    */
-  private def getTopicPartitionStatesFromZk(partitions: Seq[TopicAndPartition]):
-  (Map[TopicAndPartition, LeaderAndIsr],
-    Seq[TopicAndPartition],
-    Map[TopicAndPartition, Exception]) = {
-    val leaderAndIsrs = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
-    val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicAndPartition]
-    val failed = mutable.Map.empty[TopicAndPartition, Exception]
+  private def getTopicPartitionStatesFromZk(partitions: Seq[TopicPartition]):
+  (Map[TopicPartition, LeaderAndIsr],
+    Seq[TopicPartition],
+    Map[TopicPartition, Exception]) = {
+    val leaderAndIsrs = mutable.Map.empty[TopicPartition, LeaderAndIsr]
+    val partitionsWithNoLeaderAndIsrInZk = mutable.Buffer.empty[TopicPartition]
+    val failed = mutable.Map.empty[TopicPartition, Exception]
     val getDataResponses = try {
       zkClient.getTopicPartitionStatesRaw(partitions)
     } catch {
@@ -333,7 +330,7 @@ class ReplicaStateMachine(config: KafkaConfig,
         return (leaderAndIsrs.toMap, partitionsWithNoLeaderAndIsrInZk, failed.toMap)
     }
     getDataResponses.foreach { getDataResponse =>
-      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
       if (getDataResponse.resultCode == Code.OK) {
         val leaderIsrAndControllerEpochOpt = TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat)
         if (leaderIsrAndControllerEpochOpt.isEmpty) {
@@ -377,7 +374,7 @@ class ReplicaStateMachine(config: KafkaConfig,
   private def isValidTransition(replica: PartitionAndReplica, targetState: ReplicaState) =
     targetState.validPreviousStates.contains(replicaState(replica))
 
-  private def logSuccessfulTransition(replicaId: Int, partition: TopicAndPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
+  private def logSuccessfulTransition(replicaId: Int, partition: TopicPartition, currState: ReplicaState, targetState: ReplicaState): Unit = {
     stateChangeLogger.withControllerEpoch(controllerContext.epoch)
       .trace(s"Changed state of replica $replicaId for partition $partition from $currState to $targetState")
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 2e93f9d..eaf6b09 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -16,10 +16,9 @@
  */
 package kafka.controller
 
-
-import kafka.common.TopicAndPartition
 import kafka.utils.Logging
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.TopicPartition
 
 import scala.collection.{Set, mutable}
 
@@ -63,7 +62,7 @@ class TopicDeletionManager(controller: KafkaController,
   val controllerContext = controller.controllerContext
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
   val topicsToBeDeleted = mutable.Set.empty[String]
-  val partitionsToBeDeleted = mutable.Set.empty[TopicAndPartition]
+  val partitionsToBeDeleted = mutable.Set.empty[TopicPartition]
   val topicsIneligibleForDeletion = mutable.Set.empty[String]
 
   def init(initialTopicsToBeDeleted: Set[String], initialTopicsIneligibleForDeletion: Set[String]): Unit = {
@@ -175,7 +174,7 @@ class TopicDeletionManager(controller: KafkaController,
       false
   }
 
-  def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition) = {
+  def isPartitionToBeDeleted(topicAndPartition: TopicPartition) = {
     if(isDeleteTopicEnabled) {
       partitionsToBeDeleted.contains(topicAndPartition)
     } else
@@ -292,8 +291,8 @@ class TopicDeletionManager(controller: KafkaController,
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, OfflineReplica)
       debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry.toSeq, ReplicaDeletionStarted,
-        new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
-          eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))).build)
+        new Callbacks(stopReplicaResponseCallback = (stopReplicaResponseObj, replicaId) =>
+          eventManager.put(controller.TopicDeletionStopReplicaResponseReceived(stopReplicaResponseObj, replicaId))))
       if (deadReplicasForTopic.nonEmpty) {
         debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
         markTopicIneligibleForDeletion(Set(topic))
@@ -312,7 +311,7 @@ class TopicDeletionManager(controller: KafkaController,
    * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
    *    will delete all persistent data from all replicas of the respective partitions
    */
-  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
+  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicPartition]) {
     info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
     val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
     startReplicaDeletion(replicasPerPartition)
@@ -335,7 +334,7 @@ class TopicDeletionManager(controller: KafkaController,
           // ignore since topic deletion is in progress
           val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
           val replicaIds = replicasInDeletionStartedState.map(_.replica)
-          val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
+          val partitions = replicasInDeletionStartedState.map(_.topicPartition)
           info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
             partitions.mkString(","), topic))
         } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index e79a6e3..da61077 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -28,7 +28,7 @@ import kafka.message.UncompressedCodec
 import kafka.server.Defaults
 import kafka.server.ReplicaManager
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
-import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
+import kafka.utils.{Logging, Pool, Scheduler}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 4637521..3f8ac49 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
-import kafka.common.{OffsetAndMetadata, OffsetMetadata, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, OffsetMetadata}
 import kafka.server.QuotaFactory.{QuotaManagers, UnboundedQuota}
 import kafka.controller.{KafkaController}
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
@@ -37,7 +37,7 @@ import kafka.network.RequestChannel
 import kafka.network.RequestChannel.{CloseConnectionAction, NoOpAction, SendAction}
 import kafka.security.SecurityUtils
 import kafka.security.auth.{Resource, _}
-import kafka.utils.{CoreUtils, Logging, ZKGroupTopicDirs, ZkUtils}
+import kafka.utils.{CoreUtils, Logging, ZkUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.FatalExitError
@@ -236,10 +236,10 @@ class KafkaApis(val requestChannel: RequestChannel,
     val controlledShutdownRequest = request.body[ControlledShutdownRequest]
     authorizeClusterAction(request)
 
-    def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicAndPartition]]): Unit = {
+    def controlledShutdownCallback(controlledShutdownResult: Try[Set[TopicPartition]]): Unit = {
       val response = controlledShutdownResult match {
         case Success(partitionsRemaining) =>
-          new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.map(_.asTopicPartition).asJava)
+          new ControlledShutdownResponse(Errors.NONE, partitionsRemaining.asJava)
 
         case Failure(throwable) =>
           controlledShutdownRequest.getErrorResponse(throwable)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/utils/LogDirUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/LogDirUtils.scala b/core/src/main/scala/kafka/utils/LogDirUtils.scala
index 8457ce5..669edf9 100644
--- a/core/src/main/scala/kafka/utils/LogDirUtils.scala
+++ b/core/src/main/scala/kafka/utils/LogDirUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.utils
 
-import kafka.controller.LogDirEventNotificationListener
+import kafka.controller.LogDirEventNotificationHandler
 import scala.collection.Map
 
 object LogDirUtils extends Logging {
@@ -32,7 +32,7 @@ object LogDirUtils extends Logging {
   }
 
   private def logDirFailureEventZkData(brokerId: Int): String = {
-    Json.encode(Map("version" -> LogDirEventNotificationListener.version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
+    Json.encode(Map("version" -> LogDirEventNotificationHandler.Version, "broker" -> brokerId, "event" -> LogDirFailureEvent))
   }
 
   def deleteLogDirEvents(zkUtils: ZkUtils) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index cc08055..1a0633c 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,7 +18,7 @@
 package kafka.utils
 
 import kafka.api.LeaderAndIsr
-import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
 import kafka.utils.ZkUtils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
@@ -88,7 +88,7 @@ object ReplicationUtils extends Logging {
 
   private def generateIsrChangeJson(isrChanges: Set[TopicPartition]): String = {
     val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
-    Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitions))
+    Json.encode(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitions))
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/zk/KafkaZkClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 90d53d4..3267a74 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -21,7 +21,6 @@ import java.util.Properties
 
 import kafka.api.LeaderAndIsr
 import kafka.cluster.Broker
-import kafka.common.TopicAndPartition
 import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.server.ConfigType
@@ -29,7 +28,7 @@ import kafka.utils._
 import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
-import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.data.{ACL, Stat}
 import org.apache.zookeeper.{CreateMode, KeeperException}
 
 import scala.collection.mutable
@@ -53,7 +52,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param partitions the partitions for which we want ot get states.
    * @return sequence of GetDataResponses whose contexts are the partitions they are associated with.
    */
-  def getTopicPartitionStatesRaw(partitions: Seq[TopicAndPartition]): Seq[GetDataResponse] = {
+  def getTopicPartitionStatesRaw(partitions: Seq[TopicPartition]): Seq[GetDataResponse] = {
     val getDataRequests = partitions.map { partition =>
       GetDataRequest(TopicPartitionStateZNode.path(partition), ctx = Some(partition))
     }
@@ -65,7 +64,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
    * @return sequence of SetDataResponse whose contexts are the partitions they are associated with.
    */
-  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
+  def setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[SetDataResponse] = {
     val setDataRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
       val path = TopicPartitionStateZNode.path(partition)
       val data = TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch)
@@ -79,7 +78,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param leaderIsrAndControllerEpochs the partition states of each partition whose state we wish to set.
    * @return sequence of CreateResponse whose contexts are the partitions they are associated with.
    */
-  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicAndPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
+  def createTopicPartitionStatesRaw(leaderIsrAndControllerEpochs: Map[TopicPartition, LeaderIsrAndControllerEpoch]): Seq[CreateResponse] = {
     createTopicPartitions(leaderIsrAndControllerEpochs.keys.map(_.topic).toSet.toSeq)
     createTopicPartition(leaderIsrAndControllerEpochs.keys.toSeq)
     val createRequests = leaderIsrAndControllerEpochs.map { case (partition, leaderIsrAndControllerEpoch) =>
@@ -118,10 +117,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param controllerEpoch The current controller epoch.
    * @return UpdateLeaderAndIsrResult instance containing per partition results.
    */
-  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicAndPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
-    val successfulUpdates = mutable.Map.empty[TopicAndPartition, LeaderAndIsr]
-    val updatesToRetry = mutable.Buffer.empty[TopicAndPartition]
-    val failed = mutable.Map.empty[TopicAndPartition, Exception]
+  def updateLeaderAndIsr(leaderAndIsrs: Map[TopicPartition, LeaderAndIsr], controllerEpoch: Int): UpdateLeaderAndIsrResult = {
+    val successfulUpdates = mutable.Map.empty[TopicPartition, LeaderAndIsr]
+    val updatesToRetry = mutable.Buffer.empty[TopicPartition]
+    val failed = mutable.Map.empty[TopicPartition, Exception]
     val leaderIsrAndControllerEpochs = leaderAndIsrs.map { case (partition, leaderAndIsr) => partition -> LeaderIsrAndControllerEpoch(leaderAndIsr, controllerEpoch) }
     val setDataResponses = try {
       setTopicPartitionStatesRaw(leaderIsrAndControllerEpochs)
@@ -131,7 +130,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
         return UpdateLeaderAndIsrResult(successfulUpdates.toMap, updatesToRetry, failed.toMap)
     }
     setDataResponses.foreach { setDataResponse =>
-      val partition = setDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      val partition = setDataResponse.ctx.get.asInstanceOf[TopicPartition]
       if (setDataResponse.resultCode == Code.OK) {
         val updatedLeaderAndIsr = leaderAndIsrs(partition).withZkVersion(setDataResponse.stat.getVersion)
         successfulUpdates.put(partition, updatedLeaderAndIsr)
@@ -227,7 +226,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param assignment the partition to replica mapping to set for the given topic
    * @return SetDataResponse
    */
-  def setTopicAssignmentRaw(topic: String, assignment: Map[TopicAndPartition, Seq[Int]]): SetDataResponse = {
+  def setTopicAssignmentRaw(topic: String, assignment: collection.Map[TopicPartition, Seq[Int]]): SetDataResponse = {
     val setDataRequest = SetDataRequest(TopicZNode.path(topic), TopicZNode.encode(assignment), ZkVersion.NoVersion)
     retryRequestUntilConnected(setDataRequest)
   }
@@ -296,7 +295,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param topics the topics whose partitions we wish to get the assignments for.
    * @return the replica assignment for each partition from the given topics.
    */
-  def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicAndPartition, Seq[Int]] = {
+  def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]] = {
     val getDataRequests = topics.map(topic => GetDataRequest(TopicZNode.path(topic), ctx = Some(topic)))
     val getDataResponses = retryRequestsUntilConnected(getDataRequests.toSeq)
     getDataResponses.flatMap { getDataResponse =>
@@ -304,7 +303,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
       if (getDataResponse.resultCode == Code.OK) {
         TopicZNode.decode(topic, getDataResponse.data)
       } else if (getDataResponse.resultCode == Code.NONODE) {
-        Map.empty[TopicAndPartition, Seq[Int]]
+        Map.empty[TopicPartition, Seq[Int]]
       } else {
         throw getDataResponse.resultException.get
       }
@@ -418,13 +417,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Returns all reassignments.
    * @return the reassignments for each partition.
    */
-  def getPartitionReassignment: Map[TopicAndPartition, Seq[Int]] = {
+  def getPartitionReassignment: Map[TopicPartition, Seq[Int]] = {
     val getDataRequest = GetDataRequest(ReassignPartitionsZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     if (getDataResponse.resultCode == Code.OK) {
       ReassignPartitionsZNode.decode(getDataResponse.data)
     } else if (getDataResponse.resultCode == Code.NONODE) {
-      Map.empty[TopicAndPartition, Seq[Int]]
+      Map.empty[TopicPartition, Seq[Int]]
     } else {
       throw getDataResponse.resultException.get
     }
@@ -437,7 +436,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param reassignment the reassignment to set on the reassignment znode
    * @throws KeeperException if there is an error while setting or creating the znode
    */
-  def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicAndPartition, Seq[Int]]): Unit = {
+  def setOrCreatePartitionReassignment(reassignment: collection.Map[TopicPartition, Seq[Int]]): Unit = {
 
     def set(reassignmentData: Array[Byte]): SetDataResponse = {
       val setDataRequest = SetDataRequest(ReassignPartitionsZNode.path, reassignmentData, ZkVersion.NoVersion)
@@ -473,10 +472,10 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param partitions the partitions for which we want ot get states.
    * @return map containing LeaderIsrAndControllerEpoch of each partition for we were able to lookup the partition state.
    */
-  def getTopicPartitionStates(partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = {
+  def getTopicPartitionStates(partitions: Seq[TopicPartition]): Map[TopicPartition, LeaderIsrAndControllerEpoch] = {
     val getDataResponses = getTopicPartitionStatesRaw(partitions)
     getDataResponses.flatMap { getDataResponse =>
-      val partition = getDataResponse.ctx.get.asInstanceOf[TopicAndPartition]
+      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
       if (getDataResponse.resultCode == Code.OK) {
         TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat).map(partition -> _)
       } else if (getDataResponse.resultCode == Code.NONODE) {
@@ -507,7 +506,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * @param sequenceNumbers the sequence numbers associated with the isr change notifications.
    * @return partitions associated with the given isr change notifications.
    */
-  def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicAndPartition] = {
+  def getPartitionsFromIsrChangeNotifications(sequenceNumbers: Seq[String]): Seq[TopicPartition] = {
     val getDataRequests = sequenceNumbers.map { sequenceNumber =>
       GetDataRequest(IsrChangeNotificationSequenceZNode.path(sequenceNumber))
     }
@@ -550,13 +549,13 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
    * Gets the partitions marked for preferred replica election.
    * @return sequence of partitions.
    */
-  def getPreferredReplicaElection: Set[TopicAndPartition] = {
+  def getPreferredReplicaElection: Set[TopicPartition] = {
     val getDataRequest = GetDataRequest(PreferredReplicaElectionZNode.path)
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     if (getDataResponse.resultCode == Code.OK) {
       PreferredReplicaElectionZNode.decode(getDataResponse.data)
     } else if (getDataResponse.resultCode == Code.NONODE) {
-      Set.empty[TopicAndPartition]
+      Set.empty[TopicPartition]
     } else {
       throw getDataResponse.resultException.get
     }
@@ -775,7 +774,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     }
   }
 
-  private def createTopicPartition(partitions: Seq[TopicAndPartition]) = {
+  private def createTopicPartition(partitions: Seq[TopicPartition]): Seq[CreateResponse] = {
     val createRequests = partitions.map { partition =>
       val path = TopicPartitionZNode.path(partition)
       CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(partition))
@@ -783,7 +782,7 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     retryRequestsUntilConnected(createRequests)
   }
 
-  private def createTopicPartitions(topics: Seq[String]) = {
+  private def createTopicPartitions(topics: Seq[String]): Seq[CreateResponse] = {
     val createRequests = topics.map { topic =>
       val path = TopicPartitionsZNode.path(topic)
       CreateRequest(path, null, acls(path), CreateMode.PERSISTENT, Some(topic))
@@ -791,14 +790,14 @@ class KafkaZkClient(zooKeeperClient: ZooKeeperClient, isSecure: Boolean) extends
     retryRequestsUntilConnected(createRequests)
   }
 
-  private def getTopicConfigs(topics: Seq[String]) = {
+  private def getTopicConfigs(topics: Seq[String]): Seq[GetDataResponse] = {
     val getDataRequests = topics.map { topic =>
       GetDataRequest(ConfigEntityZNode.path(ConfigType.Topic, topic), ctx = Some(topic))
     }
     retryRequestsUntilConnected(getDataRequests)
   }
 
-  private def acls(path: String) = {
+  private def acls(path: String): Seq[ACL] = {
     import scala.collection.JavaConverters._
     ZkUtils.defaultAcls(isSecure, path).asScala
   }
@@ -892,7 +891,7 @@ object KafkaZkClient {
    *                      update partition state.
    * @param failedPartitions Exceptions corresponding to failed partition state updates.
    */
-  case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicAndPartition, LeaderAndIsr],
-                                      partitionsToRetry: Seq[TopicAndPartition],
-                                      failedPartitions: Map[TopicAndPartition, Exception])
+  case class UpdateLeaderAndIsrResult(successfulPartitions: Map[TopicPartition, LeaderAndIsr],
+                                      partitionsToRetry: Seq[TopicPartition],
+                                      failedPartitions: Map[TopicPartition, Exception])
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/main/scala/kafka/zk/ZkData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index e46f438..a1ff559 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -21,9 +21,9 @@ import java.util.Properties
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
-import kafka.common.TopicAndPartition
-import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
+import kafka.controller.{IsrChangeNotificationHandler, LeaderIsrAndControllerEpoch}
 import kafka.utils.Json
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.data.Stat
 
 import scala.collection.Seq
@@ -82,17 +82,17 @@ object TopicsZNode {
 
 object TopicZNode {
   def path(topic: String) = s"${TopicsZNode.path}/$topic"
-  def encode(assignment: Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
+  def encode(assignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
     val assignmentJson = assignment.map { case (partition, replicas) => partition.partition.toString -> replicas }
     Json.encodeAsBytes(Map("version" -> 1, "partitions" -> assignmentJson))
   }
-  def decode(topic: String, bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = {
+  def decode(topic: String, bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = {
     Json.parseBytes(bytes).flatMap { js =>
       val assignmentJson = js.asJsonObject
       val partitionsJsonOpt = assignmentJson.get("partitions").map(_.asJsonObject)
       partitionsJsonOpt.map { partitionsJson =>
         partitionsJson.iterator.map { case (partition, replicas) =>
-          TopicAndPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
+          new TopicPartition(topic, partition.toInt) -> replicas.to[Seq[Int]]
         }
       }
     }.map(_.toMap).getOrElse(Map.empty)
@@ -104,11 +104,11 @@ object TopicPartitionsZNode {
 }
 
 object TopicPartitionZNode {
-  def path(partition: TopicAndPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
+  def path(partition: TopicPartition) = s"${TopicPartitionsZNode.path(partition.topic)}/${partition.partition}"
 }
 
 object TopicPartitionStateZNode {
-  def path(partition: TopicAndPartition) = s"${TopicPartitionZNode.path(partition)}/state"
+  def path(partition: TopicPartition) = s"${TopicPartitionZNode.path(partition)}/state"
   def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = {
     val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
     val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
@@ -155,19 +155,19 @@ object IsrChangeNotificationZNode {
 object IsrChangeNotificationSequenceZNode {
   val SequenceNumberPrefix = "isr_change_"
   def path(sequenceNumber: String) = s"${IsrChangeNotificationZNode.path}/$SequenceNumberPrefix$sequenceNumber"
-  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+  def encode(partitions: Set[TopicPartition]): Array[Byte] = {
     val partitionsJson = partitions.map(partition => Map("topic" -> partition.topic, "partition" -> partition.partition))
-    Json.encodeAsBytes(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitionsJson))
+    Json.encodeAsBytes(Map("version" -> IsrChangeNotificationHandler.Version, "partitions" -> partitionsJson))
   }
 
-  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = {
+  def decode(bytes: Array[Byte]): Set[TopicPartition] = {
     Json.parseBytes(bytes).map { js =>
       val partitionsJson = js.asJsonObject("partitions").asJsonArray
       partitionsJson.iterator.map { partitionsJson =>
         val partitionJson = partitionsJson.asJsonObject
         val topic = partitionJson("topic").to[String]
         val partition = partitionJson("partition").to[Int]
-        TopicAndPartition(topic, partition)
+        new TopicPartition(topic, partition)
       }
     }
   }.map(_.toSet).getOrElse(Set.empty)
@@ -204,13 +204,13 @@ object DeleteTopicsTopicZNode {
 
 object ReassignPartitionsZNode {
   def path = s"${AdminZNode.path}/reassign_partitions"
-  def encode(reassignment: collection.Map[TopicAndPartition, Seq[Int]]): Array[Byte] = {
-    val reassignmentJson = reassignment.map { case (TopicAndPartition(topic, partition), replicas) =>
-      Map("topic" -> topic, "partition" -> partition, "replicas" -> replicas)
+  def encode(reassignment: collection.Map[TopicPartition, Seq[Int]]): Array[Byte] = {
+    val reassignmentJson = reassignment.map { case (tp, replicas) =>
+      Map("topic" -> tp.topic, "partition" -> tp.partition, "replicas" -> replicas)
     }
     Json.encodeAsBytes(Map("version" -> 1, "partitions" -> reassignmentJson))
   }
-  def decode(bytes: Array[Byte]): Map[TopicAndPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
+  def decode(bytes: Array[Byte]): Map[TopicPartition, Seq[Int]] = Json.parseBytes(bytes).flatMap { js =>
     val reassignmentJson = js.asJsonObject
     val partitionsJsonOpt = reassignmentJson.get("partitions")
     partitionsJsonOpt.map { partitionsJson =>
@@ -219,7 +219,7 @@ object ReassignPartitionsZNode {
         val topic = partitionFields("topic").to[String]
         val partition = partitionFields("partition").to[Int]
         val replicas = partitionFields("replicas").to[Seq[Int]]
-        TopicAndPartition(topic, partition) -> replicas
+        new TopicPartition(topic, partition) -> replicas
       }
     }
   }.map(_.toMap).getOrElse(Map.empty)
@@ -227,18 +227,18 @@ object ReassignPartitionsZNode {
 
 object PreferredReplicaElectionZNode {
   def path = s"${AdminZNode.path}/preferred_replica_election"
-  def encode(partitions: Set[TopicAndPartition]): Array[Byte] = {
+  def encode(partitions: Set[TopicPartition]): Array[Byte] = {
     val jsonMap = Map("version" -> 1,
       "partitions" -> partitions.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)))
     Json.encodeAsBytes(jsonMap)
   }
-  def decode(bytes: Array[Byte]): Set[TopicAndPartition] = Json.parseBytes(bytes).map { js =>
+  def decode(bytes: Array[Byte]): Set[TopicPartition] = Json.parseBytes(bytes).map { js =>
     val partitionsJson = js.asJsonObject("partitions").asJsonArray
     partitionsJson.iterator.map { partitionsJson =>
       val partitionJson = partitionsJson.asJsonObject
       val topic = partitionJson("topic").to[String]
       val partition = partitionJson("partition").to[Int]
-      TopicAndPartition(topic, partition)
+      new TopicPartition(topic, partition)
     }
   }.map(_.toSet).getOrElse(Set.empty)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 7f4eed7..2f520a3 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -369,8 +369,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
 
     val controllerId = zkUtils.getController()
     val controller = servers.find(p => p.config.brokerId == controllerId).get.kafkaController
-    val resultQueue = new LinkedBlockingQueue[Try[Set[TopicAndPartition]]]()
-    val controlledShutdownCallback = (controlledShutdownResult: Try[Set[TopicAndPartition]]) => resultQueue.put(controlledShutdownResult)
+    val resultQueue = new LinkedBlockingQueue[Try[Set[TopicPartition]]]()
+    val controlledShutdownCallback = (controlledShutdownResult: Try[Set[TopicPartition]]) => resultQueue.put(controlledShutdownResult)
     controller.shutdownBroker(2, controlledShutdownCallback)
     var partitionsRemaining = resultQueue.take().get
     var activeServers = servers.filter(s => s.config.brokerId != 2)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 06ddd66..78f022a 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -25,7 +25,7 @@ import org.junit.Assert._
 import org.junit.{After, Test}
 import java.util.Properties
 
-import kafka.common.{TopicAlreadyMarkedForDeletionException, TopicAndPartition}
+import kafka.common.{TopicAndPartition, TopicAlreadyMarkedForDeletionException}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
 
@@ -43,8 +43,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
 
   @Test
   def testDeleteTopicWithAllAliveReplicas() {
-    val topicPartition = new TopicPartition("test", 0)
-    val topic = topicPartition.topic
+    val topic = "test"
     servers = createTestTopicAndCluster(topic)
     // start topic deletion
     AdminUtils.deleteTopic(zkUtils, topic)
@@ -128,7 +127,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     // reassign partition 0
     val oldAssignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     val newReplicas = Seq(1, 2, 3)
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None, Map(new TopicAndPartition(topicPartition) -> newReplicas))
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkUtils, None,
+      Map(new TopicAndPartition(topicPartition) -> newReplicas))
     assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
@@ -139,7 +139,7 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     val controllerId = zkUtils.getController()
     val controller = servers.filter(s => s.config.brokerId == controllerId).head
     assertFalse("Partition reassignment should fail",
-      controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(new TopicAndPartition(topicPartition)))
+      controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicPartition))
     val assignedReplicas = zkUtils.getReplicasForPartition(topic, 0)
     assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
     follower.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
index 446d8ae..04f9bea 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerFailoverTest.scala
@@ -21,10 +21,10 @@ import java.util.Properties
 import java.util.concurrent.CountDownLatch
 
 import kafka.admin.AdminUtils
-import kafka.common.TopicAndPartition
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.log4j.Logger
 import org.junit.{After, Test}
@@ -61,7 +61,7 @@ class ControllerFailoverTest extends KafkaServerTestHarness with Logging {
     val initialEpoch = initialController.epoch
     // Create topic with one partition
     AdminUtils.createTopic(servers.head.zkUtils, topic, 1, 1)
-    val topicPartition = TopicAndPartition("topic1", 0)
+    val topicPartition = new TopicPartition("topic1", 0)
     TestUtils.waitUntilTrue(() =>
       initialController.partitionStateMachine.partitionsInState(OnlinePartition).contains(topicPartition),
       s"Partition $topicPartition did not transition to online state")

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 81df1e1..2e56c33 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -17,13 +17,13 @@
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
 import kafka.zookeeper.{CreateResponse, GetDataResponse, ZooKeeperClientException}
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -38,13 +38,13 @@ class PartitionStateMachineTest extends JUnitSuite {
   private var mockZkClient: KafkaZkClient = null
   private var mockControllerBrokerRequestBatch: ControllerBrokerRequestBatch = null
   private var mockTopicDeletionManager: TopicDeletionManager = null
-  private var partitionState: mutable.Map[TopicAndPartition, PartitionState] = null
+  private var partitionState: mutable.Map[TopicPartition, PartitionState] = null
   private var partitionStateMachine: PartitionStateMachine = null
 
   private val brokerId = 5
   private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
   private val controllerEpoch = 50
-  private val partition = TopicAndPartition("t", 0)
+  private val partition = new TopicPartition("t", 0)
   private val partitions = Seq(partition)
 
   @Before
@@ -54,7 +54,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     mockZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     mockControllerBrokerRequestBatch = EasyMock.createMock(classOf[ControllerBrokerRequestBatch])
     mockTopicDeletionManager = EasyMock.createMock(classOf[TopicDeletionManager])
-    partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
+    partitionState = mutable.Map.empty[TopicPartition, PartitionState]
     partitionStateMachine = new PartitionStateMachine(config, new StateChangeLogger(brokerId, true, None), controllerContext, mockTopicDeletionManager,
       mockZkClient, partitionState, mockControllerBrokerRequestBatch)
   }
@@ -87,7 +87,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockZkClient.createTopicPartitionStatesRaw(Map(partition -> leaderIsrAndControllerEpoch)))
       .andReturn(Seq(CreateResponse(Code.OK, null, Some(partition), null)))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
+      partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = true))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     partitionStateMachine.handleStateChanges(partitions, OnlinePartition, Option(OfflinePartitionLeaderElectionStrategy))
@@ -161,8 +161,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
-      Seq(brokerId), isNew = false))
+      partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
@@ -193,8 +192,8 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
-      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch),
-      Seq(brokerId, otherBrokerId), isNew = false))
+      partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId, otherBrokerId),
+      isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 
@@ -246,7 +245,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> leaderAndIsrAfterElection), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
+      partition, LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch), Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
index 6363d41..5d24d79 100644
--- a/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ReplicaStateMachineTest.scala
@@ -17,12 +17,12 @@
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
-import kafka.common.TopicAndPartition
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
-import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zk.{KafkaZkClient, TopicPartitionStateZNode}
 import kafka.zookeeper.GetDataResponse
+import org.apache.kafka.common.TopicPartition
 import org.apache.zookeeper.KeeperException.Code
 import org.apache.zookeeper.data.Stat
 import org.easymock.EasyMock
@@ -43,9 +43,9 @@ class ReplicaStateMachineTest extends JUnitSuite {
   private val brokerId = 5
   private val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(brokerId, "zkConnect"))
   private val controllerEpoch = 50
-  private val partition = TopicAndPartition("t", 0)
+  private val partition = new TopicPartition("t", 0)
   private val partitions = Seq(partition)
-  private val replica = PartitionAndReplica(partition.topic, partition.partition, brokerId)
+  private val replica = PartitionAndReplica(partition, brokerId)
   private val replicas = Seq(replica)
 
   @Before
@@ -113,8 +113,8 @@ class ReplicaStateMachineTest extends JUnitSuite {
   def testNewReplicaToOfflineReplicaTransition(): Unit = {
     replicaState.put(replica, NewReplica)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, false, null))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)),
+      EasyMock.eq(partition), EasyMock.eq(false), EasyMock.anyObject()))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OfflineReplica)
@@ -155,7 +155,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+      partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
@@ -175,19 +175,19 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
     val stat = new Stat(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
-    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, false, null))
+    EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(EasyMock.eq(Seq(brokerId)),
+      EasyMock.eq(partition), EasyMock.eq(false), EasyMock.anyObject()))
     val adjustedLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(LeaderAndIsr.NoLeader, List(otherBrokerId))
     val updatedLeaderAndIsr = adjustedLeaderAndIsr.withZkVersion(adjustedLeaderAndIsr .zkVersion + 1)
     val updatedLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(updatedLeaderAndIsr, controllerEpoch)
-    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions))
-      .andReturn(Seq(GetDataResponse(Code.OK, null, Some(partition),
+    EasyMock.expect(mockZkClient.getTopicPartitionStatesRaw(partitions)).andReturn(
+      Seq(GetDataResponse(Code.OK, null, Some(partition),
         TopicPartitionStateZNode.encode(leaderIsrAndControllerEpoch), stat)))
     EasyMock.expect(mockZkClient.updateLeaderAndIsr(Map(partition -> adjustedLeaderAndIsr), controllerEpoch))
       .andReturn(UpdateLeaderAndIsrResult(Map(partition -> updatedLeaderAndIsr), Seq.empty, Map.empty))
     EasyMock.expect(mockTopicDeletionManager.isPartitionToBeDeleted(partition)).andReturn(false)
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(otherBrokerId),
-      partition.topic, partition.partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
+      partition, updatedLeaderIsrAndControllerEpoch, replicaIds, isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
 
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch, mockTopicDeletionManager)
@@ -230,7 +230,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+      partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)
@@ -240,11 +240,11 @@ class ReplicaStateMachineTest extends JUnitSuite {
 
   @Test
   def testOfflineReplicaToReplicaDeletionStartedTransition(): Unit = {
-    val callbacks = (new Callbacks.CallbackBuilder).build
+    val callbacks = new Callbacks()
     replicaState.put(replica, OfflineReplica)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addStopReplicaRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, true, callbacks.stopReplicaResponseCallback))
+      partition, true, callbacks.stopReplicaResponseCallback))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, ReplicaDeletionStarted, callbacks)
@@ -348,7 +348,7 @@ class ReplicaStateMachineTest extends JUnitSuite {
     controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
     EasyMock.expect(mockControllerBrokerRequestBatch.newBatch())
     EasyMock.expect(mockControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(Seq(brokerId),
-      partition.topic, partition.partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
+      partition, leaderIsrAndControllerEpoch, Seq(brokerId), isNew = false))
     EasyMock.expect(mockControllerBrokerRequestBatch.sendRequestsToBrokers(controllerEpoch))
     EasyMock.replay(mockZkClient, mockControllerBrokerRequestBatch)
     replicaStateMachine.handleStateChanges(replicas, OnlineReplica)

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 438d736..afd619d 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -195,7 +195,7 @@ class LogDirFailureTest extends IntegrationTestHarness {
     // The controller should have marked the replica on the original leader as offline
     val controllerServer = servers.find(_.kafkaController.isActive).get
     val offlineReplicas = controllerServer.kafkaController.replicaStateMachine.replicasInState(topic, OfflineReplica)
-    assertTrue(offlineReplicas.contains(PartitionAndReplica(topic, 0, leaderServerId)))
+    assertTrue(offlineReplicas.contains(PartitionAndReplica(new TopicPartition(topic, 0), leaderServerId)))
   }
 
   private def subscribeAndWaitForAssignment(topic: String, consumer: KafkaConsumer[Array[Byte], Array[Byte]]) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3735a6ca/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 77ac748..775c68e 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,15 +16,12 @@
 */
 package kafka.zk
 
-import kafka.common.TopicAndPartition
 import kafka.server.Defaults
 import kafka.zookeeper.ZooKeeperClient
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{After, Before, Test}
 
-import scala.collection.mutable
-
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private var zooKeeperClient: ZooKeeperClient = null
@@ -102,10 +99,11 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     // create a topic path
     zkClient.createRecursive(TopicZNode.path(topic))
 
-    val assignment = new mutable.HashMap[TopicAndPartition, Seq[Int]]()
-    assignment.put(new TopicAndPartition(topic, 0), Seq(0,1))
-    assignment.put(new TopicAndPartition(topic, 1), Seq(0,1))
-    zkClient.setTopicAssignmentRaw(topic, assignment.toMap)
+    val assignment = Map(
+      new TopicPartition(topic, 0) -> Seq(0, 1),
+      new TopicPartition(topic, 1) -> Seq(0, 1)
+    )
+    zkClient.setTopicAssignmentRaw(topic, assignment)
 
     assertEquals(2, zkClient.getTopicPartitionCount(topic).get)
   }
@@ -165,15 +163,15 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(Map.empty, zkClient.getPartitionReassignment)
 
     val reassignment = Map(
-      TopicAndPartition("topic_a", 0) -> Seq(0, 1, 3),
-      TopicAndPartition("topic_a", 1) -> Seq(2, 1, 3),
-      TopicAndPartition("topic_b", 0) -> Seq(4, 5),
-      TopicAndPartition("topic_c", 0) -> Seq(5, 3)
+      new TopicPartition("topic_a", 0) -> Seq(0, 1, 3),
+      new TopicPartition("topic_a", 1) -> Seq(2, 1, 3),
+      new TopicPartition("topic_b", 0) -> Seq(4, 5),
+      new TopicPartition("topic_c", 0) -> Seq(5, 3)
     )
     zkClient.setOrCreatePartitionReassignment(reassignment)
     assertEquals(reassignment, zkClient.getPartitionReassignment)
 
-    val updatedReassingment = reassignment - TopicAndPartition("topic_b", 0)
+    val updatedReassingment = reassignment - new TopicPartition("topic_b", 0)
     zkClient.setOrCreatePartitionReassignment(updatedReassingment)
     assertEquals(updatedReassingment, zkClient.getPartitionReassignment)
 


Mime
View raw message