apex-commits mailing list archives

From hsy...@apache.org
Subject [1/2] apex-malhar git commit: APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker
Date Sat, 27 Aug 2016 00:24:38 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 3f30b81a6 -> da3b4317f


APEXMALHAR-2134 Fix the NullPointerException, if the kafka partition has no leader broker


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/af425a5f
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/af425a5f
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/af425a5f

Branch: refs/heads/master
Commit: af425a5fd062741790eb5f7243c65d47e49a3721
Parents: 0a1adff
Author: chaitanya <chaithu@apache.org>
Authored: Fri Aug 26 00:51:43 2016 +0530
Committer: chaitanya <chaithu@apache.org>
Committed: Fri Aug 26 21:29:16 2016 +0530

----------------------------------------------------------------------
 .../java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/af425a5f/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index e10502b..fb89389 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -508,6 +508,10 @@ public class SimpleKafkaConsumer extends KafkaConsumer
               continue;
             }
             Broker b = pm.leader();
+            if (b == null) {
+              logger.info("No Leader broker for Kafka Partition {}. Skipping it for time until new leader is elected", kp.getPartitionId());
+              continue;
+            }
             Broker oldB = partitionToBroker.put(kp, b);
             if (b.equals(oldB)) {
               continue;
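
For readers without the surrounding source at hand, the guard added above can be sketched in isolation. This is only an illustration under stated assumptions, not the code in SimpleKafkaConsumer: the class LeaderGuardSketch, the method refreshLeaders, and the use of plain Integer/String maps in place of KafkaPartition and Broker are all hypothetical stand-ins. A null map value plays the role of pm.leader() returning null.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the metadata refresh loop in the diff.
public class LeaderGuardSketch {

  // leaders: partition id -> leader address, where null means "no leader elected yet".
  // partitionToBroker: last known leader per partition (updated in place).
  // Returns the partition ids whose leader is new or has changed.
  static List<Integer> refreshLeaders(Map<Integer, String> leaders,
                                      Map<Integer, String> partitionToBroker) {
    List<Integer> changed = new ArrayList<>();
    for (Map.Entry<Integer, String> e : leaders.entrySet()) {
      String b = e.getValue();
      if (b == null) {
        // Without this check, b.equals(oldB) below would throw a NullPointerException.
        // The partition is skipped and can be picked up on a later refresh.
        continue;
      }
      String oldB = partitionToBroker.put(e.getKey(), b);
      if (b.equals(oldB)) {
        continue; // leader unchanged, nothing to do
      }
      changed.add(e.getKey());
    }
    return changed;
  }

  public static void main(String[] args) {
    Map<Integer, String> leaders = new HashMap<>();
    leaders.put(0, "broker-a:9092");
    leaders.put(1, null); // partition 1 currently has no leader

    Map<Integer, String> known = new HashMap<>();
    System.out.println(refreshLeaders(leaders, known)); // prints [0]
  }
}
```

Note that the leaderless partition is never written into the known-leader map, so a later refresh that does find a leader will treat it as a change.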

