kafka-commits mailing list archives

From: ij...@apache.org
Subject: kafka git commit: KAFKA-3661; fix NPE in o.a.k.c.c.RoundRobinAssignor when topic metadata not found
Date: Thu, 05 May 2016 22:25:26 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 940468011 -> a57d5125d


KAFKA-3661; fix NPE in o.a.k.c.c.RoundRobinAssignor when topic metadata not found

AbstractPartitionAssignor.assign has an ambiguous line in its documentation:
> @param partitionsPerTopic The number of partitions for each subscribed topic (may be empty for some topics)

Does empty mean the topic has an entry with value zero, or that the entry is excluded from
the map altogether? The current implementation in AbstractPartitionAssignor excludes the entry
from partitionsPerTopic if the topic isn't in the metadata.

RoundRobinAssignorTest.testOneConsumerNonexistentTopic interprets emptiness as providing the topic with a zero value.
RangeAssignor interprets emptiness as excluding the entry from the map.
RangeAssignorTest.testOneConsumerNonexistentTopic interprets emptiness as providing the topic with a zero value.

This change fixes the NPE by excluding a topic from partitionsPerTopic when the topic is
not in the metadata.
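
A minimal sketch of the failure mode and of the guard, not the actual Kafka code. The
class and method names (MissingTopicSketch, unguarded, guarded) are hypothetical, and
partitions are shown as plain strings rather than TopicPartition objects. The only
assumption taken from this commit is that partitionsPerTopic has no entry for a topic
that is missing from the metadata.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MissingTopicSketch {

    // Unguarded shape: the Integer returned by get() is auto-unboxed in the
    // loop condition, so a topic with no map entry (null) throws an NPE.
    static List<String> unguarded(Map<String, Integer> partitionsPerTopic, List<String> topics) {
        List<String> all = new ArrayList<>();
        for (String topic : topics) {
            Integer partitions = partitionsPerTopic.get(topic);
            for (int p = 0; p < partitions; p++)
                all.add(topic + "-" + p);
        }
        return all;
    }

    // Guarded shape: a topic with no map entry is skipped.
    static List<String> guarded(Map<String, Integer> partitionsPerTopic, List<String> topics) {
        List<String> all = new ArrayList<>();
        for (String topic : topics) {
            Integer partitions = partitionsPerTopic.get(topic);
            if (partitions != null)
                for (int p = 0; p < partitions; p++)
                    all.add(topic + "-" + p);
        }
        return all;
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = new HashMap<>();
        partitionsPerTopic.put("known", 2); // "missing" deliberately has no entry
        List<String> topics = Arrays.asList("known", "missing");

        System.out.println(guarded(partitionsPerTopic, topics)); // prints [known-0, known-1]
        try {
            unguarded(partitionsPerTopic, topics);
        } catch (NullPointerException e) {
            System.out.println("NPE on the missing topic");
        }
    }
}
```

With the map contract fixed as "absent means no metadata", a null check at the lookup is
all an assignor needs; a zero-valued entry no longer has to be handled as a separate case.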

Author: Onur Karaman <okaraman@linkedin.com>

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>

Closes #1326 from onurkaraman/KAFKA-3661

(cherry picked from commit 8429db937e2134d9935d9dccd2ed0febc474fd66)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a57d5125
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a57d5125
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a57d5125

Branch: refs/heads/0.10.0
Commit: a57d5125d4a699c5fdaf05ef055dd7a0e7a6accd
Parents: 9404680
Author: Onur Karaman <okaraman@linkedin.com>
Authored: Thu May 5 23:25:03 2016 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Thu May 5 23:25:20 2016 +0100

----------------------------------------------------------------------
 .../apache/kafka/clients/consumer/RangeAssignor.java   | 10 +---------
 .../kafka/clients/consumer/RoundRobinAssignor.java     |  7 +++----
 .../consumer/internals/AbstractPartitionAssignor.java  | 13 ++++++++++---
 .../kafka/clients/consumer/RangeAssignorTest.java      |  2 --
 .../kafka/clients/consumer/RoundRobinAssignorTest.java |  2 --
 5 files changed, 14 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a57d5125/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
index f23151c..16c1d77 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RangeAssignor.java
@@ -45,14 +45,6 @@ public class RangeAssignor extends AbstractPartitionAssignor {
         return "range";
     }
 
-    private List<TopicPartition> partitions(String topic,
-                                            int numPartitions) {
-        List<TopicPartition> partitions = new ArrayList<>();
-        for (int i = 0; i < numPartitions; i++)
-            partitions.add(new TopicPartition(topic, i));
-        return partitions;
-    }
-
     private Map<String, List<String>> consumersPerTopic(Map<String, List<String>> consumerMetadata) {
         Map<String, List<String>> res = new HashMap<>();
         for (Map.Entry<String, List<String>> subscriptionEntry : consumerMetadata.entrySet()) {
@@ -84,7 +76,7 @@ public class RangeAssignor extends AbstractPartitionAssignor {
             int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
             int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
 
-            List<TopicPartition> partitions = partitions(topic, numPartitionsForTopic);
+            List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
             for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
                 int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
                 int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a57d5125/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
index b8dc253..a5de595 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RoundRobinAssignor.java
@@ -65,10 +65,9 @@ public class RoundRobinAssignor extends AbstractPartitionAssignor {
 
         List<TopicPartition> allPartitions = new ArrayList<>();
         for (String topic : topics) {
-            Integer partitions = partitionsPerTopic.get(topic);
-            for (int partition = 0; partition < partitions; partition++) {
-                allPartitions.add(new TopicPartition(topic, partition));
-            }
+            Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
+            if (numPartitionsForTopic != null)
+                allPartitions.addAll(AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic));
         }
         return allPartitions;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a57d5125/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
index 12fa913..4f90e66 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractPartitionAssignor.java
@@ -33,9 +33,10 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
 
     /**
      * Perform the group assignment given the partition counts and member subscriptions
-     * @param partitionsPerTopic The number of partitions for each subscribed topic (may be empty for some topics)
+     * @param partitionsPerTopic The number of partitions for each subscribed topic. Topics not in metadata will be excluded
+     *                           from this map.
      * @param subscriptions Map from the memberId to their respective topic subscription
-     * @return Map from each member to the
+     * @return Map from each member to the list of partitions assigned to them.
      */
     public abstract Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,
                                                              Map<String, List<String>> subscriptions);
@@ -58,7 +59,7 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
         for (String topic : allSubscribedTopics) {
             Integer numPartitions = metadata.partitionCountForTopic(topic);
-            if (numPartitions != null)
+            if (numPartitions != null && numPartitions > 0)
                 partitionsPerTopic.put(topic, numPartitions);
             else
                 log.debug("Skipping assignment for topic {} since no metadata is available", topic);
@@ -87,4 +88,10 @@ public abstract class AbstractPartitionAssignor implements PartitionAssignor {
         list.add(value);
     }
 
+    protected static List<TopicPartition> partitions(String topic, int numPartitions) {
+        List<TopicPartition> partitions = new ArrayList<>(numPartitions);
+        for (int i = 0; i < numPartitions; i++)
+            partitions.add(new TopicPartition(topic, i));
+        return partitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a57d5125/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
index 13cce13..72febb0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RangeAssignorTest.java
@@ -53,8 +53,6 @@ public class RangeAssignorTest {
         String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        partitionsPerTopic.put(topic, 0);
-
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
                 Collections.singletonMap(consumerId, Arrays.asList(topic)));
         assertEquals(Collections.singleton(consumerId), assignment.keySet());

http://git-wip-us.apache.org/repos/asf/kafka/blob/a57d5125/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
index 31598cd..1d62700 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/RoundRobinAssignorTest.java
@@ -47,8 +47,6 @@ public class RoundRobinAssignorTest {
         String consumerId = "consumer";
 
         Map<String, Integer> partitionsPerTopic = new HashMap<>();
-        partitionsPerTopic.put(topic, 0);
-
         Map<String, List<TopicPartition>> assignment = assignor.assign(partitionsPerTopic,
                 Collections.singletonMap(consumerId, Arrays.asList(topic)));
 

