Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id EE67217B63 for ; Wed, 18 Mar 2015 07:33:46 +0000 (UTC) Received: (qmail 85714 invoked by uid 500); 18 Mar 2015 07:33:46 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 85578 invoked by uid 500); 18 Mar 2015 07:33:46 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 85568 invoked by uid 99); 18 Mar 2015 07:33:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Mar 2015 07:33:46 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9F4ECE1828; Wed, 18 Mar 2015 07:33:46 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 18 Mar 2015 07:33:47 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] camel git commit: CAMEL-8503: camel-jms - Have replyTo options for concurrent consumers CAMEL-8503: camel-jms - Have replyTo options for concurrent consumers Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b3afcac2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b3afcac2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b3afcac2 Branch: refs/heads/master Commit: b3afcac209e1c18dac2bc64e9bc043d8c5e4a8ec Parents: ee04384 Author: Claus Ibsen Authored: Wed Mar 18 08:35:34 2015 +0100 Committer: Claus Ibsen Committed: Wed Mar 18 08:35:34 2015 +0100 ---------------------------------------------------------------------- .../camel/component/jms/JmsComponent.java | 8 +++++ .../camel/component/jms/JmsConfiguration.java | 36 ++++++++++++++++++++ .../apache/camel/component/jms/JmsEndpoint.java | 20 +++++++++++ .../component/jms/reply/QueueReplyManager.java | 6 ++-- .../jms/reply/TemporaryQueueReplyManager.java | 6 ++-- ...uestReplyTempQueueMultipleConsumersTest.java | 3 +- 6 files changed, 71 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java index ed2022c..070c44d 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java @@ -205,6 +205,10 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon getConfiguration().setConcurrentConsumers(concurrentConsumers); } + public void setReplyToConcurrentConsumers(int concurrentConsumers) { + getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers); + } + public void setConnectionFactory(ConnectionFactory connectionFactory) { getConfiguration().setConnectionFactory(connectionFactory); } @@ -257,6 +261,10 @@ public class JmsComponent extends UriEndpointComponent implements ApplicationCon getConfiguration().setMaxConcurrentConsumers(maxConcurrentConsumers); } + public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) { + getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers); + } + public void setMaxMessagesPerTask(int maxMessagesPerTask) { getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask); } http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java index 65350d9..6229517 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java @@ -97,6 +97,8 @@ public class JmsConfiguration implements Cloneable { private boolean pubSubNoLocal; @UriParam(defaultValue = "1") private int concurrentConsumers = 1; + @UriParam(defaultValue = "1") + private int replyToConcurrentConsumers = 1; @UriParam(defaultValue = "-1") private int maxMessagesPerTask = -1; private int cacheLevel = -1; @@ -116,6 +118,8 @@ public class JmsConfiguration implements Cloneable { private int idleConsumerLimit = 1; @UriParam private int maxConcurrentConsumers; + @UriParam + private int replyToMaxConcurrentConsumers; // JmsTemplate only @UriParam(defaultValue = "false") private Boolean explicitQosEnabled; @@ -643,10 +647,26 @@ public class JmsConfiguration implements Cloneable { return concurrentConsumers; } + /** + * Specifies the default number of concurrent consumers when consuming from JMS (not for request/reply over JMS). + * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. + */ public void setConcurrentConsumers(int concurrentConsumers) { this.concurrentConsumers = concurrentConsumers; } + public int getReplyToConcurrentConsumers() { + return replyToConcurrentConsumers; + } + + /** + * Specifies the default number of concurrent consumers when doing request/reply over JMS. + * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. + */ + public void setReplyToConcurrentConsumers(int replyToConcurrentConsumers) { + this.replyToConcurrentConsumers = replyToConcurrentConsumers; + } + public int getMaxMessagesPerTask() { return maxMessagesPerTask; } @@ -734,10 +754,26 @@ public class JmsConfiguration implements Cloneable { return maxConcurrentConsumers; } + /** + * Specifies the maximum number of concurrent consumers when consuming from JMS (not for request/reply over JMS). + * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. + */ public void setMaxConcurrentConsumers(int maxConcurrentConsumers) { this.maxConcurrentConsumers = maxConcurrentConsumers; } + public int getReplyToMaxConcurrentConsumers() { + return replyToMaxConcurrentConsumers; + } + + /** + * Specifies the maximum number of concurrent consumers when using request/reply over JMS. + * See also the maxMessagesPerTask option to control dynamic scaling up/down of threads. + */ + public void setReplyToMaxConcurrentConsumers(int replyToMaxConcurrentConsumers) { + this.replyToMaxConcurrentConsumers = replyToMaxConcurrentConsumers; + } + public boolean isExplicitQosEnabled() { return explicitQosEnabled != null ? explicitQosEnabled : false; } http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java index 987d17f..d9571a6 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java @@ -542,6 +542,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy return getConfiguration().getConcurrentConsumers(); } + @ManagedAttribute + public int getReplyToConcurrentConsumers() { + return getConfiguration().getReplyToConcurrentConsumers(); + } + public ConnectionFactory getConnectionFactory() { return getConfiguration().getConnectionFactory(); } @@ -601,6 +606,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy } @ManagedAttribute + public int getReplyToMaxConcurrentConsumers() { + return getConfiguration().getReplyToMaxConcurrentConsumers(); + } + + @ManagedAttribute public int getMaxMessagesPerTask() { return getConfiguration().getMaxMessagesPerTask(); } @@ -822,6 +832,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy getConfiguration().setConcurrentConsumers(concurrentConsumers); } + @ManagedAttribute + public void setReplyToConcurrentConsumers(int concurrentConsumers) { + getConfiguration().setReplyToConcurrentConsumers(concurrentConsumers); + } + public void setConnectionFactory(ConnectionFactory connectionFactory) { getConfiguration().setConnectionFactory(connectionFactory); } @@ -897,6 +912,11 @@ public class JmsEndpoint extends DefaultEndpoint implements HeaderFilterStrategy } @ManagedAttribute + public void setReplyToMaxConcurrentConsumers(int maxConcurrentConsumers) { + getConfiguration().setReplyToMaxConcurrentConsumers(maxConcurrentConsumers); + } + + @ManagedAttribute public void setMaxMessagesPerTask(int maxMessagesPerTask) { getConfiguration().setMaxMessagesPerTask(maxMessagesPerTask); } http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java index ebb91c5..07ddfad 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/QueueReplyManager.java @@ -175,9 +175,9 @@ public class QueueReplyManager extends ReplyManagerSupport { answer.setMessageListener(this); answer.setPubSubDomain(false); answer.setSubscriptionDurable(false); - answer.setConcurrentConsumers(endpoint.getConcurrentConsumers()); - if (endpoint.getMaxConcurrentConsumers() > 0) { - answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers()); + answer.setConcurrentConsumers(endpoint.getReplyToConcurrentConsumers()); + if (endpoint.getReplyToMaxConcurrentConsumers() > 0) { + answer.setMaxConcurrentConsumers(endpoint.getReplyToMaxConcurrentConsumers()); } answer.setConnectionFactory(endpoint.getConnectionFactory()); String clientId = endpoint.getClientId(); http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java index 123b9cd..0e3d98b 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java @@ -110,9 +110,9 @@ public class TemporaryQueueReplyManager extends ReplyManagerSupport { answer.setMessageListener(this); answer.setPubSubDomain(false); answer.setSubscriptionDurable(false); - answer.setConcurrentConsumers(endpoint.getConcurrentConsumers()); - if (endpoint.getMaxConcurrentConsumers() > 0) { - answer.setMaxConcurrentConsumers(endpoint.getMaxConcurrentConsumers()); + answer.setConcurrentConsumers(endpoint.getReplyToConcurrentConsumers()); + if (endpoint.getReplyToMaxConcurrentConsumers() > 0) { + answer.setMaxConcurrentConsumers(endpoint.getReplyToMaxConcurrentConsumers()); } answer.setConnectionFactory(endpoint.getConnectionFactory()); // we use CACHE_CONSUMER by default to cling to the consumer as long as we can, since we can only consume http://git-wip-us.apache.org/repos/asf/camel/blob/b3afcac2/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java ---------------------------------------------------------------------- diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java index cb2a67a..decf610 100644 --- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java +++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsRequestReplyTempQueueMultipleConsumersTest.java @@ -37,7 +37,6 @@ import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknow /** * Reliability tests for JMS TempQueue Reply Manager with multiple consumers. - * @version */ public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupport { @@ -97,7 +96,7 @@ public class JmsRequestReplyTempQueueMultipleConsumersTest extends CamelTestSupp return new RouteBuilder() { @Override public void configure() throws Exception { - from("seda:start").inOut("jms:queue:foo?concurrentConsumers=10&maxConcurrentConsumers=20&recoveryInterval=10").process(new Processor() { + from("seda:start").inOut("jms:queue:foo?replyToConcurrentConsumers=10&replyToMaxConcurrentConsumers=20&recoveryInterval=10").process(new Processor() { @Override public void process(Exchange exchange) throws Exception { String threadName = Thread.currentThread().getName();