kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-1461; Implement per-partition back-off for replica fetcher; reviewed by Jun Rao and Guozhang Wang
Date Tue, 07 Apr 2015 22:26:51 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d9ab917dc -> 79f7cca85


KAFKA-1461; Implement per-partition back-off for replica fetcher; reviewed by Jun Rao and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79f7cca8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79f7cca8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79f7cca8

Branch: refs/heads/trunk
Commit: 79f7cca85e9ed6511ad50cb9412bfa5c8e5b9ddb
Parents: d9ab917
Author: Sriharsha Chintalapani <harsha@hortonworks.com>
Authored: Tue Apr 7 15:26:26 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Apr 7 15:26:26 2015 -0700

----------------------------------------------------------------------
 .../kafka/server/AbstractFetcherThread.scala    | 139 ++++++++++++-------
 .../kafka/server/ReplicaFetcherThread.scala     |   2 +-
 2 files changed, 87 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/79f7cca8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f178527..a439046 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -22,6 +22,7 @@ import kafka.utils.{Pool, ShutdownableThread}
 import kafka.consumer.{PartitionTopicInfo, SimpleConsumer}
 import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder}
 import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping}
+import kafka.utils.DelayedItem
 import kafka.utils.CoreUtils.inLock
 import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset}
 import kafka.metrics.KafkaMetricsGroup
@@ -36,12 +37,11 @@ import com.yammer.metrics.core.Gauge
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
  */
-
 abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, socketTimeout: Int, socketBufferSize: Int,
                                      fetchSize: Int, fetcherBrokerId: Int = -1, maxWait: Int = 0, minBytes: Int = 1, fetchBackOffMs: Int = 0,
                                      isInterruptible: Boolean = true)
   extends ShutdownableThread(name, isInterruptible) {
-  private val partitionMap = new mutable.HashMap[TopicAndPartition, Long] // a (topic, partition) -> offset map
+  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId)
@@ -76,19 +76,24 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
   }
 
   override def doWork() {
+
     inLock(partitionMapLock) {
-      if (partitionMap.isEmpty)
-        partitionMapCond.await(200L, TimeUnit.MILLISECONDS)
       partitionMap.foreach {
-        case((topicAndPartition, offset)) =>
-          fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
-                           offset, fetchSize)
+        case((topicAndPartition, partitionFetchState)) =>
+          if(partitionFetchState.isActive)
+            fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition,
+              partitionFetchState.offset, fetchSize)
       }
     }
 
     val fetchRequest = fetchRequestBuilder.build()
+
     if (!fetchRequest.requestInfo.isEmpty)
       processFetchRequest(fetchRequest)
+    else {
+      trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
+      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
+    }
   }
 
   private def processFetchRequest(fetchRequest: FetchRequest) {
@@ -116,53 +121,53 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
         response.data.foreach {
           case(topicAndPartition, partitionData) =>
             val (topic, partitionId) = topicAndPartition.asTuple
-            val currentOffset = partitionMap.get(topicAndPartition)
-            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
-            if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) {
-              partitionData.error match {
-                case ErrorMapping.NoError =>
-                  try {
-                    val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
-                    val validBytes = messages.validBytes
-                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
-                      case Some(m: MessageAndOffset) => m.nextOffset
-                      case None => currentOffset.get
+            partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
+              // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
+              if (fetchRequest.requestInfo(topicAndPartition).offset == currentPartitionFetchState.offset) {
+                partitionData.error match {
+                  case ErrorMapping.NoError =>
+                    try {
+                      val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
+                      val validBytes = messages.validBytes
+                      val newOffset = messages.shallowIterator.toSeq.lastOption match {
+                        case Some(m: MessageAndOffset) => m.nextOffset
+                        case None => currentPartitionFetchState.offset
+                      }
+                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                      fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
+                      fetcherStats.byteRate.mark(validBytes)
+                      // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
+                      processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
+                    } catch {
+                      case ime: InvalidMessageException =>
+                        // we log the error and continue. This ensures two things
+                        // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
+                        // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
+                        // should get fixed in the subsequent fetches
+                        logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset  + " error " + ime.getMessage)
+                      case e: Throwable =>
+                        throw new KafkaException("error processing data for partition [%s,%d] offset %d"
+                          .format(topic, partitionId, currentPartitionFetchState.offset), e)
+                    }
+                  case ErrorMapping.OffsetOutOfRangeCode =>
+                    try {
+                      val newOffset = handleOffsetOutOfRange(topicAndPartition)
+                      partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
+                      error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
+                        .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
+                    } catch {
+                      case e: Throwable =>
+                        error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                        partitionsWithError += topicAndPartition
                     }
-                    partitionMap.put(topicAndPartition, newOffset)
-                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
-                    fetcherStats.byteRate.mark(validBytes)
-                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
-                    processPartitionData(topicAndPartition, currentOffset.get, partitionData)
-                  } catch {
-                    case ime: InvalidMessageException =>
-                      // we log the error and continue. This ensures two things
-                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
-                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
-                      //    should get fixed in the subsequent fetches
-                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage)
-                    case e: Throwable =>
-                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
-                                               .format(topic, partitionId, currentOffset.get), e)
-                  }
-                case ErrorMapping.OffsetOutOfRangeCode =>
-                  try {
-                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
-                    partitionMap.put(topicAndPartition, newOffset)
-                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
-                      .format(currentOffset.get, topic, partitionId, newOffset))
-                  } catch {
-                    case e: Throwable =>
-                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
+                  case _ =>
+                    if (isRunning.get) {
+                      error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
+                        ErrorMapping.exceptionFor(partitionData.error).getClass))
                       partitionsWithError += topicAndPartition
-                  }
-                case _ =>
-                  if (isRunning.get) {
-                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
-                      ErrorMapping.exceptionFor(partitionData.error).getClass))
-                    partitionsWithError += topicAndPartition
-                  }
-              }
-            }
+                    }
+                }
+            })
         }
       }
     }
@@ -181,7 +186,23 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
         if (!partitionMap.contains(topicAndPartition))
           partitionMap.put(
             topicAndPartition,
-            if (PartitionTopicInfo.isOffsetInvalid(offset)) handleOffsetOutOfRange(topicAndPartition) else offset)
+            if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
+            else new PartitionFetchState(offset)
+          )}
+      partitionMapCond.signalAll()
+    } finally {
+      partitionMapLock.unlock()
+    }
+  }
+
+  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
+    partitionMapLock.lockInterruptibly()
+    try {
+      for (partition <- partitions) {
+        partitionMap.get(partition).foreach (currentPartitionFetchState =>
+          if(currentPartitionFetchState.isActive)
+            partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
+        )
       }
       partitionMapCond.signalAll()
     } finally {
@@ -248,3 +269,15 @@ class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
 case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
   override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
 }
+
+/**
+  * case class to keep partition offset and its state (active, inactive)
+  */
+case class PartitionFetchState(offset: Long, delay: DelayedItem) {
+
+  def this(offset: Long) = this(offset, new DelayedItem(0))
+
+  def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
+
+  override def toString = "%d-%b".format(offset, isActive)
+}
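
----------------------------------------------------------------------
A minimal, self-contained Scala sketch of the back-off mechanism that the
new PartitionFetchState enables. FakeDelayedItem, FetchState and BackOffDemo
are hypothetical names invented for illustration; the sketch assumes only
what the diff shows about kafka.utils.DelayedItem, namely that
getDelay(TimeUnit.MILLISECONDS) reaches 0 once the delay has elapsed.

import java.util.concurrent.TimeUnit

// Hypothetical stand-in for kafka.utils.DelayedItem: remembers a deadline
// and reports the remaining delay, clamped to zero once it has elapsed.
class FakeDelayedItem(delayMs: Long) {
  private val dueNs = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMs)
  def getDelay(unit: TimeUnit): Long =
    math.max(0L, unit.convert(dueNs - System.nanoTime(), TimeUnit.NANOSECONDS))
}

// Mirrors the PartitionFetchState added above: the partition is fetchable
// ("active") only once its delay has expired; the offset is kept either way.
case class FetchState(offset: Long, delay: FakeDelayedItem) {
  def this(offset: Long) = this(offset, new FakeDelayedItem(0))
  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
}

object BackOffDemo extends App {
  var state = new FetchState(42L)
  println(state.isActive)  // true: no delay, doWork() would fetch this partition
  state = FetchState(state.offset, new FakeDelayedItem(1000L))  // back off 1s
  println(state.isActive)  // false: skipped when the fetch request is built
  Thread.sleep(1100L)
  println(state.isActive)  // true again: the next pass retries the partition
}

The point of keeping a delayed entry in partitionMap rather than removing it
is that the fetch offset survives the back-off, so the partition resumes
automatically from where it left off once the delay expires.
----------------------------------------------------------------------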

http://git-wip-us.apache.org/repos/asf/kafka/blob/79f7cca8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 2d84afa..b31b432 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -120,6 +120,6 @@ class ReplicaFetcherThread(name:String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
-    // no handler needed since the controller will make the changes accordingly
+    delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs)
   }
 }
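
With this change an error on one partition no longer goes unhandled by the
fetcher itself: delayPartitions marks just the failing partitions inactive
for brokerConfig.replicaFetchBackoffMs, healthy partitions keep fetching at
full speed, and doWork() falls back to a single fetchBackOffMs wait only when
no partition is active. The partitions collected into partitionsWithError
during processFetchRequest are what ultimately reach handlePartitionsWithErrors
(the call site lies outside the hunks shown here).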

