camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r800792 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/impl/ test/java/org/apache/camel/impl/
Date Tue, 04 Aug 2009 13:52:27 GMT
Author: davsclaus
Date: Tue Aug  4 13:52:26 2009
New Revision: 800792

URL: http://svn.apache.org/viewvc?rev=800792&view=rev
Log:
CAMEL-1876: Added asyncCallback methods to ProducerTemplate to do async messaging with callbacks
being invoked when the message is done. This allows the client to not hassle with how to wait for
the response as the callback is invoked when the message is done.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=800792&r1=800791&r2=800792&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Tue Aug  4
13:52:26 2009
@@ -22,6 +22,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.camel.spi.Synchronization;
+
 /**
  * Template (named like Spring's TransactionTemplate & JmsTemplate
  * et al) for working with Camel and sending {@link Message} instances in an
@@ -905,4 +907,93 @@
      */
     <T> T extractFutureBody(Future future, long timeout, TimeUnit unit, Class<T>
type) throws TimeoutException;
 
+    // Asynchronous methods with callback
+    // -----------------------------------------------------------------------
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpointUri   the endpoint URI to send the exchange to
+     * @param exchange      the exchange to send
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncCallback(String endpointUri, Exchange exchange, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint.
+     *
+     * @param endpoint      the endpoint to send the exchange to
+     * @param exchange      the exchange to send
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncCallback(Endpoint endpoint, Exchange exchange, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint using a supplied processor.
+     *
+     * @param endpointUri   the endpoint URI to send the exchange to
+     * @param processor     the transformer used to populate the new exchange
+     * {@link Processor} to populate the exchange
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncCallback(String endpointUri, Processor processor, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous exchange to the given endpoint using a supplied processor.
+     *
+     * @param endpoint      the endpoint to send the exchange to
+     * @param processor     the transformer used to populate the new exchange
+     * {@link Processor} to populate the exchange
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Exchange> asyncCallback(Endpoint endpoint, Processor processor, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+     *
+     * @param endpointUri   the endpoint URI to send the exchange to
+     * @param body          the body to send
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncCallbackSendBody(String endpointUri, Object body, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOnly} message exchange pattern.
+     *
+     * @param endpoint      the endpoint to send the exchange to
+     * @param body          the body to send
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpointUri   the endpoint URI to send the exchange to
+     * @param body          the body to send
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncCallbackRequestBody(String endpointUri, Object body, Synchronization
onCompletion);
+
+    /**
+     * Sends an asynchronous body to the given endpoint.
+     * Uses an {@link ExchangePattern#InOut} message exchange pattern.
+     *
+     * @param endpoint      the endpoint to send the exchange to
+     * @param body          the body to send
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return a handle to be used to get the response in the future
+     */
+    Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body, Synchronization
onCompletion);
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=800792&r1=800791&r2=800792&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
Tue Aug  4 13:52:26 2009
@@ -31,6 +31,7 @@
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -397,6 +398,16 @@
         };
     }
 
+    protected Processor createSetBodyProcessorCallback(final Object body, final Synchronization
onCompletion) {
+        return new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody(body);
+                exchange.addOnCompletion(onCompletion);
+            }
+        };
+    }
+
     protected Endpoint resolveMandatoryEndpoint(String endpointUri) {
         Endpoint endpoint = context.getEndpoint(endpointUri);
         if (endpoint == null) {
@@ -478,6 +489,30 @@
         return ExchangeHelper.extractFutureBody(context, future, timeout, unit, type);
     }
 
+    public Future<Object> asyncCallbackSendBody(String uri, Object body, Synchronization
onCompletion) {
+        return asyncCallbackSendBody(resolveMandatoryEndpoint(uri), body, onCompletion);
+    }
+
+    public Future<Object> asyncCallbackSendBody(Endpoint endpoint, Object body, Synchronization
onCompletion) {
+        return asyncCallback(endpoint, ExchangePattern.InOnly, body, onCompletion);
+    }
+
+    public Future<Object> asyncCallbackRequestBody(String uri, Object body, Synchronization
onCompletion) {
+        return asyncCallbackRequestBody(resolveMandatoryEndpoint(uri), body, onCompletion);
+    }
+
+    public Future<Object> asyncCallbackRequestBody(Endpoint endpoint, Object body,
Synchronization onCompletion) {
+        return asyncCallback(endpoint, ExchangePattern.InOut, body, onCompletion);
+    }
+
+    public Future<Exchange> asyncCallback(String uri, Exchange exchange, Synchronization
onCompletion) {
+        return asyncCallback(resolveMandatoryEndpoint(uri), exchange, onCompletion);
+    }
+
+    public Future<Exchange> asyncCallback(String uri, Processor processor, Synchronization
onCompletion) {
+        return asyncCallback(resolveMandatoryEndpoint(uri), processor, onCompletion);
+    }
+
     public Future<Object> asyncRequestBody(final Endpoint endpoint, final Object body)
{
         Callable<Object> task = new Callable<Object>() {
             public Object call() throws Exception {
@@ -575,5 +610,43 @@
         return executor.submit(task);
     }
    
+    public Future<Object> asyncCallback(final Endpoint endpoint, final ExchangePattern
pattern, final Object body, final Synchronization onCompletion) {
+        Callable<Object> task = new Callable<Object>() {
+            public Object call() throws Exception {
+                Exchange exchange = send(endpoint, pattern, createSetBodyProcessorCallback(body,
onCompletion));
+
+                Object result = extractResultBody(exchange, pattern);
+                if (pattern.isOutCapable()) {
+                    return result;
+                } else {
+                    // return null if not OUT capable
+                    return null;
+                }
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Exchange> asyncCallback(final Endpoint endpoint, final Exchange exchange,
final Synchronization onCompletion) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                exchange.addOnCompletion(onCompletion);
+                return send(endpoint, exchange);
+            }
+        };
+
+        return executor.submit(task);
+    }
+
+    public Future<Exchange> asyncCallback(final Endpoint endpoint, final Processor
processor, final Synchronization onCompletion) {
+        Callable<Exchange> task = new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                return producerCache.send(endpoint, null, processor, onCompletion);
+            }
+        };
+
+        return executor.submit(task);
+    }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=800792&r1=800791&r2=800792&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Tue Aug
 4 13:52:26 2009
@@ -26,7 +26,9 @@
 import org.apache.camel.Producer;
 import org.apache.camel.ProducerCallback;
 import org.apache.camel.ServicePoolAware;
+import org.apache.camel.model.OnCompletionDefinition;
 import org.apache.camel.spi.ServicePool;
+import org.apache.camel.spi.Synchronization;
 import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
@@ -106,8 +108,23 @@
      * @return the exchange
      */
     public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor)
{
+        return send(endpoint, pattern, processor, null);
+    }
+
+    /**
+     * Sends an exchange to an endpoint using a supplied
+     * {@link Processor} to populate the exchange
+     *
+     * @param endpoint the endpoint to send the exchange to
+     * @param pattern the message {@link ExchangePattern} such as
+     *   {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut}
+     * @param processor the transformer used to populate the new exchange
+     * @param onCompletion  callback invoked when exchange has been completed
+     * @return the exchange
+     */
+    public Exchange send(Endpoint endpoint, ExchangePattern pattern, Processor processor,
Synchronization onCompletion) {
         try {
-            return sendExchange(endpoint, pattern, processor, null);
+            return sendExchange(endpoint, pattern, processor, null, onCompletion);
         } catch (Exception e) {
             throw wrapRuntimeCamelException(e);
         }
@@ -120,7 +137,6 @@
      * @param endpoint  the endpoint to send the exchange to
      * @param exchange  the exchange, can be <tt>null</tt> if so then create
a new exchange from the producer
      * @param pattern   the exchange pattern, can be <tt>null</tt>
-     * @param callback  the callback
      * @return the response from the callback
      * @throws Exception if an internal processing error has occurred.
      */
@@ -153,6 +169,11 @@
 
     protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
                                     final Processor processor, Exchange exchange) throws
