kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject svn commit: r1397747 [2/2] - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/controller/ core/src/main/scala/kafka/utils/ core/src/test/scala/unit/kafka/admin/
Date Sat, 13 Oct 2012 00:39:42 GMT
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Sat Oct 13
00:39:41 2012
@@ -26,9 +26,9 @@ import scala.collection._
 import kafka.api.LeaderAndIsr
 import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
-import kafka.common.{KafkaException, NoEpochForPartitionException}
 import kafka.controller.{PartitionAndReplica, ReassignedPartitionsContext}
 import kafka.admin._
+import kafka.common.{TopicAndPartition, KafkaException, NoEpochForPartitionException}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -36,6 +36,7 @@ object ZkUtils extends Logging {
   val BrokerTopicsPath = "/brokers/topics"
   val ControllerPath = "/controller"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
+  val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -428,13 +429,13 @@ object ZkUtils extends Logging {
   }
 
   def getPartitionLeaderAndIsrForTopics(zkClient: ZkClient, topics: Seq[String]):
-  mutable.Map[(String, Int), LeaderAndIsr] = {
-    val ret = new mutable.HashMap[(String, Int), LeaderAndIsr]
+  mutable.Map[TopicAndPartition, LeaderAndIsr] = {
+    val ret = new mutable.HashMap[TopicAndPartition, LeaderAndIsr]
     val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
     for((topic, partitions) <- partitionsForTopics) {
       for(partition <- partitions) {
         ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition.toInt) match {
-          case Some(leaderAndIsr) => ret.put((topic, partition.toInt), leaderAndIsr)
+          case Some(leaderAndIsr) => ret.put(TopicAndPartition(topic, partition.toInt),
leaderAndIsr)
           case None =>
         }
       }
@@ -442,8 +443,8 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[(String,
Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[TopicAndPartition,
Seq[Int]] = {
+    val ret = new mutable.HashMap[TopicAndPartition, Seq[Int]]
     topics.foreach { topic =>
       val jsonPartitionMapOpt = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       jsonPartitionMapOpt match {
@@ -452,7 +453,7 @@ object ZkUtils extends Logging {
             case Some(m) =>
               val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
               for((partition, replicas) <- replicaMap){
-                ret.put((topic, partition.toInt), replicas.map(_.toInt))
+                ret.put(TopicAndPartition(topic, partition.toInt), replicas.map(_.toInt))
                 debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic,
partition, replicas))
               }
             case None =>
@@ -519,7 +520,7 @@ object ZkUtils extends Logging {
     }.flatten[(String, Int)].toSeq
   }
 
-  def getPartitionsBeingReassigned(zkClient: ZkClient): Map[(String, Int), ReassignedPartitionsContext]
= {
+  def getPartitionsBeingReassigned(zkClient: ZkClient): Map[TopicAndPartition, ReassignedPartitionsContext]
= {
     // read the partitions and their new replica list
     val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1
     jsonPartitionMapOpt match {
@@ -529,11 +530,11 @@ object ZkUtils extends Logging {
           val newReplicas = p._2
           (p._1 -> new ReassignedPartitionsContext(newReplicas))
         }
-      case None => Map.empty[(String, Int), ReassignedPartitionsContext]
+      case None => Map.empty[TopicAndPartition, ReassignedPartitionsContext]
     }
   }
 
-  def parsePartitionReassignmentData(jsonData: String):Map[(String, Int), Seq[Int]] = {
+  def parsePartitionReassignmentData(jsonData: String):Map[TopicAndPartition, Seq[Int]] =
{
     SyncJSON.parseFull(jsonData) match {
       case Some(m) =>
         val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
@@ -541,20 +542,21 @@ object ZkUtils extends Logging {
           val topic = reassignedPartitions._1.split(",").head
           val partition = reassignedPartitions._1.split(",").last.toInt
           val newReplicas = reassignedPartitions._2.map(_.toInt)
-          (topic, partition) -> newReplicas
+          TopicAndPartition(topic, partition) -> newReplicas
         }
-      case None => Map.empty[(String, Int), Seq[Int]]
+      case None => Map.empty[TopicAndPartition, Seq[Int]]
     }
   }
 
-  def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[(String,
Int), Seq[Int]]) {
+  def updatePartitionReassignmentData(zkClient: ZkClient, partitionsToBeReassigned: Map[TopicAndPartition,
Seq[Int]]) {
     val zkPath = ZkUtils.ReassignPartitionsPath
     partitionsToBeReassigned.size match {
       case 0 => // need to delete the /admin/reassign_partitions path
         deletePath(zkClient, zkPath)
         info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath))
       case _ =>
-        val jsonData = Utils.mapToJson(partitionsToBeReassigned.map(p => ("%s,%s".format(p._1._1,
p._1._2)) -> p._2.map(_.toString)))
+        val jsonData = Utils.mapToJson(partitionsToBeReassigned.map(p =>
+          ("%s,%s".format(p._1.topic, p._1.partition)) -> p._2.map(_.toString)))
         try {
           updatePersistentPath(zkClient, zkPath, jsonData)
           info("Updated partition reassignment path with %s".format(jsonData))
@@ -567,8 +569,8 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]):
Seq[PartitionAndReplica] = {
-    brokerIds.map { brokerId =>
+  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]):
Set[PartitionAndReplica] = {
+    Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
       // read all the partitions and their assigned replicas into a map organized by
       // { replica id -> partition 1, partition 2...
       val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics,
brokerId)
@@ -577,7 +579,30 @@ object ZkUtils extends Logging {
       partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
     }.flatten
   }
+  
+  def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition]
= {
+    // read the partitions and their new replica list
+    val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1
+    jsonPartitionListOpt match {
+      case Some(jsonPartitionList) => parsePreferredReplicaElectionData(jsonPartitionList)
+      case None => Set.empty[TopicAndPartition]
+    }
+  }
 
+  def parsePreferredReplicaElectionData(jsonData: String):Set[TopicAndPartition] = {
+    SyncJSON.parseFull(jsonData) match {
+      case Some(m) =>
+        val topicAndPartitions = m.asInstanceOf[Array[Map[String, String]]]
+        val partitions = topicAndPartitions.map { p =>
+          val topicPartitionMap = p.asInstanceOf[Map[String, String]]
+          val topic = topicPartitionMap.get("topic").get
+          val partition = topicPartitionMap.get("partition").get.toInt
+          TopicAndPartition(topic, partition)
+        }
+        Set.empty[TopicAndPartition] ++ partitions
+      case None => Set.empty[TopicAndPartition]
+    }
+  }
 
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
     val brokerIdPath = BrokerIdsPath + "/" + brokerId
@@ -629,6 +654,16 @@ object ZkUtils extends Logging {
     if(topics == null) Seq.empty[String]
     else topics
   }
+
+  def getAllPartitions(zkClient: ZkClient): Set[TopicAndPartition] = {
+    val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
+    if(topics == null) Set.empty[TopicAndPartition]
+    else {
+      topics.map { topic =>
+        getChildren(zkClient, getTopicPartitionsPath(topic)).map(_.toInt).map(TopicAndPartition(topic,
_))
+      }.flatten.toSet
+    }
+  }
 }
 
 class LeaderExistsOrChangedListener(topic: String,

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1397747&r1=1397746&r2=1397747&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Sat
Oct 13 00:39:41 2012
@@ -20,10 +20,9 @@ import junit.framework.Assert._
 import org.junit.Test
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
-import kafka.server.{KafkaServer, KafkaConfig}
-import collection.mutable.ListBuffer
-import kafka.common.ErrorMapping
-import kafka.utils.{Utils, ZkUtils, TestUtils}
+import kafka.server.KafkaConfig
+import kafka.utils.{ZkUtils, TestUtils}
+import kafka.common.{TopicAndPartition, ErrorMapping}
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -219,13 +218,14 @@ class AdminTest extends JUnit3Suite with
     // reassign partition 0
     val newReplicas = Seq(0, 2, 3)
     val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned)
-> newReplicas))
+    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition
-> newReplicas))
     assertTrue("Partition reassignment attempt failed for [test, 0]", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned,
newReplicas,
-      Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned)
== ReassignmentCompleted;
+      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
+      Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
@@ -243,13 +243,14 @@ class AdminTest extends JUnit3Suite with
     // reassign partition 0
     val newReplicas = Seq(1, 2, 3)
     val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned)
