camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/5] git commit: CAMEL-6947: rabbitmq producer should start|stop more cleanly, such as making sure to call close and shutdown the thread pool.
Date Fri, 08 Nov 2013 14:51:52 GMT
CAMEL-6947: rabbitmq producer should start|stop more cleanly, such as making sure to call close
and shutdown the thread pool.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0b4f88c5
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0b4f88c5
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0b4f88c5

Branch: refs/heads/master
Commit: 0b4f88c565bf8dfb5014d90429ff9918c47f9ec5
Parents: a16b907
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri Nov 8 15:41:44 2013 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Nov 8 15:52:32 2013 +0100

----------------------------------------------------------------------
 .../component/rabbitmq/RabbitMQProducer.java    | 49 +++++++++++++++++---
 1 file changed, 42 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/0b4f88c5/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 3bebb3f..5bf4269 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -21,7 +21,7 @@ import java.math.BigDecimal;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
 
 import com.rabbitmq.client.AMQP;
 import com.rabbitmq.client.Channel;
@@ -32,13 +32,13 @@ import org.apache.camel.util.ObjectHelper;
 
 public class RabbitMQProducer extends DefaultProducer {
 
-    private final Connection conn;
-    private final Channel channel;
+    private int closeTimeout = 30 * 1000;
+    private Connection conn;
+    private Channel channel;
+    private ExecutorService executorService;
 
     public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException {
         super(endpoint);
-        this.conn = endpoint.connect(Executors.newSingleThreadExecutor());
-        this.channel = conn.createChannel();
     }
 
     @Override
@@ -46,8 +46,35 @@ public class RabbitMQProducer extends DefaultProducer {
         return (RabbitMQEndpoint) super.getEndpoint();
     }
 
-    public void shutdown() throws IOException {
-        conn.close();
+    @Override
+    protected void doStart() throws Exception {
+        this.executorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this, "CamelRabbitMQProducer[" + getEndpoint().getQueue() + "]");
+
+        log.trace("Creating connection...");
+        this.conn = getEndpoint().connect(executorService);
+        log.debug("Created connection: {}", conn);
+
+        log.trace("Creating channel...");
+        this.channel = conn.createChannel();
+        log.debug("Created channel: {}", channel);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (channel != null) {
+            log.debug("Closing channel: {}", channel);
+            channel.close();
+            channel = null;
+        }
+        if (conn != null) {
+            log.debug("Closing connection: {} with timeout: {} ms.", conn, closeTimeout);
+            conn.close(closeTimeout);
+            conn = null;
+        }
+        if (executorService != null) {
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService);
+            executorService = null;
+        }
     }
 
     @Override
@@ -179,4 +206,12 @@ public class RabbitMQProducer extends DefaultProducer {
         }
         return null;
     }
+
+    public int getCloseTimeout() {
+        return closeTimeout;
+    }
+
+    public void setCloseTimeout(int closeTimeout) {
+        this.closeTimeout = closeTimeout;
+    }
 }


Mime
View raw message