kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nehanarkh...@apache.org
Subject git commit: KAFKA-1446 Consumer metrics for rebalance; reviewed by Neha Narkhede and Joel Koshy
Date Thu, 22 May 2014 18:32:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d0df33df3 -> 52f1149dd


KAFKA-1446 Consumer metrics for rebalance; reviewed by Neha Narkhede and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/52f1149d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/52f1149d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/52f1149d

Branch: refs/heads/trunk
Commit: 52f1149ddbb31ff33d1fd122bb9ffed937be14f6
Parents: d0df33d
Author: Sriharsha Chintalapani <schintalapani@hortonworks.com>
Authored: Thu May 22 11:32:18 2014 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Thu May 22 11:32:51 2014 -0700

----------------------------------------------------------------------
 .../consumer/ZookeeperConsumerConnector.scala   | 59 ++++++++++----------
 1 file changed, 31 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/52f1149d/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index c032d26..65f518d 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -106,6 +106,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   // useful for tracking migration of consumers to store offsets in kafka
  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
+  private val rebalanceTimer = new KafkaTimer(newTimer(config.clientId + "-RebalanceRateAndTime", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 
   val consumerIdString = {
     var consumerUuid : String = null
@@ -575,35 +576,37 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
     def syncedRebalance() {
       rebalanceLock synchronized {
-        if(isShuttingDown.get())  {
-          return
-        } else {
-          for (i <- 0 until config.rebalanceMaxRetries) {
-            info("begin rebalancing consumer " + consumerIdString + " try #" + i)
-            var done = false
-            var cluster: Cluster = null
-            try {
-              cluster = getCluster(zkClient)
-              done = rebalance(cluster)
-            } catch {
-              case e: Throwable =>
-                /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
-                 * For example, a ZK node can disappear between the time we get all children and the time we try to get
-                 * the value of a child. Just let this go since another rebalance will be triggered.
-                 **/
-                info("exception during rebalance ", e)
-            }
-            info("end rebalancing consumer " + consumerIdString + " try #" + i)
-            if (done) {
-              return
-            } else {
-              /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
-               * clear the cache */
-              info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+        rebalanceTimer.time {
+          if(isShuttingDown.get())  {
+            return
+          } else {
+            for (i <- 0 until config.rebalanceMaxRetries) {
+              info("begin rebalancing consumer " + consumerIdString + " try #" + i)
+              var done = false
+              var cluster: Cluster = null
+              try {
+                cluster = getCluster(zkClient)
+                done = rebalance(cluster)
+              } catch {
+                case e: Throwable =>
+                  /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
+                    * For example, a ZK node can disappear between the time we get all children and the time we try to get
+                    * the value of a child. Just let this go since another rebalance will be triggered.
+                    **/
+                  info("exception during rebalance ", e)
+              }
+              info("end rebalancing consumer " + consumerIdString + " try #" + i)
+              if (done) {
+                return
+              } else {
+                /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
+                 * clear the cache */
+                info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
+              }
+              // stop all fetchers and clear all the queues to avoid data duplication
+              closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
+              Thread.sleep(config.rebalanceBackoffMs)
             }
-            // stop all fetchers and clear all the queues to avoid data duplication
-            closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
-            Thread.sleep(config.rebalanceBackoffMs)
           }
         }
       }


Mime
View raw message