camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.
Date Tue, 24 May 2016 09:53:35 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x a863c8868 -> 968bac1d8
  refs/heads/master d518e543a -> 406b83ef6


CAMEL-9984: Rabbit MQ connection is not closed when channel has been closed by server.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/406b83ef
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/406b83ef
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/406b83ef

Branch: refs/heads/master
Commit: 406b83ef66215976149733ff31beaafe04d0af7c
Parents: d518e54
Author: Darrell King <Darrell.King@hermes-europe.co.uk>
Authored: Tue May 24 09:18:53 2016 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue May 24 11:52:08 2016 +0200

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitConsumer.java      |  6 +++--
 .../rabbitmq/RabbitMQConsumerTest.java          | 26 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index eeeafd6..21560f8 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -156,11 +156,13 @@ class RabbitConsumer implements com.rabbitmq.client.Consumer {
         if (channel == null) {
             return;
         }
-        if (tag != null) {
+        if (tag != null && isChannelOpen()) {
             channel.basicCancel(tag);
         }
         try {
-            channel.close();
+            if (isChannelOpen()) {
+                channel.close();
+            }
         } catch (TimeoutException e) {
             log.error("Timeout occured");
             throw e;

http://git-wip-us.apache.org/repos/asf/camel/blob/406b83ef/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
index ef6b096..da84477 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQConsumerTest.java
@@ -20,9 +20,11 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
 
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 
+import com.rabbitmq.client.Consumer;
 import org.apache.camel.Processor;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -30,6 +32,9 @@ import org.mockito.Mockito;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
 
 public class RabbitMQConsumerTest {
 
@@ -69,4 +74,25 @@ public class RabbitMQConsumerTest {
 
         Mockito.verify(conn).close(30 * 1000);
     }
+
+    @Test
+    public void testStoppingConsumerShutdownConnectionWhenServerHasClosedChannel() throws Exception {
+        AlreadyClosedException alreadyClosedException = Mockito.mock(AlreadyClosedException.class);
+
+        RabbitMQConsumer consumer = new RabbitMQConsumer(endpoint, processor);
+
+        Mockito.when(endpoint.createExecutor()).thenReturn(Executors.newFixedThreadPool(3));
+        Mockito.when(endpoint.getConcurrentConsumers()).thenReturn(1);
+        Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn);
+        Mockito.when(conn.createChannel()).thenReturn(channel);
+        Mockito.when(channel.basicConsume(anyString(), anyBoolean(), any(Consumer.class))).thenReturn("TAG");
+        Mockito.when(channel.isOpen()).thenReturn(false);
+        Mockito.doThrow(alreadyClosedException).when(channel).basicCancel("TAG");
+        Mockito.doThrow(alreadyClosedException).when(channel).close();
+
+        consumer.doStart();
+        consumer.doStop();
+
+        Mockito.verify(conn).close(30 * 1000);
+    }
 }


Mime
View raw message