camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method
Date Thu, 12 Jan 2017 09:51:59 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.18.x 70d4750fb -> 5a4f641b6
  refs/heads/master 312b57d14 -> d22d0ca06


CAMEL-10697 - Workaround KAFKA-1894 by calling the wakeup method


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d22d0ca0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d22d0ca0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d22d0ca0

Branch: refs/heads/master
Commit: d22d0ca0607e0013d4be1dd3385959ac208ae1b8
Parents: 312b57d
Author: Antoine DESSAIGNE <antoine.dessaigne@gmail.com>
Authored: Wed Jan 11 17:25:56 2017 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Thu Jan 12 10:47:54 2017 +0100

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaConsumer.java       | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d22d0ca0/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 31a4180..4362390 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -45,6 +46,8 @@ public class KafkaConsumer extends DefaultConsumer {
     private final KafkaEndpoint endpoint;
     private final Processor processor;
     private final Long pollTimeoutMs;
+    // This list helps working around the infinite loop of KAFKA-1894
+    private final List<KafkaFetchRecords> tasks = new ArrayList<>();
 
     public KafkaConsumer(KafkaEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -75,7 +78,9 @@ public class KafkaConsumer extends DefaultConsumer {
 
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
-            executor.submit(new KafkaFetchRecords(endpoint.getConfiguration().getTopic(),
i + "", getProps()));
+            KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(),
i + "", getProps());
+            executor.submit(task);
+            tasks.add(task);
         }
     }
 
@@ -89,7 +94,12 @@ public class KafkaConsumer extends DefaultConsumer {
             } else {
                 executor.shutdownNow();
             }
+            if (!executor.isTerminated()) {
+                tasks.forEach(KafkaFetchRecords::shutdown);
+                executor.shutdownNow();
+            }
         }
+        tasks.clear();
         executor = null;
 
         super.doStop();
@@ -195,6 +205,11 @@ public class KafkaConsumer extends DefaultConsumer {
                 IOHelper.close(consumer);
             }
         }
+
+        private void shutdown() {
+            // As advised in the KAFKA-1894 ticket, calling this wakeup method breaks the
infinite loop
+            consumer.wakeup();
+        }
     }
 
     protected String serializeOffsetKey(TopicPartition topicPartition) {


Mime
View raw message