camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [3/5] git commit: CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there are still active listeners when a route is stopped.
Date Sat, 31 Aug 2013 10:05:29 GMT
CAMEL-6093: Fixed having 2+ routes from the same JMS queue, not stop the endpoint if there
are still active listeners when a route is stopped.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e21ec70
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e21ec70
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e21ec70

Branch: refs/heads/camel-2.12.x
Commit: 2e21ec7012d3ab35ab8c344de0366436e4918bc5
Parents: 16d8180
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sat Aug 31 11:29:25 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sat Aug 31 11:45:13 2013 +0200

----------------------------------------------------------------------
 .../apache/camel/component/jms/JmsConsumer.java | 11 ++++--
 .../apache/camel/component/jms/JmsEndpoint.java | 37 ++++++++++++++++----
 .../jms/reply/ReplyManagerSupport.java          | 12 +++++--
 .../jms/TwoConsumerOnSameQueueTest.java         |  1 -
 4 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
index 50b7833..7bf6ab5 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConsumer.java
@@ -133,7 +133,8 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
         if (listenerContainer == null) {
             createMessageListenerContainer();
         }
-        
+        getEndpoint().onListenerContainerStarting(listenerContainer);
+
         if (getEndpoint().getConfiguration().isAsyncStartListener()) {
             getEndpoint().getAsyncStartStopExecutorService().submit(new Runnable() {
                 @Override
@@ -173,8 +174,12 @@ public class JmsConsumer extends DefaultConsumer implements SuspendableService {
 
     protected void stopAndDestroyListenerContainer() {
         if (listenerContainer != null) {
-            listenerContainer.stop();
-            listenerContainer.destroy();
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                getEndpoint().onListenerConstainerStopped(listenerContainer);
+            }
         }
         // null container and listener so they are fully re created if this consumer is restarted
         // then we will use updated configuration from jms endpoint that may have been managed using JMX

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index 701de7c..664da7c 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -18,7 +18,7 @@ package org.apache.camel.component.jms;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.ExceptionListener;
@@ -71,6 +71,7 @@ import org.springframework.util.ErrorHandler;
 @UriEndpoint(scheme = "jms", consumerClass = JmsConsumer.class)
 public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategyAware, MultipleConsumersSupport, Service {
     protected final Logger log = LoggerFactory.getLogger(getClass());
+    private final AtomicInteger runningMessageListeners = new AtomicInteger();
     @UriParam
     private HeaderFilterStrategy headerFilterStrategy;
     private boolean pubSubDomain;
@@ -82,7 +83,6 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
     private String selector;
     @UriParam
     private JmsConfiguration configuration;
-    private final AtomicBoolean running = new AtomicBoolean();
 
     public JmsEndpoint() {
         this(null, null);
@@ -442,21 +442,39 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return getComponent().getAsyncStartStopExecutorService();
     }
 
+    public void onListenerContainerStarting(AbstractMessageListenerContainer container) {
+        runningMessageListeners.incrementAndGet();
+    }
+
+    public void onListenerConstainerStopped(AbstractMessageListenerContainer container) {
+        runningMessageListeners.decrementAndGet();
+    }
+
     /**
      * State whether this endpoint is running (eg started)
      */
     protected boolean isRunning() {
-        return running.get();
+        return isStarted();
     }
 
     @Override
-    protected void doStart() throws Exception {
-        running.set(true);
+    public void stop() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.stop();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot stop endpoint {}", running, this);
+        }
     }
 
     @Override
-    protected void doStop() throws Exception {
-        running.set(false);
+    public void shutdown() throws Exception {
+        int running = runningMessageListeners.get();
+        if (running <= 0) {
+            super.shutdown();
+        } else {
+            log.trace("There are still {} running message listeners. Cannot shutdown endpoint {}", running, this);
+        }
     }
 
     // Delegated properties from the configuration
@@ -1146,6 +1164,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy
         return status.name();
     }
 
+    @ManagedAttribute(description = "Number of running message listeners")
+    public int getRunningMessageListeners() {
+        return runningMessageListeners.get();
+    }
+
     // Implementation methods
     //-------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
index 3828926..173d9c8 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
@@ -224,6 +224,8 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
         listenerContainer = createListenerContainer();
         listenerContainer.afterPropertiesSet();
         log.debug("Starting reply listener container on endpoint: {}", endpoint);
+
+        endpoint.onListenerContainerStarting(listenerContainer);
         listenerContainer.start();
     }
 
@@ -233,9 +235,13 @@ public abstract class ReplyManagerSupport extends ServiceSupport implements Repl
 
         if (listenerContainer != null) {
             log.debug("Stopping reply listener container on endpoint: {}", endpoint);
-            listenerContainer.stop();
-            listenerContainer.destroy();
-            listenerContainer = null;
+            try {
+                listenerContainer.stop();
+                listenerContainer.destroy();
+            } finally {
+                endpoint.onListenerConstainerStopped(listenerContainer);
+                listenerContainer = null;
+            }
         }
 
         // must also stop executor service

http://git-wip-us.apache.org/repos/asf/camel/blob/2e21ec70/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
index 3cdfd9e..55f73db 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/TwoConsumerOnSameQueueTest.java
@@ -62,7 +62,6 @@ public class TwoConsumerOnSameQueueTest extends CamelTestSupport {
     }
 
     @Test
-    @Ignore
     public void testRemoveOneRoute() throws Exception {
         sendTwoMessagesWhichShouldReceivedOnBothEndpointsAndAssert();
 


Mime
View raw message