Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 68301 invoked from network); 4 Aug 2009 13:52:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Aug 2009 13:52:48 -0000 Received: (qmail 61506 invoked by uid 500); 4 Aug 2009 13:52:53 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 61458 invoked by uid 500); 4 Aug 2009 13:52:53 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 61449 invoked by uid 99); 4 Aug 2009 13:52:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Aug 2009 13:52:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 04 Aug 2009 13:52:48 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 86CEF2388892; Tue, 4 Aug 2009 13:52:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r800792 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ test/java/org/apache/camel/impl/ Date: Tue, 04 Aug 2009 13:52:27 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090804135227.86CEF2388892@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Tue Aug 4 13:52:26 2009 New Revision: 800792 URL: http://svn.apache.org/viewvc?rev=800792&view=rev Log: CAMEL-1876: Added asyncCallback methods to ProducerTemplate to do async messaging with callbacks being invoked when the message is done. 
This allows the client to not hassle with how to wait for the response as the callback is invoked when the message is done. Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=800792&r1=800791&r2=800792&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Tue Aug 4 13:52:26 2009 @@ -22,6 +22,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.camel.spi.Synchronization; + /** * Template (named like Spring's TransactionTemplate & JmsTemplate * et al) for working with Camel and sending {@link Message} instances in an @@ -905,4 +907,93 @@ */ T extractFutureBody(Future future, long timeout, TimeUnit unit, Class type) throws TimeoutException; + // Asynchronous methods with callback + // ----------------------------------------------------------------------- + + /** + * Sends an asynchronous exchange to the given endpoint. + * + * @param endpointUri the endpoint URI to send the exchange to + * @param exchange the exchange to send + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallback(String endpointUri, Exchange exchange, Synchronization onCompletion); + + /** + * Sends an asynchronous exchange to the given endpoint. 
+ * + * @param endpoint the endpoint to send the exchange to + * @param exchange the exchange to send + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization onCompletion); + + /** + * Sends an asynchronous exchange to the given endpoint using a supplied processor. + * + * @param endpointUri the endpoint URI to send the exchange to + * @param processor the transformer used to populate the new exchange + * {@link Processor} to populate the exchange + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallback(String endpointUri, Processor processor, Synchronization onCompletion); + + /** + * Sends an asynchronous exchange to the given endpoint using a supplied processor. + * + * @param endpoint the endpoint to send the exchange to + * @param processor the transformer used to populate the new exchange + * {@link Processor} to populate the exchange + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallback(Endpoint endpoint, Processor processor, Synchronization onCompletion); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {@link ExchangePattern#InOnly} message exchange pattern. + * + * @param endpointUri the endpoint URI to send the exchange to + * @param body the body to send + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallbackSendBody(String endpointUri, Object body, Synchronization onCompletion); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {@link ExchangePattern#InOnly} message exchange pattern. 
+ * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpointUri the endpoint URI to send the exchange to + * @param body the body to send + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallbackRequestBody(String endpointUri, Object body, Synchronization onCompletion); + + /** + * Sends an asynchronous body to the given endpoint. + * Uses an {@link ExchangePattern#InOut} message exchange pattern. + * + * @param endpoint the endpoint to send the exchange to + * @param body the body to send + * @param onCompletion callback invoked when exchange has been completed + * @return a handle to be used to get the response in the future + */ + Future asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion); + } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=800792&r1=800791&r2=800792&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Tue Aug 4 13:52:26 2009 @@ -31,6 +31,7 @@ import org.apache.camel.NoSuchEndpointException; import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; +import 
org.apache.camel.spi.Synchronization; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; @@ -397,6 +398,16 @@ }; } + protected Processor createSetBodyProcessorCallback(final Object body, final Synchronization onCompletion) { + return new Processor() { + public void process(Exchange exchange) { + Message in = exchange.getIn(); + in.setBody(body); + exchange.addOnCompletion(onCompletion); + } + }; + } + protected Endpoint resolveMandatoryEndpoint(String endpointUri) { Endpoint endpoint = context.getEndpoint(endpointUri); if (endpoint == null) { @@ -478,6 +489,30 @@ return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type); } + public Future asyncCallbackSendBody(String uri, Object body, Synchronization onCompletion) { + return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion); + } + + public Future asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization onCompletion) { + return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion); + } + + public Future asyncCallbackRequestBody(String uri, Object body, Synchronization onCompletion) { + return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion); + } + + public Future asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization onCompletion) { + return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion); + } + + public Future asyncCallback(String uri, Exchange exchange, Synchronization onCompletion) { + return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion); + } + + public Future asyncCallback(String uri, Processor processor, Synchronization onCompletion) { + return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion); + } + public Future asyncRequestBody(final Endpoint endpoint, final Object body) { Callable task = new Callable() { public Object call() throws Exception { @@ 
-575,5 +610,43 @@ return executor.submit(task); } + public Future asyncCallback(final Endpoint endpoint, final ExchangePattern pattern, final Object body, final Synchronization onCompletion) { + Callable task = new Callable() { + public Object call() throws Exception { + Exchange exchange = send(endpoint, pattern, createSetBodyProcessorCallback(body, onCompletion)); + + Object result = extractResultBody(exchange, pattern); + if (pattern.isOutCapable()) { + return result; + } else { + // return null if not OUT capable + return null; + } + } + }; + + return executor.submit(task); + } + + public Future asyncCallback(final Endpoint endpoint, final Exchange exchange, final Synchronization onCompletion) { + Callable task = new Callable() { + public Exchange call() throws Exception { + exchange.addOnCompletion(onCompletion); + return send(endpoint, exchange); + } + }; + + return executor.submit(task); + } + + public Future asyncCallback(final Endpoint endpoint, final Processor processor, final Synchronization onCompletion) { + Callable task = new Callable() { + public Exchange call() throws Exception { + return producerCache.send(endpoint, null, processor, onCompletion); + } + }; + + return executor.submit(task); + } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=800792&r1=800791&r2=800792&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Tue Aug 4 13:52:26 2009 @@ -26,7 +26,9 @@ import org.apache.camel.Producer; import org.apache.camel.ProducerCallback; import org.apache.camel.ServicePoolAware; +import org.apache.camel.model.OnCompletionDefinition; import org.apache.camel.spi.ServicePool; +import 
org.apache.camel.spi.Synchronization; import org.apache.camel.util.LRUCache; import org.apache.camel.util.ServiceHelper; import org.apache.commons.logging.Log; @@ -106,8 +108,23 @@ * @return the exchange */ public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor) { + return send(endpoint, pattern, processor, null); + } + + /** + * Sends an exchange to an endpoint using a supplied + * {@link Processor} to populate the exchange + * + * @param endpoint the endpoint to send the exchange to + * @param pattern the message {@link ExchangePattern} such as + * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} + * @param processor the transformer used to populate the new exchange + * @param onCompletion callback invoked when exchange has been completed + * @return the exchange + */ + public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor, Synchronization onCompletion) { try { - return sendExchange(endpoint, pattern, processor, null); + return sendExchange(endpoint, pattern, processor, null, onCompletion); } catch (Exception e) { throw wrapRuntimeCamelException(e); } @@ -120,7 +137,6 @@ * @param endpoint the endpoint to send the exchange to * @param exchange the exchange, can be null if so then create a new exchange from the producer * @param pattern the exchange pattern, can be null - * @param callback the callback * @return the response from the callback * @throws Exception if an internal processing error has occurred. 
*/ @@ -153,6 +169,11 @@ protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, final Processor processor, Exchange exchange) throws Exception { + return sendExchange(endpoint, pattern, processor, exchange, null); + } + + protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern, + final Processor processor, Exchange exchange, final Synchronization onCompletion) throws Exception { return doInProducer(endpoint, exchange, pattern, new ProducerCallback() { public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception { if (exchange == null) { @@ -164,6 +185,10 @@ processor.process(exchange); } + if (onCompletion != null) { + exchange.addOnCompletion(onCompletion); + } + // now lets dispatch if (LOG.isDebugEnabled()) { LOG.debug(">>>> " + endpoint + " " + exchange); Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=800792&r1=800791&r2=800792&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java Tue Aug 4 13:52:26 2009 @@ -18,13 +18,16 @@ import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.apache.camel.ContextTestSupport; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; import org.apache.camel.Processor; import org.apache.camel.Producer; import 
org.apache.camel.RuntimeCamelException; @@ -37,6 +40,8 @@ */ public class DefaultProducerTemplateAsyncTest extends ContextTestSupport { + private static String order; + public void testRequestAsync() throws Exception { Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("Hello"); @@ -272,6 +277,248 @@ assertTrue("Should take longer than: " + delta, delta > 250); } + public void testAsyncCallbackExchangeInOnly() throws Exception { + order = ""; + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + final CountDownLatch latch = new CountDownLatch(1); + + Exchange exchange = context.getEndpoint("direct:start").createExchange(); + exchange.getIn().setBody("Hello"); + + template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + latch.countDown(); + } + }); + + order += "A"; + latch.await(10, TimeUnit.SECONDS); + order += "C"; + + assertMockEndpointsSatisfied(); + assertEquals("ABC", order); + } + + public void testAsyncCallbackExchangeInOut() throws Exception { + order = ""; + + final CountDownLatch latch = new CountDownLatch(1); + + Exchange exchange = context.getEndpoint("direct:start").createExchange(); + exchange.getIn().setBody("Hello"); + exchange.setPattern(ExchangePattern.InOut); + + template.asyncCallback("direct:echo", exchange, new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("HelloHello", exchange.getOut().getBody()); + latch.countDown(); + } + }); + + order += "A"; + latch.await(10, TimeUnit.SECONDS); + order += "C"; + + assertEquals("ABC", order); + } + + public void testAsyncCallbackExchangeInOnlyGetResult() throws Exception { + order = ""; + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + Exchange exchange = context.getEndpoint("direct:start").createExchange(); + 
exchange.getIn().setBody("Hello"); + + Future future = template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + } + }); + + order += "A"; + Exchange reply = future.get(10, TimeUnit.SECONDS); + order += "C"; + + assertMockEndpointsSatisfied(); + assertEquals("ABC", order); + assertNotNull(reply); + } + + public void testAsyncCallbackExchangeInOutGetResult() throws Exception { + order = ""; + + Exchange exchange = context.getEndpoint("direct:start").createExchange(); + exchange.getIn().setBody("Hello"); + exchange.setPattern(ExchangePattern.InOut); + + Future future = template.asyncCallback("direct:echo", exchange, new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("HelloHello", exchange.getOut().getBody()); + } + }); + + order += "A"; + Exchange reply = future.get(10, TimeUnit.SECONDS); + order += "C"; + + assertEquals("ABC", order); + assertNotNull(reply); + assertEquals("HelloHello", reply.getOut().getBody()); + } + + public void testAsyncCallbackBodyInOnly() throws Exception { + order = ""; + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + final CountDownLatch latch = new CountDownLatch(1); + + template.asyncCallbackSendBody("direct:start", "Hello", new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + latch.countDown(); + } + }); + + order += "A"; + latch.await(10, TimeUnit.SECONDS); + order += "C"; + + assertMockEndpointsSatisfied(); + assertEquals("ABC", order); + } + + public void testAsyncCallbackBodyInOut() throws Exception { + order = ""; + + final CountDownLatch latch = new CountDownLatch(1); + + template.asyncCallbackRequestBody("direct:echo", "Hello", new SynchronizationAdapter() { + @Override + public 
void onDone(Exchange exchange) { + order += "B"; + assertEquals("HelloHello", exchange.getOut().getBody()); + latch.countDown(); + } + }); + + order += "A"; + latch.await(10, TimeUnit.SECONDS); + order += "C"; + + assertEquals("ABC", order); + } + + public void testAsyncCallbackBodyInOnlyGetResult() throws Exception { + order = ""; + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + Future future = template.asyncCallbackSendBody("direct:start", "Hello", new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + } + }); + + order += "A"; + Object reply = future.get(10, TimeUnit.SECONDS); + order += "C"; + + assertMockEndpointsSatisfied(); + assertEquals("ABC", order); + // no reply when in only + assertEquals(null, reply); + } + + public void testAsyncCallbackBodyInOutGetResult() throws Exception { + order = ""; + + Future future = template.asyncCallbackRequestBody("direct:echo", "Hello", new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("HelloHello", exchange.getOut().getBody()); + } + }); + + order += "A"; + Object reply = future.get(10, TimeUnit.SECONDS); + order += "C"; + + assertEquals("ABC", order); + assertEquals("HelloHello", reply); + } + + public void testAsyncCallbackInOnlyProcessor() throws Exception { + order = ""; + + getMockEndpoint("mock:result").expectedBodiesReceived("Hello World"); + + final CountDownLatch latch = new CountDownLatch(1); + + template.asyncCallback("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello"); + } + }, new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + latch.countDown(); + } + }); + + order += "A"; + latch.await(10, TimeUnit.SECONDS); + order += 
"C"; + + assertMockEndpointsSatisfied(); + assertEquals("ABC", order); + } + + public void testAsyncCallbackInOutProcessor() throws Exception { + order = ""; + + final CountDownLatch latch = new CountDownLatch(1); + + template.asyncCallback("direct:echo", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.getIn().setBody("Hello"); + exchange.setPattern(ExchangePattern.InOut); + } + }, new SynchronizationAdapter() { + @Override + public void onDone(Exchange exchange) { + order += "B"; + assertEquals("HelloHello", exchange.getOut().getBody()); + latch.countDown(); + } + }); + + order += "A"; + latch.await(10, TimeUnit.SECONDS); + order += "C"; + + assertEquals("ABC", order); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() {