kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1411070 - in /incubator/kafka/branches/0.8/core/src/main/scala/kafka: consumer/ConsumerFetcherManager.scala server/AbstractFetcherManager.scala server/AbstractFetcherThread.scala server/KafkaApis.scala server/ReplicaManager.scala
Date Mon, 19 Nov 2012 05:52:00 GMT
Author: junrao
Date: Mon Nov 19 05:51:59 2012
New Revision: 1411070

URL: http://svn.apache.org/viewvc?rev=1411070&view=rev
Log:
move shutting down of fetcher thread out of critical path; patched by Jun Rao; reviewed by
Neha Narkhede; KAFKA-612

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala?rev=1411070&r1=1411069&r2=1411070&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
Mon Nov 19 05:51:59 2012
@@ -75,6 +75,8 @@ class ConsumerFetcherManager(private val
               addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(),
leaderBroker)
           }
           noLeaderPartitionSet --= leaderForPartitionsMap.keySet
+
+          shutdownIdleFetcherThreads()
         } catch {
           case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet),
t)
         }
@@ -124,6 +126,7 @@ class ConsumerFetcherManager(private val
     lock.lock()
     try {
       if (partitionMap != null) {
+        partitionList.foreach(tp => removeFetcher(tp.topic, tp.partition))
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala?rev=1411070&r1=1411069&r2=1411070&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
Mon Nov 19 05:51:59 2012
@@ -23,7 +23,7 @@ import kafka.cluster.Broker
 
 abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends
Logging {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
-  private val fetcherThreadMap = new mutable.HashMap[(Broker, Int), AbstractFetcherThread]
+  private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
@@ -37,17 +37,17 @@ abstract class AbstractFetcherManager(pr
   def addFetcher(topic: String, partitionId: Int, initialOffset: Long, sourceBroker: Broker)
{
     mapLock synchronized {
       var fetcherThread: AbstractFetcherThread = null
-      val key = (sourceBroker, getFetcherId(topic, partitionId))
+      val key = new BrokerAndFetcherId(sourceBroker, getFetcherId(topic, partitionId))
       fetcherThreadMap.get(key) match {
         case Some(f) => fetcherThread = f
         case None =>
-          fetcherThread = createFetcherThread(key._2, sourceBroker)
+          fetcherThread = createFetcherThread(key.fetcherId, sourceBroker)
           fetcherThreadMap.put(key, fetcherThread)
           fetcherThread.start
       }
       fetcherThread.addPartition(topic, partitionId, initialOffset)
       info("adding fetcher on topic %s, partion %d, initOffset %d to broker %d with fetcherId
%d"
-          .format(topic, partitionId, initialOffset, sourceBroker.id, key._2))
+          .format(topic, partitionId, initialOffset, sourceBroker.id, key.fetcherId))
     }
   }
 
@@ -56,11 +56,20 @@ abstract class AbstractFetcherManager(pr
     mapLock synchronized {
       for ((key, fetcher) <- fetcherThreadMap) {
         fetcher.removePartition(topic, partitionId)
+      }
+    }
+  }
+
+  def shutdownIdleFetcherThreads() {
+    mapLock synchronized {
+      val keysToBeRemoved = new mutable.HashSet[BrokerAndFetcherId]
+      for ((key, fetcher) <- fetcherThreadMap) {
         if (fetcher.partitionCount <= 0) {
           fetcher.shutdown()
-          fetcherThreadMap.remove(key)
+          keysToBeRemoved += key
         }
       }
+      fetcherThreadMap --= keysToBeRemoved
     }
   }
 
@@ -73,3 +82,5 @@ abstract class AbstractFetcherManager(pr
     }
   }
 }
+
+case class BrokerAndFetcherId(broker: Broker, fetcherId: Int)
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1411070&r1=1411069&r2=1411070&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
(original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
Mon Nov 19 05:51:59 2012
@@ -29,6 +29,7 @@ import com.yammer.metrics.core.Gauge
 import java.util.concurrent.atomic.AtomicLong
 import kafka.utils.{Pool, ShutdownableThread}
 import java.util.concurrent.TimeUnit
+import java.util.concurrent.locks.ReentrantLock
 
 
 /**
@@ -38,17 +39,12 @@ abstract class AbstractFetcherThread(nam
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait:
Int = 0, minBytes: Int = 1)
   extends ShutdownableThread(name) {
 
-  private val fetchMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition)
-> offset map
-  private val fetchMapLock = new Object
+  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition)
-> offset map
+  private val partitionMapLock = new ReentrantLock
+  private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout,
socketBufferSize)
   val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
 
-  val fetchRequestuilder = new FetchRequestBuilder().
-          clientId(clientId).
-          replicaId(fetcherBrokerId).
-          maxWait(maxWait).
-          minBytes(minBytes)
-
   /* callbacks to be defined in subclass */
 
   // process fetched data
@@ -67,12 +63,23 @@ abstract class AbstractFetcherThread(nam
   }
 
   override def doWork() {
-    fetchMapLock synchronized {
-      fetchMap.foreach {
+    val fetchRequestuilder = new FetchRequestBuilder().
+            clientId(clientId).
+            replicaId(fetcherBrokerId).
+            maxWait(maxWait).
+            minBytes(minBytes)
+
+    partitionMapLock.lock()
+    try {
+      while (partitionMap.isEmpty)
+        partitionMapCond.await()
+      partitionMap.foreach {
         case((topicAndPartition, offset)) =>
           fetchRequestuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
                            offset, fetchSize)
       }
+    } finally {
+      partitionMapLock.unlock()
     }
 
     val fetchRequest = fetchRequestuilder.build()
@@ -85,9 +92,8 @@ abstract class AbstractFetcherThread(nam
       case t =>
         debug("error in fetch %s".format(fetchRequest), t)
         if (isRunning.get) {
-          fetchMapLock synchronized {
-            partitionsWithError ++= fetchMap.keys
-            fetchMap.clear()
+          partitionMapLock synchronized {
+            partitionsWithError ++= partitionMap.keys
           }
         }
     }
@@ -95,11 +101,12 @@ abstract class AbstractFetcherThread(nam
 
     if (response != null) {
       // process fetched data
-      fetchMapLock synchronized {
+      partitionMapLock.lock()
+      try {
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = fetchMap.get(topicAndPartition)
+            val currentOffset = partitionMap.get(topicAndPartition)
             if (currentOffset.isDefined) {
               partitionData.error match {
                 case ErrorMapping.NoError =>
@@ -109,24 +116,25 @@ abstract class AbstractFetcherThread(nam
                     case Some(m: MessageAndOffset) => m.nextOffset
                     case None => currentOffset.get
                   }
-                  fetchMap.put(topicAndPartition, newOffset)
+                  partitionMap.put(topicAndPartition, newOffset)
                   FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw
- newOffset
                   fetcherMetrics.byteRate.mark(validBytes)
                   // Once we hand off the partition data to the subclass, we can't mess with
it any more in this thread
                   processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                 case ErrorMapping.OffsetOutOfRangeCode =>
                   val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                  fetchMap.put(topicAndPartition, newOffset)
+                  partitionMap.put(topicAndPartition, newOffset)
                   warn("current offset %d for topic %s partition %d out of range; reset offset
to %d"
                     .format(currentOffset.get, topic, partitionId, newOffset))
                 case _ =>
-                  error("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
+                  warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                     ErrorMapping.exceptionFor(partitionData.error))
                   partitionsWithError += topicAndPartition
-                  fetchMap.remove(topicAndPartition)
               }
             }
         }
+      } finally {
+        partitionMapLock.unlock()
       }
     }
 
@@ -137,26 +145,39 @@ abstract class AbstractFetcherThread(nam
   }
 
   def addPartition(topic: String, partitionId: Int, initialOffset: Long) {
-    fetchMapLock synchronized {
-      fetchMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+    partitionMapLock.lock()
+    try {
+      partitionMap.put(TopicAndPartition(topic, partitionId), initialOffset)
+      partitionMapCond.signalAll()
+    } finally {
+      partitionMapLock.unlock()
     }
   }
 
   def removePartition(topic: String, partitionId: Int) {
-    fetchMapLock synchronized {
-      fetchMap.remove(TopicAndPartition(topic, partitionId))
+    partitionMapLock.lock()
+    try {
+      partitionMap.remove(TopicAndPartition(topic, partitionId))
+    } finally {
+      partitionMapLock.unlock()
     }
   }
 
   def hasPartition(topic: String, partitionId: Int): Boolean = {
-    fetchMapLock synchronized {
-      fetchMap.get(TopicAndPartition(topic, partitionId)).isDefined
+    partitionMapLock.lock()
+    try {
+      partitionMap.get(TopicAndPartition(topic, partitionId)).isDefined
+    } finally {
+      partitionMapLock.unlock()
     }
   }
 
   def partitionCount() = {
-    fetchMapLock synchronized {
-      fetchMap.size
+    partitionMapLock.lock()
+    try {
+      partitionMap.size
+    } finally {
+      partitionMapLock.unlock()
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1411070&r1=1411069&r2=1411070&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Nov
19 05:51:59 2012
@@ -146,6 +146,8 @@ class KafkaApis(val requestChannel: Requ
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.versionId, response.toMap,
error)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(stopReplicaResponse)))
+
+    replicaManager.replicaFetcherManager.shutdownIdleFetcherThreads()
   }
 
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1411070&r1=1411069&r2=1411070&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Mon
Nov 19 05:51:59 2012
@@ -206,6 +206,8 @@ class ReplicaManager(val config: KafkaCo
         }
         responseMap.put(topicAndPartition, errorCode)
       }
+      info("Completed leader and isr request %s".format(leaderAndISRRequest))
+      replicaFetcherManager.shutdownIdleFetcherThreads()
       (responseMap, ErrorMapping.NoError)
     }
   }



Mime
View raw message