kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jjko...@apache.org
Subject git commit: KAFKA-1648; Robin consumer balance throws an NPE when there are no topics
Date Thu, 09 Oct 2014 23:35:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a314461fa -> 58e58529b


KAFKA-1648; Robin consumer balance throws an NPE when there are no topics


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/58e58529
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/58e58529
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/58e58529

Branch: refs/heads/trunk
Commit: 58e58529b350a3da860b1f51fdfa356dfc42761f
Parents: a314461
Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
Authored: Thu Oct 9 16:34:40 2014 -0700
Committer: Joel Koshy <jjkoshy@gmail.com>
Committed: Thu Oct 9 16:34:40 2014 -0700

----------------------------------------------------------------------
 .../kafka/consumer/PartitionAssignor.scala      | 62 ++++++++++----------
 .../kafka/consumer/PartitionAssignorTest.scala  |  2 +-
 2 files changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/58e58529/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
index 8ea7368..e6ff768 100644
--- a/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
+++ b/core/src/main/scala/kafka/consumer/PartitionAssignor.scala
@@ -71,39 +71,41 @@ class RoundRobinAssignor() extends PartitionAssignor with Logging {
   def assign(ctx: AssignmentContext) = {
     val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()
 
-    // check conditions (a) and (b)
-    val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
-    ctx.consumersForTopic.foreach { case (topic, threadIds) =>
-      val threadIdSet = threadIds.toSet
-      require(threadIdSet == headThreadIdSet,
-              "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
-              "AND if the stream counts across topics are identical for a given consumer instance.\n" +
-              "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
-              "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
-    }
+    if (ctx.consumersForTopic.size > 0) {
+      // check conditions (a) and (b)
+      val (headTopic, headThreadIdSet) = (ctx.consumersForTopic.head._1, ctx.consumersForTopic.head._2.toSet)
+      ctx.consumersForTopic.foreach { case (topic, threadIds) =>
+        val threadIdSet = threadIds.toSet
+        require(threadIdSet == headThreadIdSet,
+          "Round-robin assignment is allowed only if all consumers in the group subscribe to the same topics, " +
+            "AND if the stream counts across topics are identical for a given consumer instance.\n" +
+            "Topic %s has the following available consumer streams: %s\n".format(topic, threadIdSet) +
+            "Topic %s has the following available consumer streams: %s\n".format(headTopic, headThreadIdSet))
+      }
 
-    val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)
+      val threadAssignor = Utils.circularIterator(headThreadIdSet.toSeq.sorted)
+
+      info("Starting round-robin assignment with consumers " + ctx.consumers)
+      val allTopicPartitions = ctx.partitionsForTopic.flatMap { case (topic, partitions) =>
+        info("Consumer %s rebalancing the following partitions for topic %s: %s"
+          .format(ctx.consumerId, topic, partitions))
+        partitions.map(partition => {
+          TopicAndPartition(topic, partition)
+        })
+      }.toSeq.sortWith((topicPartition1, topicPartition2) => {
+        /*
+         * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
+         * up on one consumer (if it has a high enough stream count).
+         */
+        topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
+      })
 
-    info("Starting round-robin assignment with consumers " + ctx.consumers)
-    val allTopicPartitions = ctx.partitionsForTopic.flatMap { case(topic, partitions) =>
-      info("Consumer %s rebalancing the following partitions for topic %s: %s"
-           .format(ctx.consumerId, topic, partitions))
-      partitions.map(partition => {
-        TopicAndPartition(topic, partition)
+      allTopicPartitions.foreach(topicPartition => {
+        val threadId = threadAssignor.next()
+        if (threadId.consumer == ctx.consumerId)
+          partitionOwnershipDecision += (topicPartition -> threadId)
       })
-    }.toSeq.sortWith((topicPartition1, topicPartition2) => {
-      /*
-       * Randomize the order by taking the hashcode to reduce the likelihood of all partitions of a given topic ending
-       * up on one consumer (if it has a high enough stream count).
-       */
-      topicPartition1.toString.hashCode < topicPartition2.toString.hashCode
-    })
-
-    allTopicPartitions.foreach(topicPartition => {
-      val threadId = threadAssignor.next()
-      if (threadId.consumer == ctx.consumerId)
-        partitionOwnershipDecision += (topicPartition -> threadId)
-    })
+    }
 
     partitionOwnershipDecision
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/58e58529/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
index 9ceae22..24954de 100644
--- a/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/PartitionAssignorTest.scala
@@ -87,7 +87,7 @@ private object PartitionAssignorTest extends Logging {
   private val MaxConsumerCount = 10
   private val MaxStreamCount = 8
   private val MaxTopicCount = 100
-  private val MinTopicCount = 20
+  private val MinTopicCount = 0
   private val MaxPartitionCount = 120
   private val MinPartitionCount = 8
 


Mime
View raw message