camel-commits mailing list archives

From jans...@apache.org
Subject [1/3] camel git commit: CAMEL-9900 - provide option for MessageListenerContainer for reply managers to stop quicker when CamelContext is stopping
Date Thu, 21 Apr 2016 22:08:40 GMT
Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x e8bff3a0a -> 00cb4b89b
  refs/heads/master a4c43a6ca -> d403cc908


CAMEL-9900 - provide option for MessageListenerContainer for reply managers to stop quicker when CamelContext is stopping


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d403cc90
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d403cc90
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d403cc90

Branch: refs/heads/master
Commit: d403cc9084123667e29398dad5a0eaa109521de3
Parents: a4c43a6
Author: Jonathan Anstey <janstey@gmail.com>
Authored: Thu Apr 21 19:21:48 2016 -0230
Committer: Jonathan Anstey <janstey@gmail.com>
Committed: Thu Apr 21 19:21:48 2016 -0230

----------------------------------------------------------------------
 components/camel-jms/src/main/docs/jms.adoc     | 10 +++++++--
 .../camel/component/jms/JmsComponent.java       | 12 ++++++++++-
 .../camel/component/jms/JmsConfiguration.java   | 22 +++++++++++++++++++-
 .../apache/camel/component/jms/JmsEndpoint.java | 10 +++++++++
 .../ExclusiveQueueMessageListenerContainer.java |  3 +--
 .../SharedQueueMessageListenerContainer.java    |  5 ++---
 .../jms/reply/TemporaryQueueReplyManager.java   |  3 +--
 .../camel/component/jms/JmsComponentTest.java   |  2 ++
 .../jms/JmsEndpointConfigurationTest.java       |  4 ++++
 9 files changed, 60 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/docs/jms.adoc
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/docs/jms.adoc b/components/camel-jms/src/main/docs/jms.adoc
index a930518..3745626 100644
--- a/components/camel-jms/src/main/docs/jms.adoc
+++ b/components/camel-jms/src/main/docs/jms.adoc
@@ -207,8 +207,9 @@ Component options
 +++++++++++++++++
 
 
+
 // component options: START
-The JMS component supports 68 options which are listed below.
+The JMS component supports 69 options which are listed below.
 
 
 
@@ -217,6 +218,7 @@ The JMS component supports 68 options which are listed below.
 | Name | Java Type | Description
 | configuration | JmsConfiguration | To use a shared JMS configuration
 | acceptMessagesWhileStopping | boolean | Specifies whether the consumer accept messages while it is stopping. You may consider enabling this option if you start and stop JMS routes at runtime while there are still messages enqued on the queue. If this option is false and you stop the JMS route then messages may be rejected and the JMS broker would have to attempt redeliveries which yet again may be rejected and eventually the message may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.
+| allowReplyManagerQuickStop | boolean | Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfigurationisAcceptMessagesWhileStopping is enabled and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by default in the regular JMS consumers but to enable for reply managers you must enable this flag.
 | acknowledgementMode | int | The JMS acknowledgement mode defined as an Integer. Allows you to set vendor-specific extensions to the acknowledgment mode. For the regular modes it is preferable to use the acknowledgementModeName instead.
 | eagerLoadingOfProperties | boolean | Enables eager loading of JMS properties as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties
 | acknowledgementModeName | String | The JMS acknowledgement name which is one of: SESSION_TRANSACTED CLIENT_ACKNOWLEDGE AUTO_ACKNOWLEDGE DUPS_OK_ACKNOWLEDGE
@@ -287,13 +289,15 @@ The JMS component supports 68 options which are listed below.
 // component options: END
 
 
+
 [[JMS-Endpointoptions]]
 Endpoint options
 ++++++++++++++++
 
 
+
 // endpoint options: START
-The JMS component supports 75 endpoint options which are listed below:
+The JMS component supports 76 endpoint options which are listed below:
 
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
 |=======================================================================
@@ -316,6 +320,7 @@ The JMS component supports 75 endpoint options which are listed below:
 | replyToDeliveryPersistent | consumer | true | boolean | Specifies whether to use persistent delivery by default for replies.
 | selector | consumer |  | String | Sets the JMS selector to use
 | acceptMessagesWhileStopping | consumer (advanced) | false | boolean | Specifies whether the consumer accept messages while it is stopping. You may consider enabling this option if you start and stop JMS routes at runtime while there are still messages enqued on the queue. If this option is false and you stop the JMS route then messages may be rejected and the JMS broker would have to attempt redeliveries which yet again may be rejected and eventually the message may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.
+| allowReplyManagerQuickStop | consumer (advanced) | false | boolean | Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case link JmsConfigurationisAcceptMessagesWhileStopping() is enabled and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by default in the regular JMS consumers but to enable for reply managers you must enable this flag.
 | consumerType | consumer (advanced) | Default | ConsumerType | The consumer type to use which can be one of: Simple Default or Custom. The consumer type determines which Spring JMS listener to use. Default will use org.springframework.jms.listener.DefaultMessageListenerContainer Simple will use org.springframework.jms.listener.SimpleMessageListenerContainer. When Custom is specified the MessageListenerContainerFactory defined by the messageListenerContainerFactory option will determine what org.springframework.jms.listener.AbstractMessageListenerContainer to use.
 | defaultTaskExecutorType | consumer (advanced) |  | DefaultTaskExecutorType | Specifies what default TaskExecutor type to use in the DefaultMessageListenerContainer for both consumer endpoints and the ReplyTo consumer of producer endpoints. Possible values: SimpleAsync (uses Spring's SimpleAsyncTaskExecutor) or ThreadPool (uses Spring's ThreadPoolTaskExecutor with optimal values - cached threadpool-like). If not set it defaults to the previous behaviour which uses a cached thread pool for consumer endpoints and SimpleAsync for reply consumers. The use of ThreadPool is recommended to reduce thread trash in elastic configurations with dynamically increasing and decreasing concurrent consumers.
 | eagerLoadingOfProperties | consumer (advanced) | false | boolean | Enables eager loading of JMS properties as soon as a message is loaded which generally is inefficient as the JMS properties may not be required but sometimes can catch early any issues with the underlying JMS provider and the use of JMS properties
@@ -377,6 +382,7 @@ The JMS component supports 75 endpoint options which are listed below:
 // endpoint options: END
 
 
+
 [[JMS-MessageMappingbetweenJMSandCamel]]
 Message Mapping between JMS and Camel
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index 46ef75d..4ff90e9 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -172,7 +172,17 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon
      * may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.
      */
     public void setAcceptMessagesWhileStopping(boolean acceptMessagesWhileStopping) {
-        getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
+        getConfiguration().setAcceptMessagesWhileStopping(acceptMessagesWhileStopping); 
 
+    }
+    
+    /**
+     * Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow
+     * the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping
+     * is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by
+     * default in the regular JMS consumers but to enable for reply managers you must enable this flag.
+      */
+    public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) {
+        getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index c9a7117..a4c57e2 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -98,12 +98,18 @@ public class JmsConfiguration implements Cloneable {
             description = "Specifies whether the consumer container should auto-startup.")
     private boolean autoStartup = true;
     @UriParam(label = "consumer,advanced",
+            description = "Whether the DefaultMessageListenerContainer used in the reply managers for request-reply messaging allow "
+                    + " the DefaultMessageListenerContainer.runningAllowed flag to quick stop in case JmsConfiguration#isAcceptMessagesWhileStopping"
+                    + " is enabled, and org.apache.camel.CamelContext is currently being stopped. This quick stop ability is enabled by"
+                    + " default in the regular JMS consumers but to enable for reply managers you must enable this flag.")
+    private boolean allowReplyManagerQuickStop;   
+    @UriParam(label = "consumer,advanced",
             description = "Specifies whether the consumer accept messages while it is stopping."
                     + " You may consider enabling this option, if you start and stop JMS routes at runtime, while there are still messages"
                     + " enqued on the queue. If this option is false, and you stop the JMS route, then messages may be rejected,"
                     + " and the JMS broker would have to attempt redeliveries, which yet again may be rejected, and eventually the message"
                     + " may be moved at a dead letter queue on the JMS broker. To avoid this its recommended to enable this option.")
-    private boolean acceptMessagesWhileStopping;
+    private boolean acceptMessagesWhileStopping;    
     @UriParam(description = "Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance."
             + " It is typically only required for durable topic subscriptions."
             + " If using Apache ActiveMQ you may prefer to use Virtual Topics instead.")
@@ -762,6 +768,20 @@ public class JmsConfiguration implements Cloneable {
         this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
     }
 
+    /**
+     * Whether the {@link DefaultMessageListenerContainer} used in the reply managers for request-reply messaging allow
+     * the {@link DefaultMessageListenerContainer.runningAllowed} flag to quick stop in case {@link JmsConfiguration#isAcceptMessagesWhileStopping()}
+     * is enabled, and {@link org.apache.camel.CamelContext} is currently being stopped. This quick stop ability is enabled by
+     * default in the regular JMS consumers but to enable for reply managers you must enable this flag.
+     */
+    public boolean isAllowReplyManagerQuickStop() {
+        return allowReplyManagerQuickStop;
+    }
+
+    public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) {
+        this.allowReplyManagerQuickStop = allowReplyManagerQuickStop;
+    }
+    
     public String getClientId() {
         return clientId;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
index c671e75..c6a3599 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
@@ -695,6 +695,11 @@ public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade
     }
 
     @ManagedAttribute
+    public boolean isAllowReplyManagerQuickStop() {
+        return getConfiguration().isAllowReplyManagerQuickStop();
+    }
+    
+    @ManagedAttribute
     public boolean isAlwaysCopyMessage() {
         return getConfiguration().isAlwaysCopyMessage();
     }
@@ -792,6 +797,11 @@ public class JmsEndpoint extends DefaultEndpoint implements AsyncEndpoint, Heade
     }
 
     @ManagedAttribute
+    public void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) {
+        getConfiguration().setAllowReplyManagerQuickStop(allowReplyManagerQuickStop);
+    }
+    
+    @ManagedAttribute
     public void setAcknowledgementMode(int consumerAcknowledgementMode) {
         getConfiguration().setAcknowledgementMode(consumerAcknowledgementMode);
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
index 64b7bfe..9ca4cbc 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ExclusiveQueueMessageListenerContainer.java
@@ -37,7 +37,6 @@ public class ExclusiveQueueMessageListenerContainer extends DefaultJmsMessageLis
     // no need to override any methods currently
 
     public ExclusiveQueueMessageListenerContainer(JmsEndpoint endpoint) {
-        // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
-        super(endpoint, false);
+        super(endpoint, endpoint.isAllowReplyManagerQuickStop());
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
index 3b682e0..dd5954b 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/SharedQueueMessageListenerContainer.java
@@ -48,8 +48,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
      * @param fixedMessageSelector the fixed selector
      */
     public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, String fixedMessageSelector) {
-        // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
-        super(endpoint, false);
+        super(endpoint, endpoint.isAllowReplyManagerQuickStop());
         this.fixedMessageSelector = fixedMessageSelector;
     }
 
@@ -60,7 +59,7 @@ public class SharedQueueMessageListenerContainer extends DefaultJmsMessageListen
      * @param creator the create to create the dynamic selector
      */
     public SharedQueueMessageListenerContainer(JmsEndpoint endpoint, MessageSelectorCreator creator) {
-        super(endpoint, false);
+        super(endpoint, endpoint.isAllowReplyManagerQuickStop());
         this.creator = creator;
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
index 6d1f51d..42423a3 100644
--- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
+++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
@@ -96,8 +96,7 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport {
     @Override
     protected AbstractMessageListenerContainer createListenerContainer() throws Exception {
         // Use DefaultMessageListenerContainer as it supports reconnects (see CAMEL-3193)
-        // request-reply listener container should not allow quick-stop so we can keep listening for reply messages
-        DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, false);
+        DefaultMessageListenerContainer answer = new DefaultJmsMessageListenerContainer(endpoint, endpoint.isAllowReplyManagerQuickStop());
 
         answer.setDestinationName("temporary");
         answer.setDestinationResolver(destResolver);

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
index cf7b8a9..cded429 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsComponentTest.java
@@ -38,6 +38,7 @@ public class JmsComponentTest extends CamelTestSupport {
         assertEquals("Bye World", reply);
 
         assertEquals(true, endpoint.isAcceptMessagesWhileStopping());
+        assertEquals(true, endpoint.isAllowReplyManagerQuickStop());
         assertEquals(true, endpoint.isAlwaysCopyMessage());
         assertEquals(1, endpoint.getAcknowledgementMode());
         assertEquals(true, endpoint.isAutoStartup());
@@ -65,6 +66,7 @@ public class JmsComponentTest extends CamelTestSupport {
         JmsComponent comp = jmsComponentAutoAcknowledge(connectionFactory);
 
         comp.setAcceptMessagesWhileStopping(true);
+        comp.setAllowReplyManagerQuickStop(true);
         comp.setAlwaysCopyMessage(true);
         comp.setAcknowledgementMode(1);
         comp.setAutoStartup(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/d403cc90/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
index b504e62..2b74366 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsEndpointConfigurationTest.java
@@ -320,6 +320,7 @@ public class JmsEndpointConfigurationTest extends CamelTestSupport {
         assertEquals("Foo", endpoint.getEndpointConfiguredDestinationName());
 
         assertFalse(endpoint.isAcceptMessagesWhileStopping());
+        assertFalse(endpoint.isAllowReplyManagerQuickStop());
         assertFalse(endpoint.isAlwaysCopyMessage());
         assertTrue(endpoint.isAllowNullBody());
         assertFalse(endpoint.isAsyncConsumer());
@@ -362,6 +363,9 @@ public class JmsEndpointConfigurationTest extends CamelTestSupport {
 
         endpoint.setAcceptMessagesWhileStopping(true);
         assertTrue(endpoint.isAcceptMessagesWhileStopping());
+        
+        endpoint.setAllowReplyManagerQuickStop(true);
+        assertTrue(endpoint.isAllowReplyManagerQuickStop());
 
         endpoint.setAcknowledgementMode(2);
         assertEquals(2, endpoint.getAcknowledgementMode());
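
Illustrative note (not part of the commit): the change above replaces the hard-coded `false` that the reply-manager listener containers passed to their superclass with the value of the new `allowReplyManagerQuickStop` option. The following is a minimal, self-contained sketch of that wiring. It does not use Camel or Spring: `Config`, `ListenerContainer`, `replyContainerBefore` and `replyContainerAfter` are hypothetical stand-ins, and the `runningAllowed` decision is deliberately simplified. In particular, the interaction with `acceptMessagesWhileStopping` described in the option text is left out.

```java
public class QuickStopSketch {

    /** Hypothetical stand-in for the endpoint configuration; only the new flag is modelled. */
    static class Config {
        // Defaults to false, matching the documented default of the new option.
        private boolean allowReplyManagerQuickStop;

        boolean isAllowReplyManagerQuickStop() {
            return allowReplyManagerQuickStop;
        }

        void setAllowReplyManagerQuickStop(boolean allowReplyManagerQuickStop) {
            this.allowReplyManagerQuickStop = allowReplyManagerQuickStop;
        }
    }

    /** Hypothetical stand-in for a listener container that may stop early. */
    static class ListenerContainer {
        private final boolean allowQuickStop;

        ListenerContainer(boolean allowQuickStop) {
            this.allowQuickStop = allowQuickStop;
        }

        /**
         * Simplified decision (an assumption for illustration): stop at once when
         * quick stop is allowed and the context is stopping, otherwise keep
         * running for as long as the endpoint itself is running.
         */
        boolean runningAllowed(boolean contextStopping, boolean endpointRunning) {
            if (allowQuickStop && contextStopping) {
                return false;
            }
            return endpointRunning;
        }
    }

    /** Before the commit: reply containers always disabled quick stop. */
    static ListenerContainer replyContainerBefore(Config config) {
        return new ListenerContainer(false);
    }

    /** After the commit: reply containers follow the configured option. */
    static ListenerContainer replyContainerAfter(Config config) {
        return new ListenerContainer(config.isAllowReplyManagerQuickStop());
    }

    public static void main(String[] args) {
        Config config = new Config();
        config.setAllowReplyManagerQuickStop(true);

        // Context is stopping while the endpoint still reports itself as running.
        System.out.println("before: " + replyContainerBefore(config).runningAllowed(true, true));
        System.out.println("after:  " + replyContainerAfter(config).runningAllowed(true, true));
    }
}
```

With the option left at its default, both factory methods in the sketch behave the same way, which mirrors the commit keeping the previous behaviour unless the flag is explicitly enabled.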