Exception {
+        return sendExchange(endpoint, pattern, processor, exchange, null);
+    }
+
+    protected Exchange sendExchange(final Endpoint endpoint, ExchangePattern pattern,
+                                    final Processor processor, Exchange exchange, final Synchronization
onCompletion) throws Exception {
         return doInProducer(endpoint, exchange, pattern, new ProducerCallback<Exchange>()
{
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern
pattern) throws Exception {
                 if (exchange == null) {
@@ -164,6 +185,10 @@
                     processor.process(exchange);
                 }
 
+                if (onCompletion != null) {
+                    exchange.addOnCompletion(onCompletion);
+                }
+
                 // now lets dispatch
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(">>>> " + endpoint + " " + exchange);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java?rev=800792&r1=800791&r2=800792&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/DefaultProducerTemplateAsyncTest.java
Tue Aug  4 13:52:26 2009
@@ -18,13 +18,16 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.RuntimeCamelException;
@@ -37,6 +40,8 @@
  */
 public class DefaultProducerTemplateAsyncTest extends ContextTestSupport {
 
+    private static String order;
+
     public void testRequestAsync() throws Exception {
         Exchange exchange = new DefaultExchange(context);
         exchange.getIn().setBody("Hello");
@@ -272,6 +277,248 @@
         assertTrue("Should take longer than: " + delta, delta > 250);
     }
 
+    public void testAsyncCallbackExchangeInOnly() throws Exception {
+        order = "";
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Exchange exchange = context.getEndpoint("direct:start").createExchange();
+        exchange.getIn().setBody("Hello");
+
+        template.asyncCallback("direct:start", exchange, new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("Hello World", exchange.getIn().getBody());
+                latch.countDown();
+            }
+        });
+
+        order += "A";
+        latch.await(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertMockEndpointsSatisfied();
+        assertEquals("ABC", order);
+    }
+
+    public void testAsyncCallbackExchangeInOut() throws Exception {
+        order = "";
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        Exchange exchange = context.getEndpoint("direct:start").createExchange();
+        exchange.getIn().setBody("Hello");
+        exchange.setPattern(ExchangePattern.InOut);
+
+        template.asyncCallback("direct:echo", exchange, new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("HelloHello", exchange.getOut().getBody());
+                latch.countDown();
+            }
+        });
+
+        order += "A";
+        latch.await(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertEquals("ABC", order);
+    }
+
+    public void testAsyncCallbackExchangeInOnlyGetResult() throws Exception {
+        order = "";
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        Exchange exchange = context.getEndpoint("direct:start").createExchange();
+        exchange.getIn().setBody("Hello");
+
+        Future<Exchange> future = template.asyncCallback("direct:start", exchange,
new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("Hello World", exchange.getIn().getBody());
+            }
+        });
+
+        order += "A";
+        Exchange reply = future.get(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertMockEndpointsSatisfied();
+        assertEquals("ABC", order);
+        assertNotNull(reply);
+    }
+
+    public void testAsyncCallbackExchangeInOutGetResult() throws Exception {
+        order = "";
+
+        Exchange exchange = context.getEndpoint("direct:start").createExchange();
+        exchange.getIn().setBody("Hello");
+        exchange.setPattern(ExchangePattern.InOut);
+
+        Future<Exchange> future = template.asyncCallback("direct:echo", exchange, new
SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("HelloHello", exchange.getOut().getBody());
+            }
+        });
+
+        order += "A";
+        Exchange reply = future.get(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertEquals("ABC", order);
+        assertNotNull(reply);
+        assertEquals("HelloHello", reply.getOut().getBody());
+    }
+
+    public void testAsyncCallbackBodyInOnly() throws Exception {
+        order = "";
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        template.asyncCallbackSendBody("direct:start", "Hello", new SynchronizationAdapter()
{
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("Hello World", exchange.getIn().getBody());
+                latch.countDown();
+            }
+        });
+
+        order += "A";
+        latch.await(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertMockEndpointsSatisfied();
+        assertEquals("ABC", order);
+    }
+
+    public void testAsyncCallbackBodyInOut() throws Exception {
+        order = "";
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        template.asyncCallbackRequestBody("direct:echo", "Hello", new SynchronizationAdapter()
{
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("HelloHello", exchange.getOut().getBody());
+                latch.countDown();
+            }
+        });
+
+        order += "A";
+        latch.await(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertEquals("ABC", order);
+    }
+
+    public void testAsyncCallbackBodyInOnlyGetResult() throws Exception {
+        order = "";
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        Future<Object> future = template.asyncCallbackSendBody("direct:start", "Hello",
new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("Hello World", exchange.getIn().getBody());
+            }
+        });
+
+        order += "A";
+        Object reply = future.get(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertMockEndpointsSatisfied();
+        assertEquals("ABC", order);
+        // no reply when in only
+        assertEquals(null, reply);
+    }
+
+    public void testAsyncCallbackBodyInOutGetResult() throws Exception {
+        order = "";
+
+        Future<Object> future = template.asyncCallbackRequestBody("direct:echo", "Hello",
new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("HelloHello", exchange.getOut().getBody());
+            }
+        });
+
+        order += "A";
+        Object reply = future.get(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertEquals("ABC", order);
+        assertEquals("HelloHello", reply);
+    }
+
+    public void testAsyncCallbackInOnlyProcessor() throws Exception {
+        order = "";
+
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        template.asyncCallback("direct:start", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello");
+            }
+        }, new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("Hello World", exchange.getIn().getBody());
+                latch.countDown();
+            }
+        });
+
+        order += "A";
+        latch.await(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertMockEndpointsSatisfied();
+        assertEquals("ABC", order);
+    }
+
+    public void testAsyncCallbackInOutProcessor() throws Exception {
+        order = "";
+
+        final CountDownLatch latch = new CountDownLatch(1);
+
+        template.asyncCallback("direct:echo", new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello");
+                exchange.setPattern(ExchangePattern.InOut);
+            }
+        }, new SynchronizationAdapter() {
+            @Override
+            public void onDone(Exchange exchange) {
+                order += "B";
+                assertEquals("HelloHello", exchange.getOut().getBody());
+                latch.countDown();
+            }
+        });
+
+        order += "A";
+        latch.await(10, TimeUnit.SECONDS);
+        order += "C";
+
+        assertEquals("ABC", order);
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {



Mime
View raw message