kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jun...@apache.org
Subject [39/43] git commit: KAFKA-890 The list of brokers for fetching metadata should be shuffled; reviewed by Joel Koshy
Date Mon, 08 Jul 2013 23:15:04 GMT
KAFKA-890 The list of brokers for fetching metadata should be shuffled; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d726e14e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d726e14e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d726e14e

Branch: refs/heads/trunk
Commit: d726e14eea2133cb8a1e52d20f6a17047f726e1c
Parents: 103aef8
Author: Neha Narkhede <neha.narkhede@gmail.com>
Authored: Tue Apr 30 12:42:15 2013 -0700
Committer: Neha Narkhede <neha.narkhede@gmail.com>
Committed: Tue Apr 30 12:42:15 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/client/ClientUtils.scala | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d726e14e/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 7b3f09d..025d3ab 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -7,6 +7,7 @@ import kafka.producer._
 import kafka.common.KafkaException
 import kafka.utils.{Utils, Logging}
 import java.util.Properties
+import util.Random
 
 /**
  * Helper functions common to clients (producer, consumer, or admin)
@@ -26,9 +27,12 @@ object ClientUtils extends Logging{
     val topicMetadataRequest = new TopicMetadataRequest(TopicMetadataRequest.CurrentVersion,
correlationId, producerConfig.clientId, topics.toSeq)
     var topicMetadataResponse: TopicMetadataResponse = null
     var t: Throwable = null
-    while(i < brokers.size && !fetchMetaDataSucceeded) {
-      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, brokers(i))
-      info("Fetching metadata with correlation id %d for %d topic(s) %s".format(correlationId,
topics.size, topics))
+    // shuffle the list of brokers before sending metadata requests so that most requests
don't get routed to the
+    // same broker
+    val shuffledBrokers = Random.shuffle(brokers)
+    while(i < shuffledBrokers.size && !fetchMetaDataSucceeded) {
+      val producer: SyncProducer = ProducerPool.createSyncProducer(producerConfig, shuffledBrokers(i))
+      info("Fetching metadata from broker %s with correlation id %d for %d topic(s) %s".format(shuffledBrokers(i),
correlationId, topics.size, topics))
       try {
         topicMetadataResponse = producer.send(topicMetadataRequest)
         fetchMetaDataSucceeded = true
@@ -36,7 +40,7 @@ object ClientUtils extends Logging{
       catch {
         case e =>
           warn("Fetching topic metadata with correlation id %d for topics [%s] from broker
[%s] failed"
-            .format(correlationId, topics, brokers(i).toString), e)
+            .format(correlationId, topics, shuffledBrokers(i).toString), e)
           t = e
       } finally {
         i = i + 1
@@ -44,7 +48,7 @@ object ClientUtils extends Logging{
       }
     }
     if(!fetchMetaDataSucceeded){
-      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s]
failed".format(topics, brokers), t)
+      throw new KafkaException("fetching topic metadata for topics [%s] from broker [%s]
failed".format(topics, shuffledBrokers), t)
     } else {
       debug("Successfully fetched metadata for %d topic(s) %s".format(topics.size, topics))
     }


Mime
View raw message