kafka-commits mailing list archives

From nehanarkh...@apache.org
Subject svn commit: r1397747 [1/2] - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/controller/ core/src/main/scala/kafka/utils/ core/src/test/scala/unit/kafka/admin/
Date Sat, 13 Oct 2012 00:39:42 GMT
Author: nehanarkhede
Date: Sat Oct 13 00:39:41 2012
New Revision: 1397747

URL: http://svn.apache.org/viewvc?rev=1397747&view=rev
Log:
KAFKA-43 Move leader to preferred replica; patched by Neha Narkhede; reviewed by Joel Koshy and Jun Rao
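
For anyone trying the patch, a minimal invocation might look as follows. This is a sketch only: the JSON format comes from the --help text in PreferredReplicaLeaderElectionCommand below, while the hyphenated flag name for the JSON file is hypothetical (the patch registers that option with the literal string "path to json file"):

    $ cat partitions.json
    [{"topic": "foo", "partition": "1"}, {"topic": "foobar", "partition": "2"}]
    $ bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 \
        --path-to-json-file partitions.json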

Added:
    incubator/kafka/branches/0.8/bin/kafka-preferred-replica-election.sh   (with props)
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
Modified:
    incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh
    incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala

Modified: incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-check-reassignment-status.sh Sat Oct 13 00:39:41 2012
@@ -15,5 +15,4 @@
 # limitations under the License.
 
 base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
 $base_dir/kafka-run-class.sh kafka.admin.CheckReassignmentStatus $@

Added: incubator/kafka/branches/0.8/bin/kafka-preferred-replica-election.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-preferred-replica-election.sh?rev=1397747&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-preferred-replica-election.sh (added)
+++ incubator/kafka/branches/0.8/bin/kafka-preferred-replica-election.sh Sat Oct 13 00:39:41 2012
@@ -0,0 +1,18 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+base_dir=$(dirname $0)
+$base_dir/kafka-run-class.sh kafka.admin.PreferredReplicaLeaderElectionCommand $@

Propchange: incubator/kafka/branches/0.8/bin/kafka-preferred-replica-election.sh
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh (original)
+++ incubator/kafka/branches/0.8/bin/kafka-reassign-partitions.sh Sat Oct 13 00:39:41 2012
@@ -15,5 +15,4 @@
 # limitations under the License.
 
 base_dir=$(dirname $0)
-export KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote -Dlog4j.configuration=file:$base_dir/kafka-console-consumer-log4j.properties"
 $base_dir/kafka-run-class.sh kafka.admin.ReassignPartitionsCommand $@

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CheckReassignmentStatus.scala Sat Oct 13 00:39:41 2012
@@ -5,6 +5,7 @@ import joptsimple.OptionParser
 import org.I0Itec.zkclient.ZkClient
 import kafka.utils._
 import scala.collection.Map
+import kafka.common.TopicAndPartition
 
 object CheckReassignmentStatus extends Logging {
 
@@ -46,48 +47,45 @@ object CheckReassignmentStatus extends L
             val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
             val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
             val newReplicas = replicasList.split(",").map(_.toInt)
-            ((topic, partition), newReplicas.toSeq)
+            (TopicAndPartition(topic, partition), newReplicas.toSeq)
           }.toMap
-        case None => Map.empty[(String, Int), Seq[Int]]
+        case None => Map.empty[TopicAndPartition, Seq[Int]]
       }
 
       val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkClient, partitionsToBeReassigned)
       reassignedPartitionsStatus.foreach { partition =>
         partition._2 match {
           case ReassignmentCompleted =>
-            println("Partition [%s,%d] reassignment to %s completed successfully".format(partition._1, partition._2,
-            partitionsToBeReassigned((partition._1._1, partition._1._2))))
+            println("Partition %s reassignment completed successfully".format(partition._1))
           case ReassignmentFailed =>
-            println("Partition [%s,%d] reassignment to %s failed".format(partition._1, partition._2,
-            partitionsToBeReassigned((partition._1._1, partition._1._2))))
+            println("Partition %s reassignment failed".format(partition._1))
           case ReassignmentInProgress =>
-            println("Partition [%s,%d] reassignment to %s in progress".format(partition._1, partition._2,
-            partitionsToBeReassigned((partition._1._1, partition._1._2))))
+            println("Partition %s reassignment in progress".format(partition._1))
         }
       }
     }
   }
 
-  def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[(String, Int), Seq[Int]])
-  :Map[(String, Int), ReassignmentStatus] = {
+  def checkIfReassignmentSucceeded(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]])
+  :Map[TopicAndPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas)
     // for all partitions whose replica reassignment is complete, check the status
     partitionsToBeReassigned.map { topicAndPartition =>
-      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1._1, topicAndPartition._1._2,
+      (topicAndPartition._1, checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition._1,
         topicAndPartition._2, partitionsToBeReassigned, partitionsBeingReassigned))
     }
   }
 
-  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topic: String, partition: Int,
+  def checkIfPartitionReassignmentSucceeded(zkClient: ZkClient, topicAndPartition: TopicAndPartition,
                                             reassignedReplicas: Seq[Int],
-                                            partitionsToBeReassigned: Map[(String, Int), Seq[Int]],
-                                            partitionsBeingReassigned: Map[(String, Int), Seq[Int]]): ReassignmentStatus = {
-    val newReplicas = partitionsToBeReassigned((topic, partition))
-    partitionsBeingReassigned.get((topic, partition)) match {
+                                            partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]],
+                                            partitionsBeingReassigned: Map[TopicAndPartition, Seq[Int]]): ReassignmentStatus = {
+    val newReplicas = partitionsToBeReassigned(topicAndPartition)
+    partitionsBeingReassigned.get(topicAndPartition) match {
       case Some(partition) => ReassignmentInProgress
       case None =>
         // check if AR == RAR
-        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition)
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
         if(assignedReplicas == newReplicas)
           ReassignmentCompleted
         else

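The dominant change in this file is mechanical: (String, Int) tuple keys become TopicAndPartition. A self-contained sketch of the readability win; the case class below is a stand-in for kafka.common.TopicAndPartition, not the real one:

    // Stand-in for kafka.common.TopicAndPartition: named fields and a
    // readable toString, where a bare tuple only offers _1 and _2.
    case class TopicAndPartition(topic: String, partition: Int) {
      override def toString = "[%s,%d]".format(topic, partition)
    }

    object RefactorSketch extends App {
      val toBeReassigned = Map(TopicAndPartition("foo", 1) -> Seq(1, 2, 3))
      toBeReassigned.foreach { case (tp, replicas) =>
        // tp.topic / tp.partition instead of tp._1 / tp._2
        println("Partition %s reassignment to %s in progress"
          .format(tp, replicas.mkString(",")))
      }
    }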
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala?rev=1397747&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala Sat Oct 13 00:39:41 2012
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import joptsimple.OptionParser
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException}
+import kafka.common.{TopicAndPartition, AdminCommandFailedException}
+
+object PreferredReplicaLeaderElectionCommand extends Logging {
+
+  def main(args: Array[String]): Unit = {
+    val parser = new OptionParser
+    val jsonFileOpt = parser.accepts("path to json file", "The JSON file with the list of partitions " +
+      "for which preferred replica leader election should be done, in the following format - \n" +
+       "[{\"topic\": \"foo\", \"partition\": \"1\"}, {\"topic\": \"foobar\", \"partition\": \"2\"}]. \n" +
+      "Defaults to all existing partitions")
+      .withRequiredArg
+      .describedAs("list of partitions for which preferred replica leader election needs to be triggered")
+      .ofType(classOf[String])
+      .defaultsTo("")
+    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " +
+      "form host:port. Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
+
+    val options = parser.parse(args : _*)
+
+    Utils.checkRequiredArgs(parser, options, jsonFileOpt, zkConnectOpt)
+
+    val jsonFile = options.valueOf(jsonFileOpt)
+    val zkConnect = options.valueOf(zkConnectOpt)
+    val jsonString = Utils.readFileIntoString(jsonFile)
+    var zkClient: ZkClient = null
+
+    try {
+      zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
+      val partitionsForPreferredReplicaElection =
+        if(jsonFile == "") ZkUtils.getAllPartitions(zkClient) else parsePreferredReplicaJsonData(jsonString)
+      val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection)
+
+      // attach shutdown handler to catch control-c
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        override def run() = {
+          // delete the admin path so it can be retried
+          ZkUtils.deletePathRecursive(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+          zkClient.close()
+        }
+      })
+
+      preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
+      println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection))
+    } catch {
+      case e =>
+        println("Failed to start preferred replica election")
+        println(Utils.stackTrace(e))
+    } finally {
+      if (zkClient != null)
+        zkClient.close()
+    }
+  }
+
+  def parsePreferredReplicaJsonData(jsonString: String): Set[TopicAndPartition] = {
+    SyncJSON.parseFull(jsonString) match {
+      case Some(partitionList) =>
+        val partitions = (partitionList.asInstanceOf[List[Any]])
+        Set.empty[TopicAndPartition] ++ partitions.map { m =>
+          val topic = m.asInstanceOf[Map[String, String]].get("topic").get
+          val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
+          TopicAndPartition(topic, partition)
+        }
+      case None => throw new AdministrationException("Preferred replica election data is empty")
+    }
+  }
+
+  def writePreferredReplicaElectionData(zkClient: ZkClient,
+                                        partitionsUndergoingPreferredReplicaElection: scala.collection.Set[TopicAndPartition]) {
+    val zkPath = ZkUtils.PreferredReplicaLeaderElectionPath
+    val jsonData = Utils.arrayToJson(partitionsUndergoingPreferredReplicaElection.map { p =>
+      Utils.stringMapToJsonString(Map(("topic" -> p.topic), ("partition" -> p.partition.toString)))
+    }.toArray)
+    try {
+      ZkUtils.createPersistentPath(zkClient, zkPath, jsonData)
+      info("Created preferred replica election path with %s".format(jsonData))
+    } catch {
+      case nee: ZkNodeExistsException =>
+        val partitionsUndergoingPreferredReplicaElection =
+          PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(ZkUtils.readData(zkClient, zkPath)._1)
+        throw new AdministrationException("Preferred replica leader election currently in progress for " +
+          "%s. Aborting operation".format(partitionsUndergoingPreferredReplicaElection))
+      case e2 => throw new AdministrationException(e2.toString)
+    }
+  }
+}
+
+class PreferredReplicaLeaderElectionCommand(zkClient: ZkClient, partitions: scala.collection.Set[TopicAndPartition])
+  extends Logging {
+  def moveLeaderToPreferredReplica() = {
+    try {
+      val validPartitions = partitions.filter(p => validatePartition(zkClient, p.topic, p.partition))
+      PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions)
+    } catch {
+      case e => throw new AdminCommandFailedException("Admin command failed", e)
+    }
+  }
+
+  def validatePartition(zkClient: ZkClient, topic: String, partition: Int): Boolean = {
+    // check if partition exists
+    val partitionsOpt = ZkUtils.getPartitionsForTopics(zkClient, List(topic)).get(topic)
+    partitionsOpt match {
+      case Some(partitions) =>
+        if(partitions.contains(partition)) {
+          true
+        } else {
+          error("Skipping preferred replica leader election for partition [%s,%d] ".format(topic, partition) +
+            "since it doesn't exist")
+          false
+        }
+      case None => error("Skipping preferred replica leader election for partition " +
+        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
+        false
+    }
+  }
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala Sat Oct 13 00:39:41 2012
@@ -19,8 +19,8 @@ package kafka.admin
 import joptsimple.OptionParser
 import kafka.utils._
 import org.I0Itec.zkclient.ZkClient
-import kafka.common.AdminCommandFailedException
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.common.{TopicAndPartition, AdminCommandFailedException}
 
 object ReassignPartitionsCommand extends Logging {
 
@@ -63,7 +63,7 @@ object ReassignPartitionsCommand extends
             val partition = m.asInstanceOf[Map[String, String]].get("partition").get.toInt
             val replicasList = m.asInstanceOf[Map[String, String]].get("replicas").get
             val newReplicas = replicasList.split(",").map(_.toInt)
-            ((topic, partition), newReplicas.toSeq)
+            (TopicAndPartition(topic, partition), newReplicas.toSeq)
           }.toMap
         case None => throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile))
       }
@@ -94,13 +94,13 @@ object ReassignPartitionsCommand extends
   }
 }
 
-class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[(String, Int), Seq[Int]])
+class ReassignPartitionsCommand(zkClient: ZkClient, partitions: collection.immutable.Map[TopicAndPartition, Seq[Int]])
   extends Logging {
   def reassignPartitions(): Boolean = {
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1._1, p._1._2))
+      val validPartitions = partitions.filter(p => validatePartition(zkClient, p._1.topic, p._1.partition))
       val jsonReassignmentData = Utils.mapToJson(validPartitions.map(p =>
-        ("%s,%s".format(p._1._1, p._1._2)) -> p._2.map(_.toString)))
+        ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2.map(_.toString)))
       ZkUtils.createPersistentPath(zkClient, ZkUtils.ReassignPartitionsPath, jsonReassignmentData)
       true
     }catch {

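With this change the data written under ZkUtils.ReassignPartitionsPath keys each entry by "topic,partition". A sketch of the resulting node content for one partition, assuming Utils.mapToJson renders a flat JSON object (exact quoting and escaping depend on that helper):

    {"foo,1": ["1", "2", "3"]}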
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/KafkaController.scala Sat Oct 13 00:39:41 2012
@@ -30,7 +30,8 @@ import org.I0Itec.zkclient.{IZkDataListe
 import kafka.utils.{Utils, ZkUtils, Logging}
 import org.I0Itec.zkclient.exception.ZkNoNodeException
 import java.lang.{IllegalStateException, Object}
-import kafka.common.KafkaException
+import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.common.{TopicAndPartition, KafkaException}
 
 class ControllerContext(val zkClient: ZkClient,
                         var controllerChannelManager: ControllerChannelManager = null,
@@ -38,10 +39,11 @@ class ControllerContext(val zkClient: Zk
                         var liveBrokers: Set[Broker] = null,
                         var liveBrokerIds: Set[Int] = null,
                         var allTopics: Set[String] = null,
-                        var partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null,
-                        var allLeaders: mutable.Map[(String, Int), Int] = null,
-                        var partitionsBeingReassigned: mutable.Map[(String, Int), ReassignedPartitionsContext] =
-                        new mutable.HashMap)
+                        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = null,
+                        var allLeaders: mutable.Map[TopicAndPartition, Int] = null,
+                        var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
+                        new mutable.HashMap,
+                        var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet)
 
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "]: "
@@ -52,6 +54,7 @@ class KafkaController(val config : Kafka
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
+  private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
 
   newGauge(
     "ActiveControllerCount",
@@ -74,6 +77,7 @@ class KafkaController(val config : Kafka
       info("Broker %d starting become controller state transition".format(config.brokerId))
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
+      registerPreferredReplicaElectionListener()
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
@@ -110,7 +114,7 @@ class KafkaController(val config : Kafka
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter(p =>
       p._2.newReplicas.foldLeft(false)((a, replica) => newBrokers.contains(replica) || a))
-    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1._1, p._1._2, p._2))
+    partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
   }
 
   /**
@@ -128,7 +132,7 @@ class KafkaController(val config : Kafka
     updateLeaderAndIsrCache()
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
-      deadBrokers.contains(partitionAndLeader._2)).keySet.toSeq
+      deadBrokers.contains(partitionAndLeader._2)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -143,7 +147,7 @@ class KafkaController(val config : Kafka
    * 1. Registers partition change listener. This is not required until KAFKA-347
    * 2. Invokes the new partition callback
    */
-  def onNewTopicCreation(topics: Set[String], newPartitions: Seq[(String, Int)]) {
+  def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
     info("New topic creation callback for %s".format(newPartitions.mkString(",")))
     // subscribe to partition changes
     topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
@@ -156,7 +160,7 @@ class KafkaController(val config : Kafka
    * 1. Move the newly created partitions to the NewPartition state
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
    */
-  def onNewPartitionCreation(newPartitions: Seq[(String, Int)]) {
+  def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
@@ -178,54 +182,40 @@ class KafkaController(val config : Kafka
    * 6. Write new AR
    * 7. Remove partition from the /admin/reassign_partitions path
    */
-  def onPartitionReassignment(topic: String, partition: Int, reassignedPartitionContext: ReassignedPartitionsContext) {
+  def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    areReplicasInIsr(topic, partition, reassignedReplicas) match {
+    areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
       case true =>
         // mark the new replicas as online
         reassignedReplicas.foreach { replica =>
-          replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)),
-            OnlineReplica)
+          replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
+            replica)), OnlineReplica)
         }
         // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
-        moveReassignedPartitionLeaderIfRequired(topic, partition, reassignedPartitionContext)
+        moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
         // stop older replicas
-        stopOldReplicasOfReassignedPartition(topic, partition, reassignedPartitionContext)
+        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext)
         // write the new list of replicas for this partition in zookeeper
-        updateAssignedReplicasForPartition(topic, partition, reassignedPartitionContext)
+        updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext)
         // update the /admin/reassign_partitions path to remove this partition
-        removePartitionFromReassignedPartitions(topic, partition)
-        info("Removed partition [%s, %d] from the list of reassigned partitions in zookeeper".format(topic, partition))
-        controllerContext.partitionsBeingReassigned.remove((topic, partition))
+        removePartitionFromReassignedPartitions(topicAndPartition)
+        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
+        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
       case false =>
-        info("New replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) +
+        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned not yet caught up with the leader")
         // start new replicas
-        startNewReplicasForReassignedPartition(topic, partition, reassignedPartitionContext)
-        info("Waiting for new replicas %s for partition [%s, %d] being ".format(reassignedReplicas.mkString(","), topic, partition) +
+        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned to catch up with the leader")
     }
   }
 
-  /* TODO: kafka-330  This API is unused until we introduce the delete topic functionality.
-  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
-  //  def onTopicDeletion(topics: Set[String], replicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-  //    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
-  //    for((topicPartition, brokers) <- replicaAssignment){
-  //      for (broker <- brokers){
-  //        if (!brokerToPartitionToStopReplicaMap.contains(broker))
-  //          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
-  //        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
-  //      }
-  //      controllerContext.allLeaders.remove(topicPartition)
-  //      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
-  //    }
-  //    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
-  //      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
-  //      info("Handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
-  //      sendRequest(broker, stopReplicaRequest)
-  //    }
-  //  }
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) {
+    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
+    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
+  }
 
   /**
    * Invoked when the controller module of a Kafka server is started up. This does not assume that the current broker
@@ -274,30 +264,44 @@ class KafkaController(val config : Kafka
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient,
       controllerContext.allTopics.toSeq)
-    controllerContext.allLeaders = new mutable.HashMap[(String, Int), Int]
+    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, Int]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
     startChannelManager()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
-    initializeReassignedPartitionsContext()
+    initializeAndMaybeTriggerPartitionReassignment()
+    initializeAndMaybeTriggerPreferredReplicaElection()
   }
 
-  private def initializeReassignedPartitionsContext() {
+  private def initializeAndMaybeTriggerPartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
     // check if they are already completed
     val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
       controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1)
-    reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p._1, p._2))
+    reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
     controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned
     controllerContext.partitionsBeingReassigned --= reassignedPartitions
     info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
     info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
     info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString()))
-    controllerContext.partitionsBeingReassigned.foreach(partition =>
-      onPartitionReassignment(partition._1._1, partition._1._2, partition._2))
+    controllerContext.partitionsBeingReassigned.foreach(partition => onPartitionReassignment(partition._1, partition._2))
+  }
+
+  private def initializeAndMaybeTriggerPreferredReplicaElection() {
+    // read the partitions undergoing preferred replica election from zookeeper path
+    val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
+    // check if they are already completed
+    val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
+      controllerContext.allLeaders(partition) == controllerContext.partitionReplicaAssignment(partition).head)
+    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
+    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
+    info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
+    info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
+    info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+    onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
   }
 
   private def startChannelManager() {
@@ -314,7 +318,7 @@ class KafkaController(val config : Kafka
           controllerContext.allLeaders.put(topicPartition, leaderAndIsr.leader)
         case false =>
           debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderAndIsr.leader) +
-            "partition [%s, %d] is dead, just ignore it".format(topicPartition._1, topicPartition._2))
+            "partition %s is dead, just ignore it".format(topicPartition))
       }
     }
   }
@@ -328,72 +332,71 @@ class KafkaController(val config : Kafka
     }
   }
 
-  private def moveReassignedPartitionLeaderIfRequired(topic: String, partition: Int,
+  private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders((topic, partition))
+    val currentLeader = controllerContext.allLeaders(topicAndPartition)
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
-      info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) +
+      info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
       // move the leader to one of the alive and caught up new replicas
-      partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition,
-        reassignedPartitionLeaderSelector)
+      partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
     }else {
       // check if the leader is alive or not
       controllerContext.liveBrokerIds.contains(currentLeader) match {
         case true =>
-          info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) +
+          info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
             "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
         case false =>
-          info("Leader %s for partition [%s, %d] being reassigned, ".format(currentLeader, topic, partition) +
+          info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
             "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
-          partitionStateMachine.handleStateChanges(List((topic, partition)), OnlinePartition,
-            reassignedPartitionLeaderSelector)
+          partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition, reassignedPartitionLeaderSelector)
       }
     }
   }
 
-  private def stopOldReplicasOfReassignedPartition(topic: String, partition: Int,
+  private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
                                                    reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
+    val topic = topicAndPartition.topic
+    val partition = topicAndPartition.partition
     // send stop replica state change request to the old replicas
-    val oldReplicas = controllerContext.partitionReplicaAssignment((topic, partition)).toSet -- reassignedReplicas.toSet
+    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
     // first move the replica to offline state (the controller removes it from the ISR)
     oldReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
+      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
     }
     // send stop replica command to the old replicas
     oldReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), NonExistentReplica)
+      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), NonExistentReplica)
     }
   }
 
-  private def updateAssignedReplicasForPartition(topic: String, partition: Int,
+  private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
                                                  reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1._1.equals(topic))
-    partitionsAndReplicasForThisTopic.put((topic, partition), reassignedReplicas)
-    updateAssignedReplicasForPartition(topic, partition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition [%s, %d] being reassigned ".format(topic, partition) +
-      "to %s".format(reassignedReplicas.mkString(",")))
+    val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
+    partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas)
+    updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
+    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(",")))
     // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
     // stop watching the ISR changes for this partition
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-      controllerContext.partitionsBeingReassigned((topic, partition)).isrChangeListener)
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
     // update the assigned replica list
-    controllerContext.partitionReplicaAssignment.put((topic, partition), reassignedReplicas)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
   }
 
-  private def startNewReplicasForReassignedPartition(topic: String, partition: Int,
+  private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
                                                      reassignedPartitionContext: ReassignedPartitionsContext) {
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
-    val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment((topic, partition))
+    val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment(topicAndPartition)
     val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
     val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq
     newReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Seq(new PartitionAndReplica(topic, partition, replica)), NewReplica)
+      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
     }
   }
 
@@ -401,35 +404,54 @@ class KafkaController(val config : Kafka
     zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
   }
 
-  def removePartitionFromReassignedPartitions(topic: String, partition: Int) {
+  private def registerPreferredReplicaElectionListener() {
+    zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
+  }
+
+  def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
     // read the current list of reassigned partitions from zookeeper
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
     // remove this partition from that list
-    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - ((topic, partition))
+    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
     // write the new list to zookeeper
     ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
     // update the cache
-    controllerContext.partitionsBeingReassigned.remove((topic, partition))
+    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
   }
 
-  def updateAssignedReplicasForPartition(topic: String, partition: Int,
-                                         newReplicaAssignmentForTopic: Map[(String, Int), Seq[Int]]) {
+  def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
+                                         newReplicaAssignmentForTopic: Map[TopicAndPartition, Seq[Int]]) {
     try {
-      val zkPath = ZkUtils.getTopicPath(topic)
+      val zkPath = ZkUtils.getTopicPath(topicAndPartition.topic)
       val jsonPartitionMap = Utils.mapToJson(newReplicaAssignmentForTopic.map(e =>
-        (e._1._2.toString -> e._2.map(_.toString))))
+        (e._1.partition.toString -> e._2.map(_.toString))))
       ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
       debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
     } catch {
-      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topic))
+      case e: ZkNoNodeException => throw new IllegalStateException("Topic %s doesn't exist".format(topicAndPartition.topic))
       case e2 => throw new KafkaException(e2.toString)
     }
   }
 
-  private def getAllReplicasForPartition(partitions: Seq[(String, Int)]): Seq[PartitionAndReplica] = {
+  def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
+    partitionsToBeRemoved.foreach { partition =>
+      // check the status
+      val currentLeader = controllerContext.allLeaders(partition)
+      val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
+      if(currentLeader == preferredReplica) {
+        info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
+      }else {
+        warn("Partition %s failed to complete preferred replica leader election. Leader is %d".format(partition, currentLeader))
+      }
+    }
+    ZkUtils.deletePath(zkClient, ZkUtils.PreferredReplicaLeaderElectionPath)
+    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
+  }
+
+  private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
     partitions.map { p =>
       val replicas = controllerContext.partitionReplicaAssignment(p)
-      replicas.map(r => new PartitionAndReplica(p._1, p._2, r))
+      replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
     }.flatten
   }
 
@@ -488,39 +510,40 @@ class PartitionsReassignedListener(contr
     val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
     newPartitions.foreach { partitionToBeReassigned =>
       controllerContext.controllerLock synchronized {
-        val topic = partitionToBeReassigned._1._1
-        val partition = partitionToBeReassigned._1._2
+        val topic = partitionToBeReassigned._1.topic
+        val partition = partitionToBeReassigned._1.partition
         val newReplicas = partitionToBeReassigned._2
+        val topicAndPartition = partitionToBeReassigned._1
         val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
         try {
-          val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get((topic, partition))
+          val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
           assignedReplicasOpt match {
             case Some(assignedReplicas) =>
               if(assignedReplicas == newReplicas) {
-                throw new KafkaException("Partition [%s, %d] to be reassigned is already assigned to replicas"
-                  .format(topic, partition) +
+                throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
+                  .format(topicAndPartition) +
                   " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
               }else {
                 if(aliveNewReplicas == newReplicas) {
-                  info("Handling reassignment of partition [%s, %d] to new replicas %s".format(topic, partition,
+                  info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
                     newReplicas.mkString(",")))
                   val context = createReassignmentContextForPartition(topic, partition, newReplicas)
-                  controllerContext.partitionsBeingReassigned.put((topic, partition), context)
-                  controller.onPartitionReassignment(topic, partition, context)
+                  controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
+                  controller.onPartitionReassignment(topicAndPartition, context)
                 }else {
                   // some replica in RAR is not alive. Fail partition reassignment
                   throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) +
-                    " %s for partition [%s, %d] to be reassigned are alive. ".format(newReplicas.mkString(","), topic, partition) +
+                    " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) +
                     "Failing partition reassignment")
                 }
               }
-            case None => throw new KafkaException("Attempt to reassign partition [%s, %d] that doesn't exist"
-              .format(topic, partition))
+            case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
+              .format(topicAndPartition))
           }
         }catch {
-          case e => error("Error completing reassignment of partition [%s, %d]".format(topic, partition), e)
+          case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
           // remove the partition from the admin path to unblock the admin client
-          controller.removePartitionFromReassignedPartitions(topic, partition)
+          controller.removePartitionFromReassignedPartitions(topicAndPartition)
         }
       }
     }
@@ -563,7 +586,7 @@ class ReassignedPartitionsIsrChangeListe
   val controllerContext = controller.controllerContext
 
   /**
-   * Invoked when some partitions are reassigned by the admin command
+   * Invoked when the ISR of a partition being reassigned changes
    * @throws Exception On any error.
    */
   @throws(classOf[Exception])
@@ -572,7 +595,8 @@ class ReassignedPartitionsIsrChangeListe
       controllerContext.controllerLock synchronized {
         debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
         // check if this partition is still being reassigned or not
-        controllerContext.partitionsBeingReassigned.get((topic, partition)) match {
+        val topicAndPartition = TopicAndPartition(topic, partition)
+        controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
           case Some(reassignedPartitionContext) =>
             // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
             val newLeaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition)
@@ -584,7 +608,7 @@ class ReassignedPartitionsIsrChangeListe
                   info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
                     "Resuming partition reassignment")
-                  controller.onPartitionReassignment(topic, partition, reassignedPartitionContext)
+                  controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
                 }else {
                   info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
                     .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
@@ -602,7 +626,47 @@ class ReassignedPartitionsIsrChangeListe
   }
 
   /**
-   * Called when the leader information stored in zookeeper has been delete. Try to elect as the leader
+   * @throws Exception
+   *             On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataDeleted(dataPath: String) {
+  }
+}
+
+/**
+ * Starts the preferred replica leader election for the list of partitions specified under
+ * /admin/preferred_replica_election -
+ */
+class PreferredReplicaElectionListener(controller: KafkaController) extends IZkDataListener with Logging {
+  this.logIdent = "[PreferredReplicaElectionListener on " + controller.config.brokerId + "]: "
+  val zkClient = controller.controllerContext.zkClient
+  val controllerContext = controller.controllerContext
+
+  /**
+   * Invoked when the admin command triggers preferred replica election for some partitions
+   * @throws Exception On any error.
+   */
+  @throws(classOf[Exception])
+  def handleDataChange(dataPath: String, data: Object) {
+    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election" +
+      " %s".format(dataPath, data.toString))
+    val partitionsForPreferredReplicaElection =
+      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(data.toString)
+    val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+    controllerContext.controllerLock synchronized {
+      try {
+        controller.onPreferredReplicaElection(newPartitions)
+      } catch {
+        case e => error("Error completing preferred replica leader election for partitions %s"
+          .format(partitionsForPreferredReplicaElection.mkString(",")), e)
+      } finally {
+        controller.removePartitionsFromPreferredReplicaElection(newPartitions)
+      }
+    }
+  }
+
+  /**
    * @throws Exception
    *             On any error.
    */

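Taken together, the controller changes amount to the following znode handshake; a sketch with the path from the class comment above and JSON mirroring writePreferredReplicaElectionData:

    # admin tool creates:
    /admin/preferred_replica_election
        [{"topic": "foo", "partition": "1"}]
    # controller side: PreferredReplicaElectionListener.handleDataChange fires
    #   -> onPreferredReplicaElection
    #   -> partitionStateMachine.handleStateChanges(partitions, OnlinePartition,
    #        preferredReplicaPartitionLeaderSelector)
    #   -> removePartitionsFromPreferredReplicaElection deletes the path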
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala Sat Oct 13 00:39:41 2012
@@ -13,12 +13,12 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.utils.Logging
-import kafka.common.{StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
 
 trait PartitionLeaderSelector {
 
@@ -44,17 +44,15 @@ trait PartitionLeaderSelector {
  * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
  */
 class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+  this.logIdent = "[OfflinePartitionLeaderSelector]: "
 
   def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    controllerContext.partitionReplicaAssignment.get((topic, partition)) match {
+    controllerContext.partitionReplicaAssignment.get(TopicAndPartition(topic, partition)) match {
       case Some(assignedReplicas) =>
         val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
         val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
         val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-        debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
-          .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
-          currentLeaderIsrZkPathVersion))
         val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
           case true =>
             debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
@@ -83,7 +81,7 @@ class OfflinePartitionLeaderSelector(con
       case None =>
         ControllerStat.offlinePartitionRate.mark()
         throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
-                                            "replicas assigned to it")
+          "replicas assigned to it")
     }
   }
 }
@@ -92,20 +90,18 @@ class OfflinePartitionLeaderSelector(con
  * Picks one of the alive in-sync reassigned replicas as the new leader
  */
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+  this.logIdent = "[ReassignedPartitionLeaderSelector]: "
 
   def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val reassignedReplicas = controllerContext.partitionsBeingReassigned((topic, partition)).newReplicas
+    val reassignedReplicas = controllerContext.partitionsBeingReassigned(TopicAndPartition(topic, partition)).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-    debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
-      .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
-      currentLeaderIsrZkPathVersion))
     // pick any replica from the newly assigned replicas list that is in the ISR
     val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
     val newLeaderOpt = aliveReassignedReplicas.headOption
     newLeaderOpt match {
       case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
-                              currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
+        currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
       case None =>
         reassignedReplicas.size match {
           case 0 =>
@@ -117,4 +113,37 @@ class ReassignedPartitionLeaderSelector(
         }
     }
   }
+}
+
+/**
+ * Picks the preferred replica as the new leader if -
+ * 1. It is not already the current leader
+ * 2. It is alive
+ */
+class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
+with Logging {
+  this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
+
+  def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+    val preferredReplica = assignedReplicas.head
+    // check if preferred replica is the current leader
+    val currentLeader = controllerContext.allLeaders(topicAndPartition)
+    if(currentLeader == preferredReplica) {
+      throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
+        .format(preferredReplica, topic, partition))
+    } else {
+      info("Current leader %d for partition [%s,%d] is not the preferred replica.".format(currentLeader, topic, partition) +
+        " Trigerring preferred replica leader election")
+      // check if preferred replica is not the current leader and is alive and in the isr
+      if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
+        (new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr,
+          currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
+      } else {
+        throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
+          "[%s,%d] is either not alive or not in the isr. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+      }
+    }
+  }
 }
\ No newline at end of file

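The new PreferredReplicaPartitionLeaderSelector above encodes a simple rule: the first replica in a partition's assignment list is the preferred replica, and it may take over leadership only when it is not already the leader, is alive, and is in the ISR. A minimal sketch of that rule in isolation, using simplified stand-in types rather than the real kafka.api.LeaderAndIsr and controller context:

    // Simplified stand-ins; the real selector uses kafka.api.LeaderAndIsr
    // and the controller's live-broker and assignment caches.
    case class SimpleLeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    def selectPreferredReplica(assignedReplicas: Seq[Int],
                               current: SimpleLeaderAndIsr,
                               liveBrokerIds: Set[Int]): Either[String, SimpleLeaderAndIsr] = {
      val preferred = assignedReplicas.head  // by convention, the first assigned replica
      if (current.leader == preferred)
        Left("preferred replica is already the leader")
      else if (liveBrokerIds.contains(preferred) && current.isr.contains(preferred))
        // the new leader takes over with a bumped epoch and zk version; the ISR is unchanged
        Right(SimpleLeaderAndIsr(preferred, current.leaderEpoch + 1, current.isr, current.zkVersion + 1))
      else
        Left("preferred replica is not alive or not in the ISR")
    }
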
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Sat Oct 13 00:39:41 2012
@@ -21,9 +21,9 @@ import kafka.api.LeaderAndIsr
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import collection.JavaConversions._
-import kafka.common.{StateChangeFailedException, PartitionOfflineException}
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
 
 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -41,7 +41,7 @@ class PartitionStateMachine(controller: 
   this.logIdent = "[Partition state machine on Controller " + controller.config.brokerId + "]: "
   private val controllerContext = controller.controllerContext
   private val zkClient = controllerContext.zkClient
-  var partitionState: mutable.Map[(String, Int), PartitionState] = mutable.Map.empty
+  var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private var isShuttingDown = new AtomicBoolean(false)
@@ -83,7 +83,7 @@ class PartitionStateMachine(controller: 
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
       partitionState.filter(partitionAndState =>
         partitionAndState._2.equals(OfflinePartition) || partitionAndState._2.equals(NewPartition)).foreach {
-        partitionAndState => handleStateChange(partitionAndState._1._1, partitionAndState._1._2, OnlinePartition,
+        partitionAndState => handleStateChange(partitionAndState._1.topic, partitionAndState._1.partition, OnlinePartition,
                                                offlinePartitionSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers()
@@ -97,13 +97,13 @@ class PartitionStateMachine(controller: 
    * @param partitions   The list of partitions that need to be transitioned to the target state
    * @param targetState  The state that the partitions should be moved to
    */
-  def handleStateChanges(partitions: Seq[(String, Int)], targetState: PartitionState,
+  def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
                          leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) {
     info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
       partitions.foreach { topicAndPartition =>
-        handleStateChange(topicAndPartition._1, topicAndPartition._2, targetState, leaderSelector)
+        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers()
     }catch {
@@ -120,23 +120,24 @@ class PartitionStateMachine(controller: 
    */
   private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                 leaderSelector: PartitionLeaderSelector) {
+    val topicAndPartition = TopicAndPartition(topic, partition)
     try {
-      partitionState.getOrElseUpdate((topic, partition), NonExistentPartition)
+      partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
       targetState match {
         case NewPartition =>
           // pre: partition did not exist before this
           // post: partition has been assigned replicas
-          assertValidPreviousStates(topic, partition, List(NonExistentPartition), NewPartition)
+          assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
           assignReplicasToPartitions(topic, partition)
-          partitionState.put((topic, partition), NewPartition)
+          partitionState.put(topicAndPartition, NewPartition)
           info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
-            "%s".format(controllerContext.partitionReplicaAssignment(topic, partition).mkString(",")))
+            "%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
         case OnlinePartition =>
-          assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
-          partitionState(topic, partition) match {
+          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
+          partitionState(topicAndPartition) match {
             case NewPartition =>
               // initialize leader and isr path for new partition
-              initializeLeaderAndIsrForPartition(topic, partition)
+              initializeLeaderAndIsrForPartition(topicAndPartition)
             case OfflinePartition =>
               electLeaderForPartition(topic, partition, leaderSelector)
             case OnlinePartition => // invoked when the leader needs to be re-elected
@@ -144,26 +145,26 @@ class PartitionStateMachine(controller: 
             case _ => // should never come here since illegal previous states are checked above
           }
           info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
-            partitionState(topic, partition), controllerContext.allLeaders(topic, partition)))
-          partitionState.put((topic, partition), OnlinePartition)
+            partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition)))
+          partitionState.put(topicAndPartition, OnlinePartition)
            // post: partition has a leader
         case OfflinePartition =>
           // pre: partition should be in Online state
-          assertValidPreviousStates(topic, partition, List(NewPartition, OnlinePartition), OfflinePartition)
+          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
           info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
-          partitionState.put((topic, partition), OfflinePartition)
+          partitionState.put(topicAndPartition, OfflinePartition)
           // post: partition has no alive leader
         case NonExistentPartition =>
           // pre: partition could be in either of the above states
-          assertValidPreviousStates(topic, partition, List(OfflinePartition), NonExistentPartition)
+          assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
           info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
-          partitionState.put((topic, partition), NonExistentPartition)
+          partitionState.put(topicAndPartition, NonExistentPartition)
           // post: partition state is deleted from all brokers and zookeeper
       }
     }catch {
       case e => error("State change for partition [%s, %d] ".format(topic, partition) +
-        "from %s to %s failed".format(partitionState(topic, partition), targetState), e)
+        "from %s to %s failed".format(partitionState(topicAndPartition), targetState), e)
     }
   }
 
@@ -173,8 +174,8 @@ class PartitionStateMachine(controller: 
    */
   private def initializePartitionState() {
     for((topicPartition, replicaAssignment) <- controllerContext.partitionReplicaAssignment) {
-      val topic = topicPartition._1
-      val partition = topicPartition._2
+      val topic = topicPartition.topic
+      val partition = topicPartition.partition
       // check if leader and isr path exists for partition. If not, then it is in NEW state
       ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
         case Some(currentLeaderAndIsr) =>
@@ -191,12 +192,12 @@ class PartitionStateMachine(controller: 
     }
   }
 
-  private def assertValidPreviousStates(topic: String, partition: Int, fromStates: Seq[PartitionState],
+  private def assertValidPreviousStates(topicAndPartition: TopicAndPartition, fromStates: Seq[PartitionState],
                                         targetState: PartitionState) {
-    if(!fromStates.contains(partitionState((topic, partition))))
-      throw new IllegalStateException("Partition [%s, %d] should be in the %s states before moving to %s state"
-        .format(topic, partition, fromStates.mkString(","), targetState) + ". Instead it is in %s state"
-        .format(partitionState((topic, partition))))
+    if(!fromStates.contains(partitionState(topicAndPartition)))
+      throw new IllegalStateException("Partition %s should be in the %s states before moving to %s state"
+        .format(topicAndPartition, fromStates.mkString(","), targetState) + ". Instead it is in %s state"
+        .format(partitionState(topicAndPartition)))
   }
 
   /**
@@ -207,7 +208,7 @@ class PartitionStateMachine(controller: 
    */
   private def assignReplicasToPartitions(topic: String, partition: Int) {
     val assignedReplicas = ZkUtils.getReplicasForPartition(controllerContext.zkClient, topic, partition)
-    controllerContext.partitionReplicaAssignment += (topic, partition) -> assignedReplicas
+    controllerContext.partitionReplicaAssignment += TopicAndPartition(topic, partition) -> assignedReplicas
   }
 
   /**
@@ -220,35 +221,36 @@ class PartitionStateMachine(controller: 
   * @param brokerRequestBatch  The object that holds the leader and isr requests to be sent to each broker as a result of
    *                      this state change
    */
-  private def initializeLeaderAndIsrForPartition(topic: String, partition: Int) {
-    debug("Initializing leader and isr for partition [%s, %d]".format(topic, partition))
-    val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
+  private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
+    debug("Initializing leader and isr for partition %s".format(topicAndPartition))
+    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
       case 0 =>
         ControllerStat.offlinePartitionRate.mark()
-        throw new StateChangeFailedException(("During state change of partition (%s, %d) from NEW to ONLINE, assigned replicas are " +
-          "[%s], live brokers are [%s]. No assigned replica is alive").format(topic, partition,
+        throw new StateChangeFailedException(("During state change of partition %s from NEW to ONLINE, assigned replicas are " +
+          "[%s], live brokers are [%s]. No assigned replica is alive").format(topicAndPartition,
           replicaAssignment.mkString(","), controllerContext.liveBrokerIds))
       case _ =>
-        debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition, liveAssignedReplicas))
+        debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
         // make the first replica in the list of assigned replicas, the leader
         val leader = liveAssignedReplicas.head
         val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString())
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), leaderAndIsr.toString)
           // NOTE: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
-          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr, replicaAssignment.size)
-          controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
-          partitionState.put((topic, partition), OnlinePartition)
+          brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
+            topicAndPartition.partition, leaderAndIsr, replicaAssignment.size)
+          controllerContext.allLeaders.put(topicAndPartition, leaderAndIsr.leader)
+          partitionState.put(topicAndPartition, OnlinePartition)
         }catch {
           case e: ZkNodeExistsException =>
             ControllerStat.offlinePartitionRate.mark()
-            throw new StateChangeFailedException("Error while changing partition [%s, %d]'s state from New to Online"
-              .format(topic, partition) + " since Leader and ISR path already exists")
+            throw new StateChangeFailedException("Error while changing partition %s's state from New to Online"
+              .format(topicAndPartition) + " since Leader and ISR path already exists")
         }
     }
   }
@@ -280,11 +282,12 @@ class PartitionStateMachine(controller: 
         replicasForThisPartition = replicas
       }
       // update the leader cache
-      controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderAndIsr.leader)
       info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition,
-                                                          topic, partition, newLeaderAndIsr, controllerContext.partitionReplicaAssignment((topic, partition)).size)
+                                                          topic, partition, newLeaderAndIsr,
+        controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     }catch {
       case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
         .format(topic, partition) + " Marking this partition offline", poe)
@@ -307,7 +310,7 @@ class PartitionStateMachine(controller: 
       case Some(currentLeaderAndIsr) => currentLeaderAndIsr
       case None =>
         throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
-          "[%s, %d] in %s state".format(topic, partition, partitionState((topic, partition))))
+          "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
     }
   }
 
@@ -330,17 +333,17 @@ class PartitionStateMachine(controller: 
             controllerContext.allTopics = currentChildren
 
             val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
-            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
+            controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
+              !deletedTopics.contains(p._1.topic))
             controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
             info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
               deletedTopics, addedPartitionReplicaAssignment))
             if(newTopics.size > 0)
-              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSeq)
+              controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
           } catch {
             case e => error("Error while handling new topic", e )
           }
           // TODO: kafka-330  Handle deleted topics
-          // handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
         }
       }
     }

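Most of the churn in PartitionStateMachine.scala is mechanical: raw (String, Int) tuple keys are replaced by the TopicAndPartition case class, so call sites read topicAndPartition.topic rather than the opaque tp._1. A short sketch of the pattern, with a local stand-in for kafka.common.TopicAndPartition:

    import scala.collection.mutable

    // Local stand-in for kafka.common.TopicAndPartition.
    case class TopicAndPartition(topic: String, partition: Int)

    sealed trait PartitionState
    case object NonExistentPartition extends PartitionState
    case object OnlinePartition extends PartitionState

    val partitionState = mutable.Map.empty[TopicAndPartition, PartitionState]
    val tp = TopicAndPartition("orders", 0)
    partitionState.getOrElseUpdate(tp, NonExistentPartition) // default state on first access
    partitionState.put(tp, OnlinePartition)
    println(tp.topic + "/" + tp.partition)                   // named fields, not tp._1 / tp._2
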
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Sat Oct 13 00:39:41 2012
@@ -20,9 +20,9 @@ import collection._
 import kafka.utils.{ZkUtils, Logging}
 import collection.JavaConversions._
 import kafka.api.LeaderAndIsr
-import kafka.common.StateChangeFailedException
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.IZkChildListener
+import kafka.common.{TopicAndPartition, StateChangeFailedException}
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -79,7 +79,7 @@ class ReplicaStateMachine(controller: Ka
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
-  def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState) {
+  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState) {
     info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
@@ -99,9 +99,10 @@ class ReplicaStateMachine(controller: Ka
    * @param targetState The end state that the replica should be moved to
    */
   def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+    val topicAndPartition = TopicAndPartition(topic, partition)
     try {
       replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
-      val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
+      val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
       targetState match {
         case NewReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
@@ -123,8 +124,8 @@ class ReplicaStateMachine(controller: Ka
           // send stop replica command
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition)
           // remove this replica from the assigned replicas list for its partition
-          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition))
-          controllerContext.partitionReplicaAssignment.put((topic, partition),
+          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+          controllerContext.partitionReplicaAssignment.put(topicAndPartition,
             currentAssignedReplicas.filterNot(_ == replicaId))
           info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
           replicaState.remove((topic, partition, replicaId))
@@ -133,8 +134,8 @@ class ReplicaStateMachine(controller: Ka
           replicaState((topic, partition, replicaId)) match {
             case NewReplica =>
               // add this replica to the assigned replicas list for its partition
-              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment((topic, partition))
-              controllerContext.partitionReplicaAssignment.put((topic, partition), currentAssignedReplicas :+ replicaId)
+              val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+              controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
               info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
             case _ =>
               // check if the leader for this partition is alive or even exists
@@ -182,7 +183,7 @@ class ReplicaStateMachine(controller: Ka
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(newLeaderAndIsr.leader),
                                                               topic, partition, newLeaderAndIsr, replicaAssignment.size)
           // update the local leader and isr cache
-          controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+          controllerContext.allLeaders.put(topicAndPartition, newLeaderAndIsr.leader)
           replicaState.put((topic, partition, replicaId), OfflineReplica)
           info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
           info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
@@ -211,8 +212,8 @@ class ReplicaStateMachine(controller: Ka
    */
   private def initializeReplicaState() {
     for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
-      val topic = topicPartition._1
-      val partition = topicPartition._2
+      val topic = topicPartition.topic
+      val partition = topicPartition.partition
       assignedReplicas.foreach { replicaId =>
         controllerContext.liveBrokerIds.contains(replicaId) match {
           case true => replicaState.put((topic, partition, replicaId), OnlineReplica)
@@ -222,7 +223,7 @@ class ReplicaStateMachine(controller: Ka
     }
   }
 
-  def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[(String, Int)] = {
+  def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
     controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
   }
 

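ReplicaStateMachine.scala gets the same key migration; getPartitionsAssignedToBroker keeps its filter untouched but now returns Seq[TopicAndPartition]. Roughly, with the same stand-in case class as in the sketch above:

    case class TopicAndPartition(topic: String, partition: Int)

    // Keep every partition whose replica list mentions the broker; return the keys.
    def partitionsAssignedToBroker(assignment: Map[TopicAndPartition, Seq[Int]],
                                   brokerId: Int): Seq[TopicAndPartition] =
      assignment.filter { case (_, replicas) => replicas.contains(brokerId) }.keySet.toSeq
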
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Oct 13 00:39:41 2012
@@ -631,7 +631,7 @@ object Utils extends Logging {
     for ( (key, value) <- jsonDataMap) {
       if (numElements > 0)
         builder.append(",")
-      builder.append("\"" + key + "\": ")
+      builder.append("\"" + key + "\":")
       builder.append("\"" + value + "\"")
       numElements += 1
     }
@@ -654,6 +654,20 @@ object Utils extends Logging {
     builder.toString
   }
 
+  def arrayToJson[T <: Any](arr: Array[String]): String = {
+    val builder = new StringBuilder
+    builder.append("[ ")
+    var numElements = 0
+    for ( value <- arr ) {
+      if (numElements > 0)
+        builder.append(",")
+      builder.append(" " + value + "  ")
+      numElements += 1
+    }
+    builder.append(" ]")
+    builder.toString
+  }
+
   def getAllBrokersFromBrokerList(brokerListStr: String): Seq[Broker] = {
     val brokersStr = Utils.getCSVList(brokerListStr)
 

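A usage note on the new arrayToJson helper: unlike mapToJson, which quotes keys and values itself, arrayToJson joins the elements as-is, so callers are expected to pass pre-serialized JSON values. A hypothetical call:

    // Hypothetical usage; the element strings must already be valid JSON.
    val replicas = Array("\"broker-1\"", "\"broker-2\"")
    val json = kafka.utils.Utils.arrayToJson(replicas) // => [ "broker-1", "broker-2" ]
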

