camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [camel] branch camel-2.20.x updated: CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
Date Wed, 03 Jan 2018 11:24:32 GMT
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 20be7d5  CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
20be7d5 is described below

commit 20be7d5de76cc301ea05a7e3e4ef7204cddfa522
Author: Claus Ibsen <claus.ibsen@gmail.com>
AuthorDate: Wed Jan 3 11:52:42 2018 +0100

    CAMEL-12110: camel-kafka consumer swallows exception if error creating KafkaConsumer.
---
 .../camel/component/kafka/KafkaConsumer.java       | 34 +++++++++++++++++-----
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
index 52611fc..d9de379 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConsumer.java
@@ -101,6 +101,8 @@ public class KafkaConsumer extends DefaultConsumer {
         executor = endpoint.createExecutor();
         for (int i = 0; i < endpoint.getConfiguration().getConsumersCount(); i++) {
             KafkaFetchRecords task = new KafkaFetchRecords(endpoint.getConfiguration().getTopic(), i + "", getProps());
+            // pre-initialize task during startup so if there is any error we have it thrown asap
+            task.preInit();
             executor.submit(task);
             tasks.add(task);
         }
@@ -146,15 +148,14 @@ public class KafkaConsumer extends DefaultConsumer {
             boolean reConnect = true;
 
             while (reConnect) {
-
-                // create consumer
-                ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
                 try {
-                    // Kafka uses reflection for loading authentication settings, use its classloader
-                    Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
-                    this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
-                } finally {
-                    Thread.currentThread().setContextClassLoader(threadClassLoader);
+                    if (!first) {
+                        // re-initialize on re-connect so we have a fresh consumer
+                        doInit();
+                    }
+                } catch (Throwable e) {
+                    // ensure this is logged so users can see the problem
+                    log.warn("Error creating org.apache.kafka.clients.consumer.KafkaConsumer due " + e.getMessage(), e);
                 }
 
                 if (!first) {
@@ -175,6 +176,23 @@ public class KafkaConsumer extends DefaultConsumer {
             }
         }
 
+        void preInit() {
+            doInit();
+        }
+
+        protected void doInit() {
+            // create consumer
+            ClassLoader threadClassLoader = Thread.currentThread().getContextClassLoader();
+            try {
+                // Kafka uses reflection for loading authentication settings, use its classloader
+                Thread.currentThread().setContextClassLoader(org.apache.kafka.clients.consumer.KafkaConsumer.class.getClassLoader());
+                // this may throw an exception if something is wrong with kafka consumer
+                this.consumer = new org.apache.kafka.clients.consumer.KafkaConsumer(kafkaProps);
+            } finally {
+                Thread.currentThread().setContextClassLoader(threadClassLoader);
+            }
+        }
+
         @SuppressWarnings("unchecked")
         protected boolean doRun() {
             // allow to re-connect thread in case we use that to retry failed messages

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].

Mime
View raw message