camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [camel] branch camel-2.20.x updated: CAMEL-12111: Fix reconnect if broker is down on startup. Also fix so channels share connections again. Also fix consumers getting started twice on reconnect at startup. Also fix null pointers if automaticRecoveryEnabled is not set on the endpoint.
Date Sat, 06 Jan 2018 12:03:43 GMT
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.20.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.20.x by this push:
     new 98cff9a  CAMEL-12111: Fix reconnect if broker is down on startup.  Also fix so channels
share connections again.  Also fix consumers getting started twice on reconnect at startup.
 Also fix null pointers if automaticRecoveryEnabled is not set on the endpoint.
98cff9a is described below

commit 98cff9aebda6145df51aa2d19ec588a7b2c7df04
Author: Jeremy Isikoff <jisikoff@yahoo.com>
AuthorDate: Thu Jan 4 09:42:37 2018 -0500

    CAMEL-12111: Fix reconnect if broker is down on startup.  Also fix so channels share connections
again.  Also fix consumers getting started twice on reconnect at startup.  Also fix null pointers
if automaticRecoveryEnabled is not set on the endpoint.
---
 .../camel/component/rabbitmq/RabbitConsumer.java   |  9 +++++++--
 .../camel/component/rabbitmq/RabbitMQConsumer.java | 22 ++++++++++++++--------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index e96367c..ffef62d 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -295,10 +295,10 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         if (isChannelOpen()) {
             // The connection is good, so nothing to do
             return;
-        } else if (!isChannelOpen() && this.consumer.getEndpoint().getAutomaticRecoveryEnabled())
{
+        } else if (channel != null && !channel.isOpen() && isAutomaticRecoveryEnabled())
{
             // Still need to wait for channel to re-open
             throw new IOException("Waiting for channel to re-open.");
-        } else if (!this.consumer.getEndpoint().getAutomaticRecoveryEnabled()) {
+        } else if (channel == null || !isAutomaticRecoveryEnabled()) {
             log.info("Attempting to open a new rabbitMQ channel");
             Connection conn = consumer.getConnection();
             channel = openChannel(conn);
@@ -307,6 +307,11 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         }
     }
 
+    private boolean isAutomaticRecoveryEnabled() {
+        return this.consumer.getEndpoint().getAutomaticRecoveryEnabled() != null
+            && this.consumer.getEndpoint().getAutomaticRecoveryEnabled();
+    }
+
     private boolean isChannelOpen() {
         return channel != null && channel.isOpen();
     }
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index 95a6609..c0aaa6d 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -74,7 +74,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable
{
         if (this.conn == null) {
             openConnection();
             return this.conn;
-        } else if (!this.conn.isOpen() && this.endpoint.getAutomaticRecoveryEnabled())
{
+        } else if (this.conn.isOpen() || (!this.conn.isOpen() && isAutomaticRecoveryEnabled()))
{
             return this.conn;
         } else {
             log.debug("The existing connection is closed");
@@ -83,16 +83,24 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable
{
         }
     }
 
-
+    private boolean isAutomaticRecoveryEnabled() {
+        return this.endpoint.getAutomaticRecoveryEnabled() != null
+            && this.endpoint.getAutomaticRecoveryEnabled();
+    }
     /**
-     * Add a consumer thread for given channel
+     * Create the consumers but don't start yet
      */
-    private void startConsumers() throws IOException {
-
+    private void createConsumers() throws IOException {
         // Create consumers but don't start yet
         for (int i = 0; i < endpoint.getConcurrentConsumers(); i++) {
             createConsumer();
         }
+    }
+
+    /**
+     * Start the consumers (already created)
+     */
+    private void startConsumers() {
 
         // Try starting consumers (which will fail if RabbitMQ can't connect)
         try {
@@ -160,6 +168,7 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable
{
     protected void doStart() throws Exception {
         executor = endpoint.createExecutor();
         log.debug("Using executor {}", executor);
+        createConsumers();
         startConsumers();
     }
 
@@ -211,9 +220,6 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable
{
                     Thread.sleep(connectionRetryInterval);
                 }
             }
-            if (!connectionFailed) {
-                startConsumers();
-            }
             stop();
             return null;
         }

-- 
To stop receiving notification emails like this one, please contact
['"commits@camel.apache.org" <commits@camel.apache.org>'].

Mime
View raw message