camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r880767 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
Date Mon, 16 Nov 2009 14:25:19 GMT
Author: davsclaus
Date: Mon Nov 16 14:25:19 2009
New Revision: 880767

URL: http://svn.apache.org/viewvc?rev=880767&view=rev
Log:
CAMEL-2178: Added support for simulating async request/reply in case producer does not natively
support AsyncProcessor. The original thread will still see it as async request/reply and not
block.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java?rev=880767&r1=880766&r2=880767&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendAsyncProcessor.java
Mon Nov 16 14:25:19 2009
@@ -19,6 +19,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -46,6 +47,7 @@
     protected final Processor target;
     protected final BlockingQueue<Exchange> completedTasks = new LinkedBlockingQueue<Exchange>();
     protected ExecutorService executorService;
+    protected ExecutorService producerExecutorService;
     protected int poolSize = DEFAULT_THREADPOOL_SIZE;
     protected ExceptionHandler exceptionHandler;
 
@@ -84,8 +86,6 @@
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern
pattern) throws Exception {
                 exchange = configureExchange(exchange, pattern);
 
-                AsyncProcessor asyncProducer = exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class,
producer);
-
                 // pass in the callback that adds the exchange to the completed list of tasks
                 final AsyncCallback callback = new AsyncCallback() {
                     public void onTaskCompleted(Exchange exchange) {
@@ -93,8 +93,13 @@
                     }
                 };
 
-                // produce it async
-                asyncProducer.process(exchange, callback);
+                if (producer instanceof AsyncProcessor) {
+                    // producer is async capable so let it process it directly
+                    doAsyncProcess((AsyncProcessor) producer, exchange, callback);
+                } else {
+                    // producer is a regular processor so simulate async behaviour
+                    doSimulateAsyncProcess(producer, exchange, callback);
+                }
 
                 // and return the exchange
                 return exchange;
@@ -104,6 +109,55 @@
         return answer;
     }
 
+    /**
+     * The producer is already capable of async processing so let it process it directly.
+     *
+     * @param producer the async producer
+     * @param exchange the exchange
+     * @param callback the callback
+     *
+     * @throws Exception can be thrown in case of processing errors
+     */
+    protected void doAsyncProcess(AsyncProcessor producer, Exchange exchange, AsyncCallback
callback) throws Exception {
+        producer.process(exchange, callback);
+    }
+
+    /**
+     * The producer is <b>not</b> capable of async processing so let's simulate
this by transferring the task
+     * to another {@link ExecutorService} for async processing.
+     *
+     * @param producer the producer
+     * @param exchange the exchange
+     * @param callback the callback
+     *
+     * @throws Exception can be thrown in case of processing errors
+     */
+    protected void doSimulateAsyncProcess(final Processor producer, final Exchange exchange,
final AsyncCallback callback) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Producer "+ producer + " is not an instanceof AsyncProcessor"
+                + ". Will fallback to simulate async behavior by transferring task to a producer
thread pool for further processing.");
+        }
+
+        // let the producer thread pool handle the task of sending the request which then
will simulate the async
+        // behavior as the original thread is not blocking while we wait for the reply
+        getProducerExecutorService().submit(new Callable<Exchange>() {
+            public Exchange call() throws Exception {
+                // convert the async producer which just blocks until the task is complete
+                try {
+                    AsyncProcessor asyncProducer = exchange.getContext().getTypeConverter().convertTo(AsyncProcessor.class,
producer);
+                    asyncProducer.process(exchange, callback);
+                } catch (Exception e) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Caught exception while processing: " + exchange, e);
+                    }
+                    // set the exception on the exchange so Camel error handling can deal
with it
+                    exchange.setException(e);
+                }
+                return exchange;
+            }
+        });
+    }
+
     @Override
     public String toString() {
         return "sendAsyncTo(" + destination + (pattern != null ? " " + pattern : "") + "
-> " + target + ")";
@@ -111,15 +165,40 @@
 
     public ExecutorService getExecutorService() {
         if (executorService == null) {
-            executorService = createExecutorService();
+            executorService = createExecutorService("SendAsyncProcessor-Consumer");
         }
         return executorService;
     }
 
+    /**
+     * Sets the {@link java.util.concurrent.ExecutorService} to use for consuming replies.
+     *
+     * @param executorService the custom executor service
+     */
     public void setExecutorService(ExecutorService executorService) {
         this.executorService = executorService;
     }
 
+    public ExecutorService getProducerExecutorService() {
+        if (producerExecutorService == null) {
+            // use a cached pool for the producers which can grow/shrink itself
+            producerExecutorService = ExecutorServiceHelper.newCachedThreadPool("SendAsyncProcessor-Producer",
true);
+        }
+        return producerExecutorService;
+    }
+
+    /**
+     * Sets the {@link java.util.concurrent.ExecutorService} to use for simulating async
producers
+     * by transferring the {@link Exchange} to this {@link java.util.concurrent.ExecutorService}
for
+     * sending the request and block while waiting for the reply. However the original thread
+     * will not block and as such it all appears as real async request/reply mechanism.
+     *
+     * @param producerExecutorService the custom executor service for producers
+     */
+    public void setProducerExecutorService(ExecutorService producerExecutorService) {
+        this.producerExecutorService = producerExecutorService;
+    }
+
     public int getPoolSize() {
         return poolSize;
     }
@@ -156,7 +235,6 @@
         while (isRunAllowed()) {
             Exchange exchange;
             try {
-                // TODO: Wonder if we can use take instead of poll with timeout?
                 exchange = completedTasks.poll(1000, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 LOG.debug("Sleep interrupted, are we stopping? " + (isStopping() || isStopped()));
@@ -183,8 +261,8 @@
         }
     }
 
-    protected ExecutorService createExecutorService() {
-        return ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "SendAsyncProcessor",
true);
+    protected ExecutorService createExecutorService(String name) {
+        return ExecutorServiceHelper.newScheduledThreadPool(poolSize, name, true);
     }
 
     protected void doStart() throws Exception {
@@ -198,6 +276,10 @@
     protected void doStop() throws Exception {
         super.doStop();
 
+        if (producerExecutorService != null) {
+            producerExecutorService.shutdownNow();
+            producerExecutorService = null;
+        }
         if (executorService != null) {
             executorService.shutdownNow();
             executorService = null;



Mime
View raw message