-> newReplicas))
+    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition
-> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned,
newReplicas,
-      Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned)
== ReassignmentCompleted;
+      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
@@ -268,13 +269,14 @@ class AdminTest extends JUnit3Suite with
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned)
-> newReplicas))
+    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition
-> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
       val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topic, partitionToBeReassigned,
newReplicas,
-      Map((topic, partitionToBeReassigned) -> newReplicas), partitionsBeingReassigned)
== ReassignmentCompleted;
+      CheckReassignmentStatus.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition,
newReplicas,
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
@@ -290,10 +292,11 @@ class AdminTest extends JUnit3Suite with
     // reassign partition 0
     val newReplicas = Seq(2, 3)
     val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned)
-> newReplicas))
+    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition
-> newReplicas))
     assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
     val reassignedPartitions = ZkUtils.getPartitionsBeingReassigned(zkClient)
-    assertFalse("Partition should not be reassigned", reassignedPartitions.contains((topic,
partitionToBeReassigned)))
+    assertFalse("Partition should not be reassigned", reassignedPartitions.contains(topicAndPartition))
     // leader should be 2
     servers.foreach(_.shutdown())
   }
@@ -308,7 +311,8 @@ class AdminTest extends JUnit3Suite with
     // reassign partition 0
     val newReplicas = Seq(0, 1)
     val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned)
-> newReplicas))
+    val topicAndPartition = TopicAndPartition(topic, partitionToBeReassigned)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition
-> newReplicas))
     reassignPartitionsCommand.reassignPartitions
     // create brokers
     val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new
