camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r770599 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/component/direct/ main/java/org/apache/camel/component/file/ main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/ main/j...
Date Fri, 01 May 2009 09:39:29 GMT
Author: davsclaus
Date: Fri May  1 09:39:28 2009
New Revision: 770599

URL: http://svn.apache.org/viewvc?rev=770599&view=rev
Log:
CAMEL-1572: Internal API cleanup of AsyncProcessor.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java Fri May  1 09:39:28 2009
@@ -19,17 +19,24 @@
 
 /**
  * The callback interface for an {@link AsyncProcessor} so that it can
- * notify you when an {@link Exchange} has completed. 
+ * notify you when an {@link Exchange} has completed.
+ *
+ * @deprecated a new async API is planned for Camel 2.0
  */
 public interface AsyncCallback {
     
     /**
      * This method is invoked once the Exchange is completed.  If an error 
      * occurred while processing the exchange, the exception field of the 
-     * {@link Exchange} being processed will hold the error. 
+     * {@link Exchange} being processed will hold the error.
+     * <p/>
+     * This callback reports back twice:
+     * - first time when the caller thread is done, that is the synchronously done.
+     * - second time when the asynchronously thread is done and thus the {@link Exchange} is really complete. 
      *  
-     * @param doneSynchronously set to true if the processing of the exchange was completed synchronously thread.
+     * @param doneSynchronously set to <tt>true</tt> if the processing of the exchange was completed in the
+     * synchronously thread. Is set to <tt>false</tt> when the asynchronously thread is complete.
      */
-    void done(boolean doneSynchronously);    
+    void done(boolean doneSynchronously);
     
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java Fri May  1 09:39:28 2009
@@ -26,6 +26,7 @@
  * method.
  * 
  * @version $Revision$
+ * @deprecated a new async API is planned for Camel 2.0
  */
 public interface AsyncProcessor extends Processor {
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/direct/DirectProducer.java Fri May  1 09:39:28 2009
@@ -16,12 +16,10 @@
  */
 package org.apache.camel.component.direct;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultConsumer;
 import org.apache.camel.impl.DefaultProducer;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -30,7 +28,7 @@
  *
  * @version $Revision$
  */
-public class DirectProducer extends DefaultProducer implements AsyncProcessor {
+public class DirectProducer extends DefaultProducer implements Processor {
     private static final transient Log LOG = LogFactory.getLog(DirectProducer.class);
     private DirectEndpoint endpoint;
 
@@ -49,25 +47,4 @@
         }
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        int size = endpoint.getConsumers().size();
-        if (size == 0) {
-            LOG.warn("No consumers available on endpoint: " + endpoint + " to process " + exchange);
-        } else if (size == 1) {
-            DefaultConsumer consumer = endpoint.getConsumers().get(0);
-            AsyncProcessor processor = AsyncProcessorTypeConverter.convert(consumer.getProcessor());
-            return processor.process(exchange, callback);
-        } else if (size > 1) {
-            // Too hard to do multiple async.. do it sync
-            try {
-                for (DefaultConsumer consumer : endpoint.getConsumers()) {
-                    consumer.getProcessor().process(exchange);
-                }
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-        }
-        callback.done(true);
-        return true;
-    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConsumer.java Fri May  1 09:39:28 2009
@@ -20,12 +20,10 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.ScheduledPollConsumer;
-import org.apache.camel.processor.DeadLetterChannel;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -130,61 +128,67 @@
         try {
             final GenericFileProcessStrategy<T> processStrategy = endpoint.getGenericFileProcessStrategy();
 
-            if (processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile())) {
+            boolean begin = processStrategy.begin(operations, endpoint, exchange, exchange.getGenericFile());
+            if (!begin) {
+                log.warn(endpoint + " cannot process remote file: " + exchange.getGenericFile());
+                return;
+            }
 
-                // must use file from exchange as it can be updated due the
-                // preMoveNamePrefix/preMoveNamePostfix options
-                final GenericFile<T> target = exchange.getGenericFile();
-                // must use full name when downloading so we have the correct path
-                final String name = target.getAbsoluteFilePath();
-
-                // retrieve the file using the stream
-                if (log.isTraceEnabled()) {
-                    log.trace("Retreiving file: " + name + " from: " + endpoint);
-                }
+            // must use file from exchange as it can be updated due the
+            // preMoveNamePrefix/preMoveNamePostfix options
+            final GenericFile<T> target = exchange.getGenericFile();
+            // must use full name when downloading so we have the correct path
+            final String name = target.getAbsoluteFilePath();
+
+            // retrieve the file using the stream
+            if (log.isTraceEnabled()) {
+                log.trace("Retreiving file: " + name + " from: " + endpoint);
+            }
 
-                operations.retrieveFile(name, exchange);
+            operations.retrieveFile(name, exchange);
 
-                if (log.isTraceEnabled()) {
-                    log.trace("Retrieved file: " + name + " from: " + endpoint);
-                }
+            if (log.isTraceEnabled()) {
+                log.trace("Retrieved file: " + name + " from: " + endpoint);
+            }
 
-                if (log.isDebugEnabled()) {
-                    log.debug("About to process file: " + target + " using exchange: " + exchange);
-                }
-                // Use the async processor interface so that processing of
-                // the exchange can happen asynchronously
-                getAsyncProcessor().process(exchange, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        final GenericFile<T> file = exchange.getGenericFile();
-                        boolean failed = exchange.isFailed();
-
-                        if (log.isDebugEnabled()) {
-                            log.debug("Done processing file: " + file + " using exchange: " + exchange);
-                        }
-
-                        boolean committed = false;
-                        try {
-                            if (!failed) {
-                                // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
-                                processStrategyCommit(processStrategy, exchange, file);
-                                committed = true;
-                            } else {
-                                if (exchange.getException() != null) {
-                                    // if the failure was an exception then handle it
-                                    handleException(exchange.getException());
-                                } 
-                            }
-                        } finally {
-                            if (!committed) {
-                                processStrategyRollback(processStrategy, exchange, file);
-                            }
-                        }
+            if (log.isDebugEnabled()) {
+                log.debug("About to process file: " + target + " using exchange: " + exchange);
+            }
+            // Use the async processor interface so that processing of
+            // the exchange can happen asynchronously
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            // after processing
+            final GenericFile<T> file = exchange.getGenericFile();
+            boolean failed = exchange.isFailed();
+
+            if (log.isDebugEnabled()) {
+                log.debug("Done processing file: " + file + " using exchange: " + exchange);
+            }
+
+            // commit or rollback
+            boolean committed = false;
+            try {
+                if (!failed) {
+                    // commit the file strategy if there was no failure or already handled by the DeadLetterChannel
+                    processStrategyCommit(processStrategy, exchange, file);
+                    committed = true;
+                } else {
+                    if (exchange.getException() != null) {
+                        // if the failure was an exception then handle it
+                        handleException(exchange.getException());
                     }
-                });
-            } else {
-                log.warn(endpoint + " cannot process remote file: " + exchange.getGenericFile());
+                }
+            } finally {
+                if (!committed) {
+                    processStrategyRollback(processStrategy, exchange, file);
+                }
             }
+
         } catch (Exception e) {
             handleException(e);
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileConverter.java Fri May  1 09:39:28 2009
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.file;
 
-import java.io.FileNotFoundException;
-
 import org.apache.camel.Converter;
 import org.apache.camel.Exchange;
 import org.apache.camel.FallbackConverter;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/CollectionProducer.java Fri May  1 09:39:28 2009
@@ -42,7 +42,9 @@
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        queue.add(exchange.copy());
+        Exchange copy = exchange.copy();
+        copy.setProperty("CamelAsyncCallback", callback);
+        queue.add(copy);
         callback.done(true);
         return true;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Fri May  1 09:39:28 2009
@@ -67,13 +67,18 @@
             if (exchange != null) {
                 if (isRunAllowed()) {
                     try {
-                        processor.process(exchange, new AsyncCallback() {
-                            public void done(boolean sync) {
-                            }
-                        });
+                        processor.process(exchange);
                     } catch (Exception e) {
                         LOG.error("Seda queue caught: " + e, e);
                     }
+
+                    // TODO: It ought to be UnitOfWork that did the callback notification but we are planning
+                    // to replace it with a brand new Async API so we leave it as is
+                    AsyncCallback callback = exchange.getProperty("CamelAsyncCallback", AsyncCallback.class);
+                    if (callback != null) {
+                        // seda consumer is async so invoke the async done on the callback if its provided
+                        callback.done(false);
+                    }
                 } else {
                     LOG.warn("This consumer is stopped during polling an exchange, so putting it back on the seda queue: " + exchange);
                     try {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Fri May  1 09:39:28 2009
@@ -379,11 +379,8 @@
                         answer = createEndpoint(uri);
                     }
 
-                    // If it's a singleton then auto register it.
-                    // TODO: Why not test for isSingleton?
                     if (answer != null) {
                         addService(answer);
-
                         endpoints.put(getEndpointKey(uri, answer), answer);
                         lifecycleStrategy.onEndpointAdd(answer);
                     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultConsumer.java Fri May  1 09:39:28 2009
@@ -56,7 +56,8 @@
     /**
      * Provides an {@link AsyncProcessor} interface to the configured
      * processor on the consumer.  If the processor does not implement
-     * the interface, it will be adapted so that it does.  
+     * the interface, it will be adapted so that it does.
+     * @deprecated
      */
     public AsyncProcessor getAsyncProcessor() {
         if (asyncProcessor == null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultRouteContext.java Fri May  1 09:39:28 2009
@@ -21,14 +21,12 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Intercept;
 import org.apache.camel.NoSuchEndpointException;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.FromDefinition;
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.model.RouteDefinition;
@@ -136,11 +134,8 @@
         if (!eventDrivenProcessors.isEmpty()) {
             Processor processor = Pipeline.newInstance(eventDrivenProcessors);
 
-            // lets create the async processor
-            final AsyncProcessor asyncProcessor = AsyncProcessorTypeConverter.convert(processor);
-
             // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
-            Processor unitOfWorkProcessor = new UnitOfWorkProcessor(asyncProcessor);
+            Processor unitOfWorkProcessor = new UnitOfWorkProcessor(processor);
 
             // and create the route that wraps the UoW
             Route edcr = new EventDrivenConsumerRoute(getEndpoint(), unitOfWorkProcessor);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Fri May  1 09:39:28 2009
@@ -20,7 +20,6 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Service;
 import org.apache.camel.model.ProcessorDefinition;
@@ -38,7 +37,6 @@
 
     private String id;
     private List<Synchronization> synchronizations;
-    private List<AsyncCallback> asyncCallbacks;
     private List<ProcessorDefinition> routeList;
 
     public DefaultUnitOfWork() {
@@ -53,9 +51,6 @@
         if (synchronizations != null) {
             synchronizations.clear();
         }
-        if (asyncCallbacks != null) {
-            asyncCallbacks.clear();
-        }
         if (routeList != null) {
             routeList.clear();
         }
@@ -87,10 +82,6 @@
         }
     }
 
-    public boolean isSynchronous() {
-        return asyncCallbacks == null || asyncCallbacks.isEmpty();
-    }
-
     public String getId() {
         if (id == null) {
             id = DEFAULT_ID_GENERATOR.generateId();
@@ -116,27 +107,4 @@
         return Collections.unmodifiableList(routeList);
     }
 
-    /**
-     * Register some asynchronous processing step
-     */
-    /*
-    public synchronized AsyncCallback addAsyncStep() {
-        AsyncCallback answer = new AsyncCallback() {
-            public void done(boolean doneSynchronously) {
-                latch.countDown();
-            }
-        };
-        if (latch == null) {
-            latch = new CountDownLatch(1);
-        }
-        else {
-            // TODO increment latch!
-        }
-        if (asyncCallbacks == null) {
-            asyncCallbacks = new ArrayList<AsyncCallback>();
-        }
-        asyncCallbacks.add(answer);
-        return answer;
-    }
-    */
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Fri May  1 09:39:28 2009
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.management;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -30,19 +27,19 @@
  *
  * @version $Revision$
  */
-public class InstrumentationProcessor extends DelegateProcessor implements AsyncProcessor {
+public class InstrumentationProcessor extends DelegateProcessor {
 
     private static final transient Log LOG = LogFactory.getLog(InstrumentationProcessor.class);
     private PerformanceCounter counter;
     private String type;
 
+    public InstrumentationProcessor() {
+    }
+
     public InstrumentationProcessor(PerformanceCounter counter) {
         this.counter = counter;
     }
 
-    public InstrumentationProcessor() {
-    }
-    
     @Override
     public String toString() {
         return "Instrumention" + (type != null ? ":" + type : "") + "[" + processor + "]";
@@ -53,42 +50,26 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
+        if (processor != null) {
 
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        if (processor == null) {
-            // no processor so nothing to process, so return
-            callback.done(true);
-            return true;
-        }
-
-        final long startTime = System.nanoTime();
-
-        if (processor instanceof AsyncProcessor) {
-            return ((AsyncProcessor)processor).process(exchange, new AsyncCallback() {
-                public void done(boolean doneSynchronously) {
-                    if (counter != null) {
-                        // convert nanoseconds to milliseconds
-                        recordTime(exchange, (System.nanoTime() - startTime) / 1000000.0);
-                    }
-                    callback.done(doneSynchronously);
-                }
-            });
-        }
-
-        try {
-            processor.process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
-        }
+            // TODO: why not use millis instead of nano?
 
-        if (counter != null) {
-            // convert nanoseconds to milliseconds
-            recordTime(exchange, (System.nanoTime() - startTime) / 1000000.0);
+            long startTime = 0;
+            if (counter != null) {
+                startTime = System.nanoTime();
+            }
+
+            try {
+                processor.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            if (counter != null) {
+                // convert nanoseconds to milliseconds
+                recordTime(exchange, (System.nanoTime() - startTime) / 1000000.0);
+            }
         }
-        callback.done(true);
-        return true;
     }
 
     protected void recordTime(Exchange exchange, double duration) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ThreadDefinition.java Fri May  1 09:39:28 2009
@@ -116,8 +116,7 @@
         thread.setTaskQueue(taskQueue);
         thread.setThreadGroup(threadGroup);
 
-        // TODO: see if we can avoid creating so many nested pipelines
-        ArrayList<Processor> pipe = new ArrayList<Processor>(2);
+        List<Processor> pipe = new ArrayList<Processor>(2);
         pipe.add(thread);
         pipe.add(createOutputsProcessor(routeContext, outputs));
         return new Pipeline(pipe);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/loadbalancer/LoadBalancerDefinition.java Fri May  1 09:39:28 2009
@@ -17,13 +17,11 @@
 package org.apache.camel.model.loadbalancer;
 
 import java.util.List;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.model.IdentifiedType;
@@ -131,18 +129,4 @@
         loadBalancer.process(exchange);
     }
 
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        ObjectHelper.notNull(loadBalancer, "loadBalancer");
-        
-        return loadBalancer.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                // Only handle the async case...
-                if (!sync) {
-                    callback.done(sync);
-                }
-            }
-        });                
-    
-    }
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri May  1 09:39:28 2009
@@ -16,21 +16,15 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.RejectedExecutionException;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.LoggingLevel;
 import org.apache.camel.Message;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -43,16 +37,20 @@
  *
  * @version $Revision$
  */
-public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
+public class DeadLetterChannel extends ErrorHandlerSupport implements Processor {
 
-    // TODO: The code in this class will be much easier when the AsyncProcessor is removed
+    // TODO: Introduce option to allow async redelivery, eg to not block thread while delay
+    // (eg the Timer task code). However we should consider using Channels that has internal
+    // producer/consumer queues with "delayed" support so a redelivery is just to move an
+    // exchange to this channel with the computed delay time
+    // we need to provide option so end users can deside if they would like to spawn an async thread
+    // or not. Also consider MEP as InOut does not work with async then as the original caller thread
+    // is expecting a reply in the sync thread.
 
     // we can use a single shared static timer for async redeliveries
-    private static final Timer REDELIVER_TIMER = new Timer("Camel DeadLetterChannel Redeliver Timer", true);
     private final Processor deadLetter;
     private final String deadLetterUri;
     private final Processor output;
-    private final AsyncProcessor outputAsync;
     private final Processor redeliveryProcessor;
     private RedeliveryPolicy redeliveryPolicy;
     private Logger logger;
@@ -70,40 +68,6 @@
         Processor onRedeliveryProcessor = redeliveryProcessor;
     }
     
-    private class RedeliverTimerTask extends TimerTask {
-        private final Exchange exchange;
-        private final AsyncCallback callback;
-        private final RedeliveryData data;
-        
-        public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, RedeliveryData data) {
-            this.exchange = exchange;
-            this.callback = callback;
-            this.data = data;
-        }
-
-        @Override
-        public void run() {
-            //only handle the real AsyncProcess the exchange 
-            outputAsync.process(exchange, new AsyncCallback() {
-                public void done(boolean sync) {
-                    // Only handle the async case...
-                    if (sync) {
-                        return;
-                    }
-                    data.sync = false;
-                    // only process if the exchange hasn't failed
-                    // and it has not been handled by the error processor
-                    if (exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
-                        // deliver to async to process it
-                        asyncProcess(exchange, callback, data);
-                    } else {
-                        callback.done(sync);
-                    }
-                }
-            });                
-        } 
-    }
-
     /**
      * Creates the dead letter channel.
      *
@@ -121,7 +85,6 @@
         this.deadLetter = deadLetter;
         this.deadLetterUri = deadLetterUri;
         this.redeliveryProcessor = redeliveryProcessor;
-        this.outputAsync = AsyncProcessorTypeConverter.convert(output);
         this.redeliveryPolicy = redeliveryPolicy;
         this.logger = logger;
         setExceptionPolicy(exceptionPolicyStrategy);
@@ -137,17 +100,13 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        return processErrorHandler(exchange, callback, new RedeliveryData());
+        processErrorHandler(exchange, new RedeliveryData());
     }
 
     /**
      * Processes the exchange decorated with this dead letter channel.
      */
-    protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
+    protected void processErrorHandler(final Exchange exchange, final RedeliveryData data) {
 
         while (true) {
             // we can't keep retrying if the route is being shutdown.
@@ -158,8 +117,6 @@
                 if (exchange.getException() == null) {
                     exchange.setException(new RejectedExecutionException());
                 }
-                callback.done(data.sync);
-                return data.sync;
             }
 
             // do not handle transacted exchanges that failed as this error handler does not support it
@@ -168,7 +125,7 @@
                     log.debug("This error handler does not support transacted exchanges."
                         + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
                 }
-                return data.sync;
+                return;
             }
 
             // did previous processing caused an exception?
@@ -179,7 +136,9 @@
             // compute if we should redeliver or not
             boolean shouldRedeliver = shouldRedeliver(exchange, data);
             if (!shouldRedeliver) {
-                return deliverToFaultProcessor(exchange, callback, data);
+                deliverToFaultProcessor(exchange, data);
+                // we should not try redeliver so we are finished
+                return;
             }
 
             // if we are redelivering then sleep before trying again
@@ -196,84 +155,25 @@
                 }
 
                 // letting onRedeliver be executed
-                deliverToRedeliveryProcessor(exchange, callback, data);
+                deliverToRedeliveryProcessor(exchange, data);
             }
 
             // process the exchange
-            boolean sync = outputAsync.process(exchange, new AsyncCallback() {
-                public void done(boolean sync) {
-                    // Only handle the async case...
-                    if (sync) {
-                        return;
-                    }
-                    data.sync = false;
-                    // only process if the exchange hasn't failed
-                    // and it has not been handled by the error processor
-                    if (exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
-                        //TODO Call the Timer for the asyncProcessor
-                        asyncProcess(exchange, callback, data);
-                    } else {
-                        callback.done(sync);
-                    }
-                }
-            });
-            if (!sync) {
-                // It is going to be processed async..
-                return false;
-            }
-            if (exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange)) {
-                // If everything went well.. then we exit here..
-                callback.done(true);
-                return true;
-            }
-            // error occurred so loop back around.....
-        }
-
-    }
-
-    protected void asyncProcess(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
-        // set the timer here
-        if (!isRunAllowed()) {
-            if (exchange.getException() == null) {
-                exchange.setException(new RejectedExecutionException());
+            try {
+                output.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
             }
-            callback.done(data.sync);
-            return;
-        }
 
-        // do not handle transacted exchanges that failed as this error handler does not support it
-        if (exchange.isTransacted() && !supportTransacted() && exchange.getException() != null) {
-            if (log.isDebugEnabled()) {
-                log.debug("This error handler does not support transacted exchanges."
-                    + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
+            // only process if the exchange hasn't failed
+            // and it has not been handled by the error processor
+            boolean done = exchange.getException() == null || ExchangeHelper.isFailureHandled(exchange);
+            if (done) {
+                return;
             }
-            return;
-        }
-
-        // did previous processing caused an exception?
-        if (exchange.getException() != null) {
-            handleException(exchange, data);
-        }
-        
-        // compute if we should redeliver or not
-        boolean shouldRedeliver = shouldRedeliver(exchange, data);
-        if (!shouldRedeliver) {
-            deliverToFaultProcessor(exchange, callback, data);
-            return;
+            // error occurred so loop back around.....
         }
-        
-        // process the next try
-        // if we are redelivering then sleep before trying again
-        if (data.redeliveryCounter > 0) {
-            prepareExchangeForRedelivery(exchange);
-
-            // wait until we should redeliver using a timer to avoid thread blocking
-            data.redeliveryDelay = data.currentRedeliveryPolicy.calculateRedeliveryDelay(data.redeliveryDelay, data.redeliveryCounter);
-            REDELIVER_TIMER.schedule(new RedeliverTimerTask(exchange, callback, data), data.redeliveryDelay);
 
-            // letting onRedeliver be executed
-            deliverToRedeliveryProcessor(exchange, callback, data);
-        }
     }
 
     // Properties
@@ -369,8 +269,7 @@
      * Gives an optional configure redelivery processor a chance to process before the Exchange
      * will be redelivered. This can be used to alter the Exchange.
      */
-    private void deliverToRedeliveryProcessor(final Exchange exchange, final AsyncCallback callback,
-                                              final RedeliveryData data) {
+    private void deliverToRedeliveryProcessor(final Exchange exchange, final RedeliveryData data) {
         if (data.onRedeliveryProcessor == null) {
             return;
         }
@@ -379,41 +278,39 @@
             log.trace("RedeliveryProcessor " + data.onRedeliveryProcessor + " is processing Exchange: " + exchange + " before its redelivered");
         }
 
-        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.onRedeliveryProcessor);
-        afp.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                log.trace("Redelivery processor done");
-                // do NOT call done on callback as this is the redelivery processor that
-                // is done. we should not mark the entire exchange as done.
-            }
-        });
+        try {
+            data.onRedeliveryProcessor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        log.trace("Redelivery processor done");
     }
 
     /**
      * All redelivery attempts failed so move the exchange to the fault processor (eg the dead letter queue)
      */
-    private boolean deliverToFaultProcessor(final Exchange exchange, final AsyncCallback callback,
-                                            final RedeliveryData data) {
+    private void deliverToFaultProcessor(final Exchange exchange, final RedeliveryData data) {
+        if (data.failureProcessor == null) {
+            return;
+        }
+
         // we did not success with the redelivery so now we let the failure processor handle it
         ExchangeHelper.setFailureHandled(exchange);
         // must decrement the redelivery counter as we didn't process the redelivery but is
         // handling by the failure handler. So we must -1 to not let the counter be out-of-sync
         decrementRedeliveryCounter(exchange);
 
-        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(data.failureProcessor);
-        boolean sync = afp.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-                log.trace("Fault processor done");
-                prepareExchangeForFailure(exchange, data.handledPredicate);
-                callback.done(data.sync);
-            }
-        });
+        try {
+            data.failureProcessor.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
+        log.trace("Fault processor done");
+        prepareExchangeForFailure(exchange, data.handledPredicate);
 
         String msg = "Failed delivery for exchangeId: " + exchange.getExchangeId()
                 + ". Handled by the failure processor: " + data.failureProcessor;
         logFailedDelivery(false, exchange, msg, data, null);
-
-        return sync;
     }
 
     private void prepareExchangeForFailure(Exchange exchange, Predicate handledPredicate) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri May  1 09:39:28 2009
@@ -19,8 +19,6 @@
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Channel;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -28,7 +26,6 @@
 import org.apache.camel.model.ProcessorDefinition;
 import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
 
 /**
@@ -43,7 +40,7 @@
  *
  * @version $Revision$
  */
-public class DefaultChannel extends ServiceSupport implements AsyncProcessor, Channel {
+public class DefaultChannel extends ServiceSupport implements Processor, Channel {
 
     private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
     private Processor errorHandler;
@@ -141,24 +138,10 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    public boolean process(Exchange exchange, AsyncCallback callback) {
         Processor processor = getOutput();
-
-        if (processor instanceof AsyncProcessor) {
-            return ((AsyncProcessor) processor).process(exchange, callback);
-        } else if (processor != null) {
-            try {
-                processor.process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
+        if (processor != null) {
+            processor.process(exchange);
         }
-
-        callback.done(true);
-        return true;
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultErrorHandler.java Fri May  1 09:39:28 2009
@@ -16,15 +16,11 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.processor.exceptionpolicy.ExceptionPolicyStrategy;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -34,11 +30,11 @@
  *
  * @version $Revision$
  */
-public class DefaultErrorHandler extends ErrorHandlerSupport implements AsyncProcessor {
-    private AsyncProcessor output;
+public class DefaultErrorHandler extends ErrorHandlerSupport implements Processor {
+    private Processor output;
 
     public DefaultErrorHandler(Processor output, ExceptionPolicyStrategy exceptionPolicyStrategy) {
-        this.output = AsyncProcessorTypeConverter.convert(output);
+        this.output = output;
         setExceptionPolicy(exceptionPolicyStrategy);
     }
 
@@ -52,32 +48,28 @@
     }
 
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
+        try {
+            output.process(exchange);
+        } catch (Exception e) {
+            exchange.setException(e);
+        }
 
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        return output.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-
-                // do not handle transacted exchanges as this error handler does not support it
-                boolean handle = true;
-                if (exchange.isTransacted() && !supportTransacted()) {
-                    handle = false;
-                    if (log.isDebugEnabled()) {
-                        log.debug("This error handler does not support transacted exchanges."
-                            + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
-                    }
-                }
-
-                if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
-                    handleException(exchange);
-                }
-                callback.done(sync);
+        // do not handle transacted exchanges as this error handler does not support it
+        boolean handle = true;
+        if (exchange.isTransacted() && !supportTransacted()) {
+            handle = false;
+            if (log.isDebugEnabled()) {
+                log.debug("This error handler does not support transacted exchanges."
+                    + " Bypassing this error handler: " + this + " for exchangeId: " + exchange.getExchangeId());
             }
-        });
+        }
+
+        if (handle && exchange.getException() != null && !ExchangeHelper.isFailureHandled(exchange)) {
+            handleException(exchange);
+        }
     }
 
-    private void handleException(Exchange exchange) {
+    private void handleException(Exchange exchange) throws Exception {
         Exception e = exchange.getException();
 
         // store the original caused exception in a property, so we can restore it later
@@ -110,12 +102,8 @@
         MessageHelper.resetStreamCache(exchange.getIn());
     }
 
-    private boolean deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) {
-        AsyncProcessor afp = AsyncProcessorTypeConverter.convert(failureProcessor);
-        return afp.process(exchange, new AsyncCallback() {
-            public void done(boolean sync) {
-            }
-        });
+    private void deliverToFaultProcessor(final Exchange exchange, final Processor failureProcessor) throws Exception {
+        failureProcessor.process(exchange);
     }
 
     private void prepareExchangeAfterOnException(Exchange exchange, Predicate handledPredicate) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelegateAsyncProcessor.java Fri May  1 09:39:28 2009
@@ -31,6 +31,8 @@
 /**
  * A Delegate pattern which delegates processing to a nested AsyncProcessor which can
  * be useful for implementation inheritance when writing an {@link org.apache.camel.spi.Policy}
+ *
+ * @deprecated
  */
 public class DelegateAsyncProcessor extends ServiceSupport implements AsyncProcessor, Navigate {
     protected AsyncProcessor processor;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java Fri May  1 09:39:28 2009
@@ -16,12 +16,11 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 
-public class ExchangePatternProcessor implements AsyncProcessor {
+public class ExchangePatternProcessor implements Processor {
     private ExchangePattern exchangePattern = ExchangePattern.InOnly;
     
     public ExchangePatternProcessor() {
@@ -39,10 +38,4 @@
         exchange.setPattern(exchangePattern);        
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        exchange.setPattern(exchangePattern);
-        callback.done(true);
-        return true;
-    }
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Fri May  1 09:39:28 2009
@@ -50,6 +50,8 @@
  */
 public class MulticastProcessor extends ServiceSupport implements Processor, Navigate {
 
+    // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
+
     static class ProcessorExchangePair {
         private final Processor processor;
         private final Exchange exchange;
@@ -163,7 +165,7 @@
                 exchanges.add(subExchange);
                 completedExchanges.increment(); 
                 ProcessCall call = new ProcessCall(subExchange, producer, new AsyncCallback() {
-                    public void done(boolean doneSynchronously) {
+                    public void done(boolean sync) {
                         if (streaming && aggregationStrategy != null) {
                             doAggregate(result, subExchange);
                         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Fri May  1 09:39:28 2009
@@ -40,6 +40,8 @@
 public class Pipeline extends MulticastProcessor implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(Pipeline.class);
 
+    // TODO: Cleanup this when AsyncProcessor/AsyncCallback is replaced with new async API
+
     public Pipeline(Collection<Processor> processors) {
         super(processors);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Fri May  1 09:39:28 2009
@@ -16,17 +16,14 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.Service;
 import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -35,11 +32,10 @@
  *
  * @version $Revision$
  */
-public class SendProcessor extends ServiceSupport implements AsyncProcessor, Service {
+public class SendProcessor extends ServiceSupport implements Processor {
     protected static final transient Log LOG = LogFactory.getLog(SendProcessor.class);
     protected Endpoint destination;
     protected Producer producer;
-    protected AsyncProcessor processor;
     protected ExchangePattern pattern;
 
     public SendProcessor(Endpoint destination) {
@@ -70,40 +66,17 @@
         }
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        if (producer == null) {
-            if (isStopped()) {
-                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
-            } else {
-                exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
-            }
-            callback.done(true);
-            return true;
-        } else {
-            exchange = configureExchange(exchange);
-            return processor.process(exchange, callback);
-        }
-    }
-
     public Endpoint getDestination() {
         return destination;
     }
 
     protected void doStart() throws Exception {
         this.producer = destination.createProducer();
-        this.producer.start();
-        this.processor = AsyncProcessorTypeConverter.convert(producer);
+        ServiceHelper.startService(this.producer);
     }
 
     protected void doStop() throws Exception {
-        if (producer != null) {
-            try {
-                producer.stop();
-            } finally {
-                producer = null;
-                processor = null;
-            }
-        }
+        ServiceHelper.stopService(this.producer);
     }
 
     protected Exchange configureExchange(Exchange exchange) {
@@ -113,8 +86,4 @@
         return exchange;
     }
 
-    public Processor getProcessor() {
-        return processor;
-    }
-
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/UnitOfWorkProcessor.java Fri May  1 09:39:28 2009
@@ -16,9 +16,8 @@
  */
 package org.apache.camel.processor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.impl.DefaultUnitOfWork;
 import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException;
 
@@ -26,9 +25,9 @@
  * Handles calling the UnitOfWork.done() method when processing of an exchange
  * is complete.
  */
-public final class UnitOfWorkProcessor extends DelegateAsyncProcessor {
+public final class UnitOfWorkProcessor extends DelegateProcessor {
 
-    public UnitOfWorkProcessor(AsyncProcessor processor) {
+    public UnitOfWorkProcessor(Processor processor) {
         super(processor);
     }
     
@@ -36,8 +35,9 @@
     public String toString() {
         return "UnitOfWork(" + processor + ")";
     }
-    
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
+
+    @Override
+    protected void processNext(Exchange exchange) throws Exception {
         if (exchange.getUnitOfWork() == null) {
             // If there is no existing UoW, then we should start one and
             // terminate it once processing is completed for the exchange.
@@ -48,26 +48,26 @@
             } catch (Exception e) {
                 throw wrapRuntimeCamelException(e);
             }
-            // return the process code where we do stop and cleanup
-            return processor.process(exchange, new AsyncCallback() {
-                public void done(boolean sync) {
-                    // Order here matters. We need to complete the callbacks
-                    // since they will likely update the exchange with 
-                    // some final results.
-                    callback.done(sync);
-                    exchange.getUnitOfWork().done(exchange);
-                    try {
-                        uow.stop();
-                    } catch (Exception e) {
-                        throw wrapRuntimeCamelException(e);
-                    }
-                    exchange.setUnitOfWork(null);
-                }
-            });
+
+            // process the exchange
+            try {
+                processor.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+
+            // unit of work is done
+            exchange.getUnitOfWork().done(exchange);
+            try {
+                uow.stop();
+            } catch (Exception e) {
+                throw wrapRuntimeCamelException(e);
+            }
+            exchange.setUnitOfWork(null);
         } else {
             // There was an existing UoW, so we should just pass through..
             // so that the guy the initiated the UoW can terminate it.
-            return processor.process(exchange, callback);
+            processor.process(exchange);
         }
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/WireTapProcessor.java Fri May  1 09:39:28 2009
@@ -21,7 +21,6 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
@@ -96,39 +95,6 @@
         }
     }
 
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        if (producer == null) {
-            if (isStopped()) {
-                LOG.warn("Ignoring exchange sent after processor is stopped: " + exchange);
-            } else {
-                exchange.setException(new IllegalStateException("No producer, this processor has not been started!"));
-            }
-            callback.done(true);
-            return true;
-        } else {
-            exchange = configureExchange(exchange);
-
-            final Exchange wireTapExchange = configureExchange(exchange);
-
-            // use submit instead of execute to force it to use a new thread, execute might
-            // decide to use current thread, so we must submit a new task
-            // as we dont care for the response we dont hold the future object and wait for the result
-            getExecutorService().submit(new Callable<Object>() {
-                public Object call() throws Exception {
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Processing wiretap: " + wireTapExchange);
-                    }
-                    return processor.process(wireTapExchange, callback);
-                }
-            });
-
-            // return true to indicate caller its okay, and he should not wait as this wiretap
-            // is a fire and forget
-            return true;
-        }
-    }
-
-
     @Override
     protected Exchange configureExchange(Exchange exchange) {
         if (newExchangeProcessor == null && newExchangeExpression == null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/HandleFaultInterceptor.java Fri May  1 09:39:28 2009
@@ -16,16 +16,12 @@
  */
 package org.apache.camel.processor.interceptor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.util.AsyncProcessorHelper;
 
-public class HandleFaultInterceptor extends DelegateProcessor implements AsyncProcessor {
+public class HandleFaultInterceptor extends DelegateProcessor {
 
     public HandleFaultInterceptor() {
         super();
@@ -43,23 +39,8 @@
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         if (processor == null) {
-            // no processor so nothing to process, so return
-            callback.done(true);
-            return true;
-        }
-
-        if (processor instanceof AsyncProcessor) {
-            return ((AsyncProcessor) processor).process(exchange, new AsyncCallback() {
-                public void done(boolean doneSynchronously) {
-                    handleFault(exchange);
-                    callback.done(doneSynchronously);
-                }
-            });
+            return;
         }
 
         try {
@@ -67,10 +48,8 @@
         } catch (Exception e) {
             exchange.setException(e);
         }
-        handleFault(exchange);
 
-        callback.done(true);
-        return true;
+        handleFault(exchange);
     }
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/StreamCachingInterceptor.java Fri May  1 09:39:28 2009
@@ -16,19 +16,16 @@
  */
 package org.apache.camel.processor.interceptor;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.StreamCache;
 import org.apache.camel.processor.DelegateProcessor;
-import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.MessageHelper;
 
 /**
  * {@link DelegateProcessor} that converts a message into a re-readable format
  */
-public class StreamCachingInterceptor extends DelegateProcessor implements AsyncProcessor {
+public class StreamCachingInterceptor extends DelegateProcessor {
 
     public StreamCachingInterceptor() {
         super();
@@ -46,31 +43,13 @@
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    public boolean process(Exchange exchange, AsyncCallback callback) {
         StreamCache newBody = exchange.getIn().getBody(StreamCache.class);
         if (newBody != null) {
             exchange.getIn().setBody(newBody);
         }
         MessageHelper.resetStreamCache(exchange.getIn());
 
-        return proceed(exchange, callback);
+        getProcessor().process(exchange);
     }
 
-    public boolean proceed(Exchange exchange, AsyncCallback callback) {
-        if (getProcessor() instanceof AsyncProcessor) {
-            return ((AsyncProcessor) getProcessor()).process(exchange, callback);
-        } else {
-            try {
-                getProcessor().process(exchange);
-            } catch (Exception e) {
-                exchange.setException(e);
-            }
-            // false means processing of the exchange asynchronously,
-            callback.done(true);
-            return true;
-        }
-    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Fri May  1 09:39:28 2009
@@ -18,8 +18,6 @@
 
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.util.ObjectHelper;
@@ -83,13 +81,7 @@
         }
     }
 
-    public boolean process(Exchange exchange, final AsyncCallback callback) {
-        return processExchange(0, exchange, callback);
-    }
-
-    public boolean processExchange(final int index, final Exchange exchange, final AsyncCallback callback) {
-        boolean sync;
-
+    protected void processExchange(final int index, final Exchange exchange) {
         List<Processor> list = getProcessors();
         if (list.isEmpty()) {
             throw new IllegalStateException("No processors available to process " + exchange);
@@ -99,33 +91,18 @@
         if (processor == null) {
             throw new IllegalStateException("No processors could be chosen to process " + exchange);
         }
-        if (processor instanceof AsyncProcessor) {
-            AsyncProcessor asyncProcessor = (AsyncProcessor) processor;
-            sync = asyncProcessor.process(exchange, new AsyncCallback() {
-                public void done(boolean doSync) {
-                    // check the exchange and call the FailOverProcessor
-                    if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
-                        exchange.setException(null);
-                        processExchange(index + 1, exchange, callback);
-                    } else {
-                        callback.done(doSync);
-                    }
-                }
-            });
-        } else {
-            try {
-                processor.process(exchange);
-            } catch (Exception ex) {
-                exchange.setException(ex);
-            }
-            if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
-                exchange.setException(null);
-                processExchange(index + 1, exchange, callback);
-            }
-            sync = true;
-            callback.done(true);
+
+        try {
+            processor.process(exchange);
+        } catch (Exception ex) {
+            exchange.setException(ex);
+        }
+
+        if (isCheckedException(exchange) && index < getProcessors().size() - 1) {
+            exchange.setException(null);
+            processExchange(index + 1, exchange);
         }
-        return sync;
+
     }
 
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/LoadBalancer.java Fri May  1 09:39:28 2009
@@ -18,7 +18,6 @@
 
 import java.util.List;
 
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Processor;
 
 /**
@@ -26,7 +25,8 @@
  *
  * @version $Revision$
  */
-public interface LoadBalancer extends AsyncProcessor {
+public interface LoadBalancer extends Processor {
+
     /**
      * Adds a new processor to the load balancer
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/QueueLoadBalancer.java Fri May  1 09:39:28 2009
@@ -18,8 +18,6 @@
 
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
@@ -43,39 +41,6 @@
             processor.process(exchange);
         }
     }
-    
-    public boolean process(final Exchange exchange, final AsyncCallback callback) {
-        boolean sync = false;
-        List<Processor> list = getProcessors();
-        if (list.isEmpty()) {
-            throw new IllegalStateException("No processors available to process " + exchange);
-        }
-        Processor processor = chooseProcessor(list, exchange);
-        if (processor == null) {
-            throw new IllegalStateException("No processors could be chosen to process " + exchange);
-        } else {
-            if (processor instanceof AsyncProcessor) {
-                AsyncProcessor asyncProcessor = (AsyncProcessor)processor;
-                sync = asyncProcessor.process(exchange, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        // Only handle the async case...
-                        if (!sync) {
-                            callback.done(sync);
-                        }
-                    }
-                });                
-            } else {
-                try {
-                    processor.process(exchange);
-                } catch (Exception ex) {
-                    exchange.setException(ex);
-                }
-                callback.done(false);                
-            }            
-        }
-        return sync;
-        
-    }
 
     protected abstract Processor chooseProcessor(List<Processor> processors, Exchange exchange);
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/TopicLoadBalancer.java Fri May  1 09:39:28 2009
@@ -18,7 +18,6 @@
 
 import java.util.List;
 
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 
@@ -52,17 +51,4 @@
         return exchange.copy();
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
-        List<Processor> list = getProcessors();
-        for (Processor processor : list) {
-            Exchange copy = copyExchangeStrategy(processor, exchange);
-            try {
-                processor.process(copy);
-            } catch (Exception ex) {
-                // We don't handle the exception here
-            }
-        }
-        callback.done(false);
-        return false;
-    }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/seda/SedaAsyncProcessorTest.java Fri May  1 09:39:28 2009
@@ -61,9 +61,7 @@
                 // the first is when we have finished sending to the seda producer
                 if (sync) {
                     doneSync = true;
-                }
-                // and the async should occur when the mock endpoint is done
-                if (!sync) {
+                } else {
                     latchAsync.countDown();
                 }
             }
@@ -77,14 +75,14 @@
         assertEquals("Send should occur before processor", "sendprocess", route);
         assertTrue("Sync done should have occured", doneSync);
 
-        // TODO: The AsyncProcessor does not work as expected
         // wait at most 2 seconds
         boolean zero = latchAsync.await(2, TimeUnit.SECONDS);
-        // assertTrue("Async done should have occured", zero);
+        assertTrue("Async done should have occured", zero);
 
         // how to get the response?
         String response = exchange.getOut().getBody(String.class);
-        // assertEquals("Bye World", response);
+        // TODO: we need a new API for getting the result
+        //assertEquals("Bye World", response);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/Camel715ThreadProcessorTest.java Fri May  1 09:39:28 2009
@@ -76,7 +76,7 @@
 
             for (int i = 0; i < ITERS; i++) {
                 template.send(e, new SendingProcessor(i), new AsyncCallback() {
-                    public void done(boolean arg0) {
+                    public void done(boolean sync) {
                         // Do nothing here
                     }
                 });

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java?rev=770599&r1=770598&r2=770599&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ThreadTest.java Fri May  1 09:39:28 2009
@@ -87,7 +87,6 @@
 
         final Exchange exchanges[] = new Exchange[exchangeCount];
         for (int i = 0; i < exchangeCount; i++) {
-            final int index = i;
             // Send the exchange using the async completion interface.
             // This call returns before the exchange is completed.
             exchanges[i] = template.send("direct:a", new Processor() {
@@ -98,7 +97,6 @@
                 }
             }, new AsyncCallback() {
                 public void done(boolean doneSynchronously) {
-                    log.debug("Completed: " + index + ", exception: " + exchanges[index].getException());
                     completedExchanges.countDown();
                 }
             });
@@ -142,9 +140,11 @@
         return new RouteBuilder() {
             public void configure() {
                 // START SNIPPET: example
-                from("direct:a").thread(1).process(new Processor() {
+                from("direct:a").to("seda:async");
+
+                from("seda:async").thread(1).process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        continueProcessing.await();
+                        continueProcessing.await(10, TimeUnit.SECONDS);
                     }
                 }).to("mock:result");
                 // END SNIPPET: example



Mime
View raw message