Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 56B7B41BE for ; Sun, 29 May 2011 11:48:23 +0000 (UTC) Received: (qmail 14560 invoked by uid 500); 29 May 2011 11:48:23 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 14532 invoked by uid 500); 29 May 2011 11:48:23 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 14524 invoked by uid 99); 29 May 2011 11:48:23 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 May 2011 11:48:23 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 May 2011 11:48:18 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 324682388A3D; Sun, 29 May 2011 11:47:57 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1128869 - in /camel/trunk/components/camel-jms/src: main/java/org/apache/camel/component/jms/ test/java/org/apache/camel/component/jms/ Date: Sun, 29 May 2011 11:47:57 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110529114757.324682388A3D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Sun May 29 11:47:56 2011 New Revision: 1128869 URL: http://svn.apache.org/viewvc?rev=1128869&view=rev Log: CAMEL-3781: Added option disableTimeToLive on JMS. Thanks to Keith for patch. 
Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyDisableTimeToLiveTest.java camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutDisableTimeToLiveTest.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=1128869&r1=1128868&r2=1128869&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java Sun May 29 11:47:56 2011 @@ -116,6 +116,8 @@ public class JmsConfiguration implements // if the message is a JmsMessage and mapJmsMessage=false, force the // producer to send the javax.jms.Message body to the next JMS destination private boolean forceSendOriginalMessage; + // to force disabling time to live (works in both in-only or in-out mode) + private boolean disableTimeToLive; public JmsConfiguration() { } @@ -271,10 +273,14 @@ public class JmsConfiguration implements if (answer instanceof JmsTemplate && requestTimeout > 0) { JmsTemplate jmsTemplate = (JmsTemplate) answer; jmsTemplate.setExplicitQosEnabled(true); - if (timeToLive < 0) { - // If TTL not specified, then default to - jmsTemplate.setTimeToLive(requestTimeout); + + // prefer to use timeToLive over requestTimeout if both specified + long ttl = timeToLive > 0 ? 
timeToLive : requestTimeout; + if (ttl > 0 && !isDisableTimeToLive()) { + // only use TTL if not disabled + jmsTemplate.setTimeToLive(ttl); } + jmsTemplate.setSessionTransacted(isTransactedInOut()); if (isTransactedInOut()) { jmsTemplate.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED); @@ -329,7 +335,8 @@ public class JmsConfiguration implements if (receiveTimeout >= 0) { template.setReceiveTimeout(receiveTimeout); } - if (timeToLive >= 0) { + // only set TTL if we have a non-negative value and it has not been disabled + if (timeToLive >= 0 && !isDisableTimeToLive()) { template.setTimeToLive(timeToLive); } @@ -1087,4 +1094,11 @@ public class JmsConfiguration implements return forceSendOriginalMessage; } + public boolean isDisableTimeToLive() { + return disableTimeToLive; + } + + public void setDisableTimeToLive(boolean disableTimeToLive) { + this.disableTimeToLive = disableTimeToLive; + } } Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=1128869&r1=1128868&r2=1128869&view=diff ============================================================================== --- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java (original) +++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java Sun May 29 11:47:56 2011 @@ -44,7 +44,6 @@ import org.apache.camel.component.jms.re import org.apache.camel.component.jms.reply.TemporaryQueueReplyManager; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.impl.ServiceSupport; import org.apache.camel.impl.SynchronousDelegateProducer; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.spi.HeaderFilterStrategyAware; @@ -940,6 +939,16 @@ public class JmsEndpoint extends Default 
configuration.setForceSendOriginalMessage(forceSendOriginalMessage); } + @ManagedAttribute + public boolean isDisableTimeToLive() { + return configuration.isDisableTimeToLive(); + } + + @ManagedAttribute + public void setDisableTimeToLive(boolean disableTimeToLive) { + configuration.setDisableTimeToLive(disableTimeToLive); + } + @ManagedAttribute(description = "Camel id") public String getCamelId() { return getCamelContext().getName(); Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyDisableTimeToLiveTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyDisableTimeToLiveTest.java?rev=1128869&view=auto ============================================================================== --- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyDisableTimeToLiveTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOnlyDisableTimeToLiveTest.java Sun May 29 11:47:56 2011 @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.ConsumerTemplate; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * @version + */ +public class JmsInOnlyDisableTimeToLiveTest extends CamelTestSupport { + + private String urlTimeout = "activemq:queue.in?timeToLive=2000"; + private String urlTimeToLiveDisabled = "activemq:queue.in?timeToLive=2000&disableTimeToLive=true"; + + @Test + public void testInOnlyExpired() throws Exception { + MyCoolBean cool = new MyCoolBean(); + cool.setProducer(template); + cool.setConsumer(consumer); + + getMockEndpoint("mock:result").expectedBodiesReceived("World 1"); + + // setup a message that will timeout to prove the ttl is getting set + // and that the disableTimeToLive is defaulting to false + template.sendBody("direct:timeout", "World 1"); + + assertMockEndpointsSatisfied(); + + // wait after the msg has expired + Thread.sleep(2500); + + resetMocks(); + getMockEndpoint("mock:end").expectedMessageCount(0); + + cool.someBusinessLogic(); + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInOnlyDisabledTimeToLive() throws Exception { + MyCoolBean cool = new MyCoolBean(); + cool.setProducer(template); + cool.setConsumer(consumer); + + getMockEndpoint("mock:result").expectedBodiesReceived("World 2"); + + // send a message that sets the timeToLive to 2 secs with + // disableTimeToLive set to true, so the TTL is not applied + // and the message stays on the queue to be processed + // by the CoolBean + template.sendBody("direct:disable", "World 2"); + + assertMockEndpointsSatisfied(); + + // wait until the msg would have expired if the TTL had been applied + Thread.sleep(2500); + + resetMocks(); 
getMockEndpoint("mock:end").expectedBodiesReceived("Hello World 2"); + + cool.someBusinessLogic(); + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + return camelContext; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:timeout") + .to(urlTimeout) + .to("mock:result"); + + from("direct:disable") + .to(urlTimeToLiveDisabled) + .to("mock:result"); + + from("activemq:queue.out") + .to("mock:end"); + } + }; + } + + public static class MyCoolBean { + private int count; + private ConsumerTemplate consumer; + private ProducerTemplate producer; + + public void setConsumer(ConsumerTemplate consumer) { + this.consumer = consumer; + } + + public void setProducer(ProducerTemplate producer) { + this.producer = producer; + } + + public void someBusinessLogic() { + // loop to empty queue + while (true) { + // receive the message from the queue, wait at most 2 sec + String msg = consumer.receiveBody("activemq:queue.in", 2000, String.class); + if (msg == null) { + // no more messages in queue + break; + } + + // do something with body + msg = "Hello " + msg; + + // send it to the next queue + producer.sendBodyAndHeader("activemq:queue.out", msg, "number", count++); + } + } + } + +} Added: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutDisableTimeToLiveTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutDisableTimeToLiveTest.java?rev=1128869&view=auto ============================================================================== --- 
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutDisableTimeToLiveTest.java (added) +++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsInOutDisableTimeToLiveTest.java Sun May 29 11:47:56 2011 @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.jms; + +import javax.jms.ConnectionFactory; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ConsumerTemplate; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge; + +/** + * @version + */ +public class JmsInOutDisableTimeToLiveTest extends CamelTestSupport { + + private String urlTimeout = "activemq:queue.in?requestTimeout=2000"; + private String urlTimeToLiveDisabled = "activemq:queue.in?requestTimeout=2000&disableTimeToLive=true"; + + @Test + public void testInOutExpired() throws Exception { + MyCoolBean cool = new MyCoolBean(); + cool.setProducer(template); + cool.setConsumer(consumer); + + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:end").expectedMessageCount(0); + + // setup a message that will timeout to prove the ttl is getting set + // and that the disableTimeToLive is defaulting to false + try { + template.requestBody("direct:timeout", "World 1"); + fail("Should not get here, timeout expected"); + } catch (CamelExecutionException e) { + cool.someBusinessLogic(); + } + + assertMockEndpointsSatisfied(); + } + + @Test + public void testInOutDisableTimeToLive() throws Exception { + MyCoolBean cool = new MyCoolBean(); + cool.setProducer(template); + cool.setConsumer(consumer); + + getMockEndpoint("mock:result").expectedMessageCount(0); + getMockEndpoint("mock:end").expectedBodiesReceived("Hello World 2"); + + // send a message that sets the requestTimeout to 2 secs with a + // disableTimeToLive set to true, this should timeout + // but leave the message on the queue to be processed + // by the CoolBean + try { + template.requestBody("direct:disable", "World 2"); + fail("Should not get here, timeout expected"); + } 
catch (CamelExecutionException e) { + cool.someBusinessLogic(); + } + + assertMockEndpointsSatisfied(); + } + + protected CamelContext createCamelContext() throws Exception { + CamelContext camelContext = super.createCamelContext(); + ConnectionFactory connectionFactory = CamelJmsTestHelper.createConnectionFactory(); + camelContext.addComponent("activemq", jmsComponentAutoAcknowledge(connectionFactory)); + return camelContext; + } + + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + from("direct:timeout") + .to(urlTimeout) + .to("mock:result"); + + from("direct:disable") + .to(urlTimeToLiveDisabled) + .to("mock:result"); + + from("activemq:queue.out") + .to("mock:end"); + } + }; + } + + public static class MyCoolBean { + private int count; + private ConsumerTemplate consumer; + private ProducerTemplate producer; + + public void setConsumer(ConsumerTemplate consumer) { + this.consumer = consumer; + } + + public void setProducer(ProducerTemplate producer) { + this.producer = producer; + } + + public void someBusinessLogic() { + // loop to empty queue + while (true) { + // receive the message from the queue, wait at most 2 sec + String msg = consumer.receiveBody("activemq:queue.in", 2000, String.class); + if (msg == null) { + // no more messages in queue + break; + } + + // do something with body + msg = "Hello " + msg; + + // send it to the next queue + producer.sendBodyAndHeader("activemq:queue.out", msg, "number", count++); + } + } + } + +}