KafkaConfig(b)))
@@ -319,70 +323,41 @@ class AdminTest extends JUnit3Suite with
   }
 
   @Test
-  def testResumePartitionReassignmentAfterLeaderWasMoved() {
-    var expectedReplicaAssignment = Map(0  -> List(1, 0, 2, 3))
-    val leaderForPartitionMap = Map(0 -> 2)
+  def testPreferredReplicaJsonData() {
+    // write preferred replica json data to zk path
+    val partitionsForPreferredReplicaElection = Set(TopicAndPartition("test", 1), TopicAndPartition("test2",
1))
+    PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, partitionsForPreferredReplicaElection)
+    // try to read it back and compare with what was written
+    val preferredReplicaElectionZkData = ZkUtils.readData(zkClient,
+        ZkUtils.PreferredReplicaLeaderElectionPath)._1
+    val partitionsUndergoingPreferredReplicaElection =
+      PreferredReplicaLeaderElectionCommand.parsePreferredReplicaJsonData(preferredReplicaElectionZkData)
+    assertEquals("Preferred replica election ser-de failed", partitionsForPreferredReplicaElection,
+      partitionsUndergoingPreferredReplicaElection)
+  }
+
+  @Test
+  def testBasicPreferredReplicaElection() {
+    val expectedReplicaAssignment = Map(1  -> List("0", "1", "2"))
     val topic = "test"
-    val serverConfigs = TestUtils.createBrokerConfigs(4).map(b => new KafkaConfig(b))
-    val servers = new ListBuffer[KafkaServer]
-    // create the topic
-    AdminUtils.createTopicPartitionAssignmentPathInZK(topic,
-      expectedReplicaAssignment.map(r => (r._1) -> r._2.map(_.toString)), zkClient)
-    // bring up just brokers 0 and 1
-    servers.append(TestUtils.createServer(serverConfigs(0)))
-    servers.append(TestUtils.createServer(serverConfigs(1)))
-    val newReplicas = Seq(2, 3)
-    val partitionToBeReassigned = 0
-    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map((topic, partitionToBeReassigned)
-> newReplicas))
-    reassignPartitionsCommand.reassignPartitions
-    // this partition reassignment request should be ignored since replicas 2 and 3 are not
online
-    // and the admin path should be deleted as well
-    TestUtils.waitUntilTrue(checkIfReassignmentPathIsDeleted, 1000)
-    var assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should not be reassigned to 2, 3 yet", expectedReplicaAssignment(0),
assignedReplicas)
+    val partition = 1
+    val preferredReplica = 0
     // create brokers
-    servers.append(TestUtils.createServer(serverConfigs(2)))
-    servers.append(TestUtils.createServer(serverConfigs(3)))
-    // wait until new replicas catch up with leader
-    TestUtils.waitUntilTrue(checkIfNewReplicasInIsr, 2000)
-    // change the assigned replicas to 0 and 1
-    updateAssignedReplicasForPartition("test", 0, List(0, 1))
-    // reissue the partition reassignment
-    reassignPartitionsCommand.reassignPartitions
-    // create leaders for the partition to be reassigned
-    TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
-    // bounce controller
-    servers.head.shutdown()
-    servers.head.startup()
-    TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1500)
-    assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
+    val serverConfigs = TestUtils.createBrokerConfigs(3).map(new KafkaConfig(_))
+    // create the topic
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    val servers = serverConfigs.reverse.map(s => TestUtils.createServer(s))
+    // broker 2 should be the leader since it was started first
+    val currentLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition,
1000, None).get
+    // trigger preferred replica election
+    val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(TopicAndPartition(topic,
partition)))
+    preferredReplicaElection.moveLeaderToPreferredReplica()
+    val newLeader = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, partition,
1000, Some(currentLeader)).get
+    assertEquals("Preferred replica election failed", preferredReplica, newLeader)
     servers.foreach(_.shutdown())
   }
 
   private def checkIfReassignPartitionPathExists(): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
-
-  private def checkIfReassignmentPathIsDeleted(): Boolean = {
-    !ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
-  }
-
-  private def checkIfNewReplicasInIsr(): Boolean = {
-    val leaderAndIsrOpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, "test", 0)
-    leaderAndIsrOpt match {
-      case Some(leaderAndIsr) =>
-        if(leaderAndIsr.isr.contains(2) && leaderAndIsr.isr.contains(3))
-          true
-        else
-          false
-      case None => false
-    }
-  }
-
-  private def updateAssignedReplicasForPartition(topic: String, partition: Int, newAssignedReplicas:
Seq[Int]) {
-    val zkPath = ZkUtils.getTopicPath(topic)
-    val jsonPartitionMap = Utils.mapToJson(Map(partition.toString -> newAssignedReplicas.map(_.toString)))
-    ZkUtils.updatePersistentPath(zkClient, zkPath, jsonPartitionMap)
-  }
 }



Mime
View raw message