kafka-commits mailing list archives

From jun...@apache.org
Subject svn commit: r1396673 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/controller/ main/scala/kafka/server/ test/scala/unit/kafka/api/ test/scala/unit/kafka/server/
Date Wed, 10 Oct 2012 16:25:43 GMT
Author: junrao
Date: Wed Oct 10 16:25:42 2012
New Revision: 1396673

URL: http://svn.apache.org/viewvc?rev=1396673&view=rev
Log:
broker needs to know the replication factor per partition; patched by Yang Ye; reviewed by Jun Rao; kafka-510
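
In short, this patch wraps LeaderAndIsr in a new kafka.api.PartitionInfo case class that also carries the partition's replication factor, and the controller now includes that factor in every LeaderAndIsrRequest it sends, so each broker can judge under-replication per partition instead of falling back to its defaultReplicationFactor config. As a rough sketch (derived from the PartitionInfo.readFrom/writeTo changes below, not an official protocol spec), each per-partition entry of the request now looks like:

    int32  leader
    int32  leaderEpoch
    string isr                 (2-byte length + comma-separated broker ids, UTF-8)
    int32  zkVersion
    int32  replicationFactor   (the new field added by this change)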

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Wed Oct 10 16:25:42 2012
@@ -27,38 +27,46 @@ import collection.mutable.HashMap
 object LeaderAndIsr {
   val initialLeaderEpoch: Int = 0
   val initialZKVersion: Int = 0
-  def readFrom(buffer: ByteBuffer): LeaderAndIsr = {
+}
+
+case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){
+  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
+
+  override def toString(): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
+    jsonDataMap.put("ISR", isr.mkString(","))
+    Utils.stringMapToJsonString(jsonDataMap)
+  }
+}
+
+
+object PartitionInfo {
+  def readFrom(buffer: ByteBuffer): PartitionInfo = {
     val leader = buffer.getInt
     val leaderGenId = buffer.getInt
     val ISRString = Utils.readShortString(buffer, "UTF-8")
     val ISR = ISRString.split(",").map(_.toInt).toList
     val zkVersion = buffer.getInt
-    new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion)
+    val replicationFactor = buffer.getInt
+    PartitionInfo(LeaderAndIsr(leader, leaderGenId, ISR, zkVersion), replicationFactor)
   }
 }
 
-case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){
-  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
-
+case class PartitionInfo(val leaderAndIsr: LeaderAndIsr, val replicationFactor: Int) {
   def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(leader)
-    buffer.putInt(leaderEpoch)
-    Utils.writeShortString(buffer, isr.mkString(","), "UTF-8")
-    buffer.putInt(zkVersion)
+    buffer.putInt(leaderAndIsr.leader)
+    buffer.putInt(leaderAndIsr.leaderEpoch)
+    Utils.writeShortString(buffer, leaderAndIsr.isr.mkString(","), "UTF-8")
+    buffer.putInt(leaderAndIsr.zkVersion)
+    buffer.putInt(replicationFactor)
   }
 
   def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + isr.mkString(",").length) + 4
+    val size = 4 + 4 + (2 + leaderAndIsr.isr.mkString(",").length) + 4 + 4
     size
   }
-
-  override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
-    jsonDataMap.put("leader", leader.toString)
-    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
-    jsonDataMap.put("ISR", isr.mkString(","))
-    Utils.stringMapToJsonString(jsonDataMap)
-  }
 }
 
 
@@ -73,17 +81,17 @@ object LeaderAndIsrRequest {
     val versionId = buffer.getShort
     val clientId = Utils.readShortString(buffer)
     val ackTimeoutMs = buffer.getInt
-    val leaderAndISRRequestCount = buffer.getInt
-    val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
+    val partitionInfosCount = buffer.getInt
+    val partitionInfos = new HashMap[(String, Int), PartitionInfo]
 
-    for(i <- 0 until leaderAndISRRequestCount){
+    for(i <- 0 until partitionInfosCount){
       val topic = Utils.readShortString(buffer, "UTF-8")
       val partition = buffer.getInt
-      val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer)
+      val partitionInfo = PartitionInfo.readFrom(buffer)
 
-      leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
+      partitionInfos.put((topic, partition), partitionInfo)
     }
-    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, leaderAndISRInfos)
+    new LeaderAndIsrRequest(versionId, clientId, ackTimeoutMs, partitionInfos)
   }
 }
 
@@ -91,19 +99,19 @@ object LeaderAndIsrRequest {
 case class LeaderAndIsrRequest (versionId: Short,
                                 clientId: String,
                                 ackTimeoutMs: Int,
-                                leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
+                                partitionInfos: Map[(String, Int), PartitionInfo])
         extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
 
-  def this(leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
-    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+  def this(partitionInfos: Map[(String, Int), PartitionInfo]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout, partitionInfos)
   }
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putShort(versionId)
     Utils.writeShortString(buffer, clientId)
     buffer.putInt(ackTimeoutMs)
-    buffer.putInt(leaderAndISRInfos.size)
-    for((key, value) <- leaderAndISRInfos){
+    buffer.putInt(partitionInfos.size)
+    for((key, value) <- partitionInfos){
       Utils.writeShortString(buffer, key._1, "UTF-8")
       buffer.putInt(key._2)
       value.writeTo(buffer)
@@ -112,7 +120,7 @@ case class LeaderAndIsrRequest (versionI
 
   def sizeInBytes(): Int = {
     var size = 1 + 2 + (2 + clientId.length) + 4 + 4
-    for((key, value) <- leaderAndISRInfos)
+    for((key, value) <- partitionInfos)
       size += (2 + key._1.length) + 4 + value.sizeInBytes
     size
   }
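
To make the new serialization concrete, here is a small round-trip sketch using the classes defined above. It assumes the patched kafka.api classes are on the classpath; the object name and the assertions are illustrative only and are not part of the patch.

    import java.nio.ByteBuffer
    import kafka.api.{LeaderAndIsr, PartitionInfo}

    // Sketch: write a PartitionInfo to a buffer and read it back; the replication
    // factor is the last int32 of each per-partition entry and survives the round trip.
    object PartitionInfoRoundTrip {
      def main(args: Array[String]) {
        val original = PartitionInfo(LeaderAndIsr(0, 1, List(0, 1, 2), 1), replicationFactor = 3)
        val buffer = ByteBuffer.allocate(original.sizeInBytes())
        original.writeTo(buffer)
        buffer.rewind()
        val decoded = PartitionInfo.readFrom(buffer)
        assert(decoded == original)             // case class equality covers leaderAndIsr and replicationFactor
        assert(decoded.replicationFactor == 3)
      }
    }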

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Wed Oct 10 16:25:42 2012
@@ -21,15 +21,17 @@ import kafka.utils._
 import java.lang.Object
 import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
-import kafka.common.ErrorMapping
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
+import kafka.common.ErrorMapping
+
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
  */
 class Partition(val topic: String,
                 val partitionId: Int,
+                var replicationFactor: Int,
                 time: Time,
                 val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
@@ -57,8 +59,7 @@ class Partition(val topic: String,
   )
 
   def isUnderReplicated(): Boolean = {
-    // TODO: need to pass in replication factor from controller
-    inSyncReplicas.size < replicaManager.config.defaultReplicationFactor
+    inSyncReplicas.size < replicationFactor
   }
 
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
@@ -292,7 +293,7 @@ class Partition(val topic: String,
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", ")))
     val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString(), zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newISR
       zkVersion = newVersion

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Wed Oct 10 16:25:42 2012
@@ -67,7 +67,7 @@ class Replica(val brokerId: Int,
   def highWatermark_=(newHighWatermark: Long) {
     if (isLocal) {
       trace("Setting hw for replica %d topic %s partition %d on broker %d to %d"
-              .format(brokerId, topic, partitionId, newHighWatermark))
+              .format(brokerId, topic, partitionId, brokerId, newHighWatermark))
       highWatermarkValue.set(newHighWatermark)
     } else
       throw new KafkaException("Unable to set highwatermark for replica %d topic %s partition %d since it's not local"

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ControllerChannelManager.scala Wed Oct 10 16:25:42 2012
@@ -141,7 +141,7 @@ class RequestSendThread(val controllerId
 // request
 class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit)
   extends  Logging {
-  val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+  val brokerRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionInfo]]
 
   def newBatch() {
     // raise error if the previous batch is not empty
@@ -151,18 +151,19 @@ class ControllerBrokerRequestBatch(sendR
     brokerRequestMap.clear()
   }
 
-  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr) {
+  def addRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, leaderAndIsr: LeaderAndIsr, replicationFactor: Int) {
+    val partitionInfo = PartitionInfo(leaderAndIsr, replicationFactor)
     brokerIds.foreach { brokerId =>
-      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), LeaderAndIsr])
-      brokerRequestMap(brokerId).put((topic, partition), leaderAndIsr)
+      brokerRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionInfo])
+      brokerRequestMap(brokerId).put((topic, partition), partitionInfo)
     }
   }
 
   def sendRequestsToBrokers() {
     brokerRequestMap.foreach { m =>
       val broker = m._1
-      val leaderAndIsr = m._2
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(leaderAndIsr)
+      val partitionInfo = m._2
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionInfo)
       info("Sending to broker %d leaderAndIsr request of %s".format(broker, leaderAndIsrRequest))
       sendRequest(broker, leaderAndIsrRequest, null)
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/PartitionStateMachine.scala Wed Oct 10 16:25:42 2012
@@ -230,14 +230,14 @@ class PartitionStateMachine(controller: 
         debug("Live assigned replicas for partition [%s, %d] are: [%s]".format(topic, partition, liveAssignedReplicas))
         // make the first replica in the list of assigned replicas, the leader
         val leader = liveAssignedReplicas.head
-        var leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+        val leaderAndIsr = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
         try {
           ZkUtils.createPersistentPath(controllerContext.zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString)
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), leaderAndIsr.toString())
           // TODO: the above write can fail only if the current controller lost its zk session and the new controller
           // took over and initialized this partition. This can happen if the current controller went into a long
           // GC pause
-          brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr)
+          brokerRequestBatch.addRequestForBrokers(liveAssignedReplicas, topic, partition, leaderAndIsr, replicaAssignment.size)
           controllerContext.allLeaders.put((topic, partition), leaderAndIsr.leader)
           partitionState.put((topic, partition), OnlinePartition)
         }catch {
@@ -271,7 +271,7 @@ class PartitionStateMachine(controller: 
             info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
             // store new leader and isr info in cache
             brokerRequestBatch.addRequestForBrokers(liveAssignedReplicasToThisPartition, topic, partition,
-              newLeaderAndIsr)
+              newLeaderAndIsr, assignedReplicas.size)
           }catch {
             case e => throw new StateChangeFailedException(("Error while electing leader for partition" +
               " [%s, %d]").format(topic, partition), e)
@@ -338,7 +338,7 @@ class PartitionStateMachine(controller: 
           // update the new leadership decision in zookeeper or retry
           val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
             ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+            newLeaderAndIsr.toString(), currentLeaderAndIsr.zkVersion)
           newLeaderAndIsr.zkVersion = newVersion
           zookeeperPathUpdateSucceeded = updateSucceeded
           newLeaderAndIsr

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala Wed Oct 10 16:25:42 2012
@@ -37,7 +37,7 @@ class ReplicaStateMachine(controller: Ka
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest)
-  private var isShuttingDown = new AtomicBoolean(false)
+  private val isShuttingDown = new AtomicBoolean(false)
 
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
@@ -102,6 +102,7 @@ class ReplicaStateMachine(controller: Ka
    */
   private def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
     try {
+      val replicaAssignment = controllerContext.partitionReplicaAssignment((topic, partition))
       targetState match {
         case OnlineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(OnlineReplica, OfflineReplica), targetState)
@@ -113,7 +114,7 @@ class ReplicaStateMachine(controller: Ka
             case Some(leaderAndIsr) =>
               controllerContext.liveBrokerIds.contains(leaderAndIsr.leader) match {
                 case true => // leader is alive
-                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr)
+                  brokerRequestBatch.addRequestForBrokers(List(replicaId), topic, partition, leaderAndIsr, replicaAssignment.size)
                   replicaState.put((topic, partition, replicaId), OnlineReplica)
                   info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
                 case false => // ignore partitions whose leader is not alive
@@ -135,7 +136,7 @@ class ReplicaStateMachine(controller: Ka
                 info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
                 // update the new leadership decision in zookeeper or retry
                 val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString,
+                  ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), newLeaderAndIsr.toString(),
                   leaderAndIsr.zkVersion)
                 newLeaderAndIsr.zkVersion = newVersion
                 zookeeperPathUpdateSucceeded = updateSucceeded
@@ -144,7 +145,7 @@ class ReplicaStateMachine(controller: Ka
             }
           }
           // send the shrunk ISR state change request only to the leader
-          brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr)
+          brokerRequestBatch.addRequestForBrokers(List(newLeaderAndIsr.leader), topic, partition, newLeaderAndIsr, replicaAssignment.size)
           // update the local leader and isr cache
           controllerContext.allLeaders.put((topic, partition), newLeaderAndIsr.leader)
           replicaState.put((topic, partition, replicaId), OfflineReplica)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Oct 10 16:25:42 2012
@@ -17,7 +17,6 @@
 
 package kafka.server
 
-import java.io.IOException
 import kafka.admin.{CreateTopicCommand, AdminUtils}
 import kafka.api._
 import kafka.message._
@@ -26,7 +25,6 @@ import kafka.utils.{Pool, SystemTime, Lo
 import org.apache.log4j.Logger
 import scala.collection._
 import mutable.HashMap
-import scala.math._
 import kafka.network.RequestChannel.Response
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
@@ -34,6 +32,7 @@ import kafka.metrics.KafkaMetricsGroup
 import org.I0Itec.zkclient.ZkClient
 import kafka.common._
 
+
 /**
  * Logic to handle the various Kafka requests
  */
@@ -127,10 +126,13 @@ class KafkaApis(val requestChannel: Requ
     produceRequest.data.foreach(partitionAndData =>
       maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
 
+    val allPartitionHasReplicationFactorOne =
+      !produceRequest.data.keySet.exists(m => replicaManager.getReplicationFactorForPartition(m.topic, m.partition) != 1)
     if (produceRequest.requiredAcks == 0 ||
         produceRequest.requiredAcks == 1 ||
         produceRequest.numPartitions <= 0 ||
-        numPartitionsInError == produceRequest.numPartitions) {
+        allPartitionHasReplicationFactorOne ||
+        numPartitionsInError == produceRequest.numPartitions){
       val statuses = localProduceResults.map(r => r.key -> ProducerResponseStatus(r.errorCode, r.start)).toMap
       val response = ProducerResponse(produceRequest.versionId, produceRequest.correlationId, statuses)
       requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -512,8 +514,11 @@ class KafkaApis(val requestChannel: Requ
       trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
         .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
-        val partition = replicaManager.getOrCreatePartition(topic, partitionId)
-        val (hasEnough, errorCode) = partition.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+        val partitionOpt = replicaManager.getPartition(topic, partitionId)
+        val (hasEnough, errorCode) = if(partitionOpt.isDefined)
+          partitionOpt.get.checkEnoughReplicasReachOffset(fetchPartitionStatus.requiredOffset, produce.requiredAcks)
+        else
+          (false, ErrorMapping.UnknownTopicOrPartitionCode)
         if (errorCode != ErrorMapping.NoError) {
           fetchPartitionStatus.acksPending = false
           fetchPartitionStatus.error = errorCode
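
One effect of the new allPartitionHasReplicationFactorOne guard above: a produce request asking for more than one acknowledgement (for example requiredAcks = -1) can be answered immediately when every partition it touches has replication factor 1, since there are no followers whose acknowledgements could ever arrive. A self-contained sketch of the same predicate over a hypothetical map of per-partition replication factors (the helper name and the example map are illustrative, not from the patch):

    // True only when every partition touched by the request has replication factor 1.
    // In the patch the factor comes from replicaManager.getReplicationFactorForPartition,
    // which returns -1 for partitions this broker has not created yet.
    def allPartitionsHaveReplicationFactorOne(replicationFactors: Map[(String, Int), Int]): Boolean =
      !replicationFactors.values.exists(_ != 1)

    // A request spanning one unreplicated and one 3-way replicated partition is not short-circuited:
    val factors = Map(("topic-a", 0) -> 1, ("topic-b", 0) -> 3)
    println(allPartitionsHaveReplicationFactorOne(factors))   // false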

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Wed Oct 10 16:25:42 2012
@@ -22,11 +22,12 @@ import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
 import kafka.log.LogManager
-import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
-import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
+import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.api.{PartitionInfo, LeaderAndIsrRequest, LeaderAndIsr}
+
 
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
@@ -39,7 +40,6 @@ class ReplicaManager(val config: KafkaCo
   private val leaderPartitionsLock = new Object
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
-
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
   val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
   info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
@@ -69,6 +69,14 @@ class ReplicaManager(val config: KafkaCo
       kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
   }
 
+  def getReplicationFactorForPartition(topic: String, partitionId: Int) = {
+    val partitionOpt = getPartition(topic, partitionId)
+    if(partitionOpt.isDefined)
+      partitionOpt.get.replicationFactor
+    else
+      -1
+  }
+
   def startup() {
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
@@ -93,12 +101,14 @@ class ReplicaManager(val config: KafkaCo
     errorCode
   }
 
-  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
+  def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
-      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
+      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
       partition = allPartitions.get((topic, partitionId))
     }
+    if(partition.replicationFactor != replicationFactor)
+      partition.replicationFactor = replicationFactor
     partition
   }
 
@@ -125,10 +135,6 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  def getOrCreateReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Replica =  {
-    getOrCreatePartition(topic, partitionId).getOrCreateReplica(replicaId)
-  }
-
   def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] =  {
     val partitionOpt = getPartition(topic, partitionId)
     partitionOpt match {
@@ -141,23 +147,23 @@ class ReplicaManager(val config: KafkaCo
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
 
-    for((partitionInfo, leaderAndISR) <- leaderAndISRRequest.leaderAndISRInfos){
+    for((topicAndPartition, partitionInfo) <- leaderAndISRRequest.partitionInfos){
       var errorCode = ErrorMapping.NoError
-      val topic = partitionInfo._1
-      val partitionId = partitionInfo._2
+      val topic = topicAndPartition._1
+      val partitionId = topicAndPartition._2
 
-      val requestedLeaderId = leaderAndISR.leader
+      val requestedLeaderId = partitionInfo.leaderAndIsr.leader
       try {
         if(requestedLeaderId == config.brokerId)
-          makeLeader(topic, partitionId, leaderAndISR)
+          makeLeader(topic, partitionId, partitionInfo)
         else
-          makeFollower(topic, partitionId, leaderAndISR)
+          makeFollower(topic, partitionId, partitionInfo)
       } catch {
         case e =>
           error("Error processing leaderAndISR request %s".format(leaderAndISRRequest), e)
           errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       }
-      responseMap.put(partitionInfo, errorCode)
+      responseMap.put(topicAndPartition, errorCode)
     }
 
     /**
@@ -167,7 +173,7 @@ class ReplicaManager(val config: KafkaCo
      */
 //    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
 //      startHighWaterMarksCheckPointThread
-//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
+//      val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.partitionInfos.contains(p._1)).map(entry => entry._1)
 //      info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
 //      partitionsToRemove.foreach(p => stopReplica(p._1, p._2))
 //    }
@@ -175,10 +181,11 @@ class ReplicaManager(val config: KafkaCo
     responseMap
   }
 
-  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
+  private def makeLeader(topic: String, partitionId: Int, partitionInfo: PartitionInfo) = {
+    val leaderAndIsr = partitionInfo.leaderAndIsr
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
-    val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
+    val partition = getOrCreatePartition(topic, partitionId, partitionInfo.replicationFactor)
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, true)) {
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
         leaderPartitions += partition
@@ -187,13 +194,14 @@ class ReplicaManager(val config: KafkaCo
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
-    val leaderBrokerId: Int = leaderAndISR.leader
+  private def makeFollower(topic: String, partitionId: Int, partitionInfo: PartitionInfo) {
+    val leaderAndIsr = partitionInfo.leaderAndIsr
+    val leaderBrokerId: Int = leaderAndIsr.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
                  .format(leaderBrokerId, topic, partitionId))
 
-    val partition = getOrCreatePartition(topic, partitionId)
-    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, false)) {
+    val partition = getOrCreatePartition(topic, partitionId, partitionInfo.replicationFactor)
+    if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndIsr, false)) {
       // remove this replica's partition from the ISR expiration queue
       leaderPartitionsLock synchronized {
         leaderPartitions -= partition
@@ -209,8 +217,12 @@ class ReplicaManager(val config: KafkaCo
   }
 
   def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
-    val partition = getOrCreatePartition(topic, partitionId)
-    partition.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
+    val partitionOpt = getPartition(topic, partitionId)
+    if(partitionOpt.isDefined){
+      partitionOpt.get.updateLeaderHWAndMaybeExpandISR(replicaId, offset)
+    } else {
+      warn("In recording follower position, the partition [%s, %d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+    }
   }
 
   /**

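On the ReplicaManager side, getOrCreatePartition now requires the replication factor, which only the controller-driven leaderAndIsr path knows, so read-only callers (recordFollowerPosition, the produce-ack check in KafkaApis) go through getPartition or the new getReplicationFactorForPartition, which returns -1 when the partition has not been created on this broker. A hypothetical caller-side sketch exercising the new method (the helper itself is not in the patch):

    // Sketch: read-only lookup using the -1 sentinel for "partition not created here yet".
    def describeReplicationFactor(replicaManager: kafka.server.ReplicaManager,
                                  topic: String, partitionId: Int): String = {
      val factor = replicaManager.getReplicationFactorForPartition(topic, partitionId)
      if (factor == -1)
        "partition [%s, %d] has not been created on this broker yet".format(topic, partitionId)
      else
        "partition [%s, %d] has replication factor %d".format(topic, partitionId, factor)
    }
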
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala Wed Oct 10 16:25:42 2012
@@ -21,7 +21,6 @@ import org.junit._
 import org.scalatest.junit.JUnitSuite
 import junit.framework.Assert._
 import java.nio.ByteBuffer
-import kafka.api._
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
 import collection.mutable._
@@ -83,8 +82,8 @@ object SerializationTestUtils{
   def createTestLeaderAndISRRequest() : LeaderAndIsrRequest = {
     val leaderAndISR1 = new LeaderAndIsr(leader1, 1, isr1, 1)
     val leaderAndISR2 = new LeaderAndIsr(leader2, 1, isr2, 2)
-    val map = Map(((topic1, 0), leaderAndISR1),
-                  ((topic2, 0), leaderAndISR2))
+    val map = Map(((topic1, 0), PartitionInfo(leaderAndISR1, 3)),
+                  ((topic2, 0), PartitionInfo(leaderAndISR2, 3)))
     new LeaderAndIsrRequest(map)
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Wed Oct 10 16:25:42 2012
@@ -45,7 +45,7 @@ class HighwatermarkPersistenceTest exten
     replicaManager.checkpointHighWatermarks()
     var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
     assertEquals(0L, fooPartition0Hw)
-    val partition0 = replicaManager.getOrCreatePartition(topic, 0)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
     // create leader log
     val log0 = getMockLog
     // create leader and follower replicas
@@ -86,7 +86,7 @@ class HighwatermarkPersistenceTest exten
     replicaManager.checkpointHighWatermarks()
     var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
-    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
     // create leader log
     val topic1Log0 = getMockLog
     // create a local replica for topic1
@@ -102,7 +102,7 @@ class HighwatermarkPersistenceTest exten
     assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
     assertEquals(5L, topic1Partition0Hw)
     // add another partition and set highwatermark
-    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
     // create leader log
     val topic2Log0 = getMockLog
     // create a local replica for topic2

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Wed Oct 10 16:25:42 2012
@@ -81,7 +81,7 @@ class ISRExpirationTest extends JUnit3Su
                                                localLog: Log): Partition = {
     val leaderId=config.brokerId
     val replicaManager = new ReplicaManager(config, time, null, null, null)
-    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
+    val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1396673&r1=1396672&r2=1396673&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Wed Oct 10 16:25:42 2012
@@ -237,7 +237,7 @@ class SimpleFetchTest extends JUnit3Suit
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
-    val partition = new Partition(topic, partitionId, time, replicaManager)
+    val partition = new Partition(topic, partitionId, 2, time, replicaManager)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica


