kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject svn commit: r1227417 - in /incubator/kafka/trunk/core/src/main/scala/kafka/consumer: ConsumerConfig.scala FetcherRunnable.scala ZookeeperConsumerConnector.scala
Date Thu, 05 Jan 2012 01:12:43 GMT
Author: junrao
Date: Thu Jan  5 01:12:43 2012
New Revision: 1227417

URL: http://svn.apache.org/viewvc?rev=1227417&view=rev
Log:
Make backoff time during consumer rebalance configurable; patched by Jun Rao; reviewed by Neha Narkhede; KAFKA-234

Modified:
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
    incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala?rev=1227417&r1=1227416&r2=1227417&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ConsumerConfig.scala Thu Jan  5 01:12:43 2012
@@ -26,7 +26,7 @@ object ConsumerConfig {
   val SocketBufferSize = 64*1024
   val FetchSize = 300 * 1024
   val MaxFetchSize = 10*FetchSize
-  val BackoffIncrementMs = 1000
+  val DefaultFetcherBackoffMs = 1000
   val AutoCommit = true
   val AutoCommitInterval = 10 * 1000
   val MaxQueuedChunks = 100
@@ -67,7 +67,7 @@ class ConsumerConfig(props: Properties) 
   
   /** to avoid repeatedly polling a broker node which has no new data
       we will backoff every time we get an empty set from the broker*/
-  val backoffIncrementMs: Long = Utils.getInt(props, "backoff.increment.ms", BackoffIncrementMs)
+  val fetcherBackoffMs: Long = Utils.getInt(props, "fetcher.backoff.ms", DefaultFetcherBackoffMs)
   
   /** if true, periodically commit to zookeeper the offset of messages already fetched by the consumer */
   val autoCommit = Utils.getBoolean(props, "autocommit.enable", AutoCommit)
@@ -81,6 +81,9 @@ class ConsumerConfig(props: Properties) 
   /** max number of retries during rebalance */
   val maxRebalanceRetries = Utils.getInt(props, "rebalance.retries.max", MaxRebalanceRetries)
 
+  /** backoff time between retries during rebalance */
+  val rebalanceBackoffMs = Utils.getInt(props, "rebalance.backoff.ms", zkSyncTimeMs)
+
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala?rev=1227417&r1=1227416&r2=1227417&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/FetcherRunnable.scala Thu Jan  5 01:12:43 2012
@@ -95,8 +95,8 @@ class FetcherRunnable(val name: String,
 
         trace("fetched bytes: " + read)
         if(read == 0) {
-          debug("backing off " + config.backoffIncrementMs + " ms")
-          Thread.sleep(config.backoffIncrementMs)
+          debug("backing off " + config.fetcherBackoffMs + " ms")
+          Thread.sleep(config.fetcherBackoffMs)
         }
       }
     }

Modified: incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1227417&r1=1227416&r2=1227417&view=diff
==============================================================================
--- incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Jan  5 01:12:43 2012
@@ -442,7 +442,7 @@ private[kafka] class ZookeeperConsumerCo
           // release all partitions, reset state and retry
           releasePartitionOwnership()
           resetState()
-          Thread.sleep(config.zkSyncTimeMs)
+          Thread.sleep(config.rebalanceBackoffMs)
         }
       }
 



Mime
View raw message