camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r772172 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/ camel-core...
Date Wed, 06 May 2009 12:23:48 GMT
Author: davsclaus
Date: Wed May  6 12:23:17 2009
New Revision: 772172

URL: http://svn.apache.org/viewvc?rev=772172&view=rev
Log:
CAMEL-1572: Migrated MulticastProcessor from old Async API to new API. CAMEL-1587: Aligned thread pool ref options to be executorServiceRef to be consistent.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java   (with props)
Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncCallback.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/AsyncProcessor.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/xml/buildRouteWithInterceptor.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/xml/SpringXmlRouteBuilderTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
    camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
    camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelContext.java Wed May  6 12:23:17 2009
@@ -20,7 +20,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.model.RouteDefinition;

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Wed May  6 12:23:17 2009
@@ -17,10 +17,10 @@
 package org.apache.camel;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.ExecutorService;
 
 /**
  * Template (named like Spring's TransactionTemplate & JmsTemplate

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/GenericFileProducer.java Wed May  6 12:23:17 2009
@@ -46,8 +46,7 @@
     protected String getFileSeparator() {
         return File.separator;
     }
-    
-   
+
     protected String normalizePath(String name) {        
         return FileUtil.normalizePath(name);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultProducerTemplate.java Wed May  6 12:23:17 2009
@@ -454,7 +454,9 @@
     protected void doStop() throws Exception {
         producerCache.stop();
         endpointCache.clear();
-        executor.shutdown();
+        if (executor != null) {
+            executor.shutdown();
+        }
     }
 
     protected Object extractResultBody(Exchange result) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/GroupedExchange.java Wed May  6 12:23:17 2009
@@ -23,6 +23,7 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
 
 
 /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java Wed May  6 12:23:17 2009
@@ -52,8 +52,6 @@
     public void process(Exchange exchange) throws Exception {
         if (processor != null) {
 
-            // TODO: why not use millis instead of nano?
-
             long startTime = 0;
             if (counter != null) {
                 startTime = System.nanoTime();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Wed May  6 12:23:17 2009
@@ -17,7 +17,7 @@
 package org.apache.camel.model;
 
 import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
@@ -29,7 +29,6 @@
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.processor.interceptor.StreamCachingInterceptor;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -44,12 +43,15 @@
     private Boolean parallelProcessing;
     @XmlAttribute(required = false)
     private String strategyRef;
+    @XmlTransient
+    private ExecutorService executorService;
+    @XmlAttribute(required = false)
+    private String executorServiceRef;
     @XmlAttribute(required = false)
-    private String threadPoolRef;    
+    private Boolean streaming;
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
-    @XmlTransient
-    private Executor executor;
+
 
     @Override
     public String toString() {
@@ -80,7 +82,7 @@
     }
     
     /**
-     * use a thread pool to do the multicasting work
+     * Uses the {@link java.util.concurrent.ExecutorService} to do the multicasting work
      *     
      * @return the builder
      */
@@ -90,39 +92,37 @@
     }
     
     /**
-     * Set the multicasting action's thread model
-     *
-     * @param parallelProcessing <tt>true</tt> to use a thread pool, if <tt>false</tt> then work is done in the
-     * calling thread.
+     * Aggregates the responses as they are done (e.g. out of order sequence)
      *
      * @return the builder
      */
-    public MulticastDefinition parallelProcessing(boolean parallelProcessing) {
-        setParallelProcessing(parallelProcessing);
+    public MulticastDefinition streaming() {
+        setStreaming(true);
         return this;
     }
-    
+       
     /**
-     * Setting the executor for executing the multicasting action. 
+     * Setting the executor service for executing the multicasting action.
      *
      * @return the builder
      */
-    public MulticastDefinition executor(Executor executor) {
-        setExecutor(executor);
+    public MulticastDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
         return this;
     }    
         
     protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
-        if (aggregationStrategy == null && strategyRef != null) {
+        if (strategyRef != null) {
             aggregationStrategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
         }
         if (aggregationStrategy == null) {
+            // default to use latest aggregation strategy
             aggregationStrategy = new UseLatestAggregationStrategy();
         }
-        if (threadPoolRef != null) {
-            executor = routeContext.lookup(threadPoolRef, Executor.class);
+        if (executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
         }
-        return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executor);
+        return new MulticastProcessor(list, aggregationStrategy, isParallelProcessing(), executorService, isStreaming());
     }
 
     public AggregationStrategy getAggregationStrategy() {
@@ -142,12 +142,20 @@
         this.parallelProcessing = parallelProcessing;        
     }
 
-    public Executor getExecutor() {
-        return executor;
+    public boolean isStreaming() {
+        return streaming != null ? streaming : false;
     }
 
-    public void setExecutor(Executor executor) {
-        this.executor = executor;        
+    public void setStreaming(boolean streaming) {
+        this.streaming = streaming;
     }
-    
+
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
+    }
+
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SplitDefinition.java Wed May  6 12:23:17 2009
@@ -16,11 +16,8 @@
  */
 package org.apache.camel.model;
 
-import java.util.concurrent.Executor;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -46,17 +43,17 @@
 public class SplitDefinition extends ExpressionNode {
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlTransient
+    private ExecutorService executorService;
     @XmlAttribute(required = false)
     private Boolean parallelProcessing;
-    @XmlTransient
-    private Executor executor;
     @XmlAttribute(required = false)
     private String strategyRef;
     @XmlAttribute(required = false)
-    private String threadPoolExecutorRef;
+    private String executorServiceRef;
     @XmlAttribute(required = false)
     private Boolean streaming = false;
-    
+
     public SplitDefinition() {
     }
 
@@ -82,9 +79,9 @@
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = routeContext.createProcessor(this);
         aggregationStrategy = createAggregationStrategy(routeContext);
-        executor = createThreadPoolExecutor(routeContext);
+        executorService = createExecutorService(routeContext);
         return new Splitter(getExpression().createExpression(routeContext), childProcessor, aggregationStrategy,
-                isParallelProcessing(), executor, streaming);
+                isParallelProcessing(), executorService, streaming);
     }
 
     
@@ -100,16 +97,15 @@
         return strategy;
     }        
     
-    private Executor createThreadPoolExecutor(RouteContext routeContext) {
-        Executor executor = getExecutor();
-        if (executor == null && threadPoolExecutorRef != null) {
-            executor = routeContext.lookup(threadPoolExecutorRef, ThreadPoolExecutor.class);
+    private ExecutorService createExecutorService(RouteContext routeContext) {
+        if (executorServiceRef != null) {
+            executorService = routeContext.lookup(executorServiceRef, ExecutorService.class);
         }
-        if (executor == null) {
+        if (executorService == null) {
             // fall back and use default
-            executor = new ThreadPoolExecutor(4, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+            executorService = Executors.newScheduledThreadPool(5);
         }
-        return executor;
+        return executorService;
     }         
     
     // Fluent API
@@ -168,13 +164,13 @@
     }
     
     /**
-     * Setting the executor for executing the splitting action. 
+     * Setting the executor service for executing the splitting action.
      *
-     * @param executor the executor
+     * @param executorService the executor service
      * @return the builder
      */
-    public SplitDefinition executor(Executor executor) {
-        setExecutor(executor);
+    public SplitDefinition executorService(ExecutorService executorService) {
+        setExecutorService(executorService);
         return this;
     }
     
@@ -197,7 +193,7 @@
     /**
      * The splitter should use streaming -- exchanges are being sent as the data for them becomes available.
      * This improves throughput and memory usage, but it has a drawback: 
-     * - the sent exchanges will no longer contain the {@link Splitter#SPLIT_SIZE} header property 
+     * - the sent exchanges will no longer contain the {@link org.apache.camel.Exchange#SPLIT_SIZE} header property
      * 
      * @return whether or not streaming should be used
      */
@@ -207,13 +203,13 @@
 
     public void setStreaming(boolean streaming) {
         this.streaming = streaming;
-    }  
-    
-    public Executor getExecutor() {
-        return executor;
     }
 
-    public void setExecutor(Executor executor) {
-        this.executor = executor;
+    public ExecutorService getExecutorService() {
+        return executorService;
+    }
+
+    public void setExecutorService(ExecutorService executorService) {
+        this.executorService = executorService;
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed May  6 12:23:17 2009
@@ -18,18 +18,16 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.camel.AsyncCallback;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
@@ -39,6 +37,9 @@
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.concurrent.AtomicExchange;
 import org.apache.camel.util.concurrent.CountingLatch;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
@@ -50,8 +51,14 @@
  */
 public class MulticastProcessor extends ServiceSupport implements Processor, Navigate {
 
-    // TODO: Use JDK CompletionService to get rid of the AsyncProcessor/AsyncCallback
+    private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
+
+    // TODO: Add more logging
+    // TODO: Add option to stop if an exception was thrown during processing to break asap
 
+    /**
+     * Class that represents each step in the multicast route to do
+     */
     static class ProcessorExchangePair {
         private final Processor processor;
         private final Exchange exchange;
@@ -70,51 +77,34 @@
         }
     }
 
-    private Collection<Processor> processors;
-    private AggregationStrategy aggregationStrategy;
-    private boolean isParallelProcessing;
-    private Executor executor;
+    private final Collection<Processor> processors;
+    private final AggregationStrategy aggregationStrategy;
+    private final boolean isParallelProcessing;
     private final boolean streaming;
-    private final AtomicBoolean shutdown = new AtomicBoolean(true);
+    private ExecutorService executorService;
 
     public MulticastProcessor(Collection<Processor> processors) {
         this(processors, null);
     }
 
     public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy) {
-        this(processors, aggregationStrategy, false, null);
+        this(processors, aggregationStrategy, false, null, false);
     }
     
-    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor) {
-        this(processors, aggregationStrategy, parallelProcessing, executor, false);
-    }
-
-    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, Executor executor, boolean streaming) {
+    public MulticastProcessor(Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, ExecutorService executorService, boolean streaming) {
         notNull(processors, "processors");
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
         this.isParallelProcessing = parallelProcessing;
-        if (isParallelProcessing) {
-            if (executor != null) {
-                this.executor = executor;
-            } else { 
-                // setup default Executor
-                this.executor = new ThreadPoolExecutor(processors.size(), processors.size(), 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(processors.size()));
-            }
-        }
+        this.executorService = executorService;
         this.streaming = streaming;
-    }
 
-    /**
-     * A helper method to convert a list of endpoints into a list of processors
-     */
-    public static <E extends Exchange> Collection<Processor> toProducers(Collection<Endpoint> endpoints)
-        throws Exception {
-        Collection<Processor> answer = new ArrayList<Processor>();
-        for (Endpoint endpoint : endpoints) {
-            answer.add(endpoint.createProducer());
+        if (isParallelProcessing()) {
+            if (this.executorService == null) {
+                // setup default executor
+                this.executorService = Executors.newScheduledThreadPool(5);
+            }
         }
-        return answer;
     }
 
     @Override
@@ -122,85 +112,134 @@
         return "Multicast" + getProcessors();
     }
 
-    class ProcessCall implements Runnable {
-        private final Exchange exchange;
-        private final AsyncCallback callback;
-        private final Processor processor;
+    public void process(Exchange exchange) throws Exception {
+        final AtomicExchange result = new AtomicExchange();
+        final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
+        
+        if (isParallelProcessing() && isStreaming()) {
+            doProcessNewParallelStreaming(result, pairs);
+        } else if (isParallelProcessing()) {
+            doProcessNewParallel(result, pairs);
+        } else {
+            doProcessSequntiel(result, pairs);
+        }
 
-        public ProcessCall(Exchange exchange, Processor processor, AsyncCallback callback) {
-            this.exchange = exchange;
-            this.callback = callback;
-            this.processor = processor;
+        if (result.get() != null) {
+            ExchangeHelper.copyResults(exchange, result.get());
         }
+    }
 
-        public void run() {
-            if (shutdown.get()) {
-                exchange.setException(new RejectedExecutionException());
-                callback.done(false);
-            } else {
-                try {
-                    processor.process(exchange);
-                } catch (Exception ex) {
-                    exchange.setException(ex);
+    protected void doProcessNewParallelStreaming(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws InterruptedException, ExecutionException {
+        // execute tasks in parallel and aggregate in the order they are finished (out of order sequence)
+
+        CompletionService<Exchange> completion = new ExecutorCompletionService<Exchange>(executorService);
+        int total = 0;
+
+        for (ProcessorExchangePair pair : pairs) {
+            final Processor producer = pair.getProcessor();
+            final Exchange subExchange = pair.getExchange();
+            updateNewExchange(subExchange, total, pairs);
+
+            completion.submit(new Callable<Exchange>() {
+                public Exchange call() throws Exception {
+                    try {
+                        producer.process(subExchange);
+                    } catch (Exception e) {
+                        subExchange.setException(e);
+                    }
+                    return subExchange;
                 }
-                callback.done(false);
+            });
+
+            total++;
+        }
+
+        for (int i = 0; i < total; i++) {
+            Future<Exchange> future = completion.take();
+            Exchange subExchange = future.get();
+            if (aggregationStrategy != null) {
+                doAggregate(result, subExchange);
             }
         }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Done parallel streaming processing " + total + " exchanges");
+        }
     }
 
-    public void process(Exchange exchange) throws Exception {
-        final AtomicExchange result = new AtomicExchange();
+    protected void doProcessNewParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws InterruptedException {
+        // execute tasks in parallel but aggregate in the same order as the tasks were submitted (in order sequence)
 
-        Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
-        
-        // Parallel Processing the producer
-        if (isParallelProcessing) {
-            List<Exchange> exchanges = new LinkedList<Exchange>();
-            final CountingLatch completedExchanges = new CountingLatch();
-            int i = 0;
-            for (ProcessorExchangePair pair : pairs) {
-                Processor producer = pair.getProcessor();
-                final Exchange subExchange = pair.getExchange();
-                updateNewExchange(subExchange, i, pairs);
-                exchanges.add(subExchange);
-                completedExchanges.increment(); 
-                ProcessCall call = new ProcessCall(subExchange, producer, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        if (streaming && aggregationStrategy != null) {
-                            doAggregate(result, subExchange);
-                        }
-                        completedExchanges.decrement();
+        // TODO I wonder if there is a completion service that can order the take in the same order as the tasks
+        // were submitted, if so we can do aggregate to catch-up while still processing for more performance
+        // this one completes all tasks before doing aggregation
+
+        final List<Exchange> ordered = new ArrayList<Exchange>();
+        final CountingLatch latch = new CountingLatch();
+        int total = 0;
+
+        for (ProcessorExchangePair pair : pairs) {
+            final Processor producer = pair.getProcessor();
+            final Exchange subExchange = pair.getExchange();
+            updateNewExchange(subExchange, total, pairs);
+
+            // add to the list of ordered exchanges to aggregate when all the tasks are finished
+            ordered.add(subExchange);
+
+            // increment number of tasks to do
+            latch.increment();
+            executorService.submit(new Runnable() {
+                public void run() {
+                    try {
+                        producer.process(subExchange);
+                    } catch (Exception e) {
+                        subExchange.setException(e);
                     }
+                    // this task is done so decrement
+                    latch.decrement();
+                }
+            });
+
+            total++;
+        }
+
+        // wait for all tasks to be complete
+        latch.await();
 
-                });
-                executor.execute(call);
-                i++;
+        // aggregate in order
+        for (Exchange subExchange : ordered) {
+            if (aggregationStrategy != null) {
+                doAggregate(result, subExchange);
             }
-            completedExchanges.await();
-            if (!streaming && aggregationStrategy != null) {
-                for (Exchange resultExchange : exchanges) {
-                    doAggregate(result, resultExchange);
-                }
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Done parallel processing " + total + " exchanges");
+        }
+    }
+
+    protected void doProcessSequntiel(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
+        int total = 0;
+
+        for (ProcessorExchangePair pair : pairs) {
+            Processor producer = pair.getProcessor();
+            Exchange subExchange = pair.getExchange();
+            updateNewExchange(subExchange, total, pairs);
+
+            // process it sequentially
+            producer.process(subExchange);
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Sequientel processing complete for number " + total + " exchange: " + subExchange);
             }
 
-        } else {
-            // we call the producer one by one sequentially
-            int i = 0;
-            for (ProcessorExchangePair pair : pairs) {
-                Processor producer = pair.getProcessor();
-                Exchange subExchange = pair.getExchange();
-                updateNewExchange(subExchange, i, pairs);
-                try {
-                    producer.process(subExchange);
-                } catch (Exception e) {
-                    subExchange.setException(e);
-                }
+            if (aggregationStrategy != null) {
                 doAggregate(result, subExchange);
-                i++;
             }
+            total++;
         }
-        if (result.get() != null) {
-            ExchangeHelper.copyResults(exchange, result.get());
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Done sequientel processing " + total + " exchanges");
         }
     }
 
@@ -235,25 +274,14 @@
     }
 
     protected void doStop() throws Exception {
-        shutdown.set(true);
-        if (executor != null && executor instanceof ThreadPoolExecutor) {
-            ((ThreadPoolExecutor)executor).shutdown();
-            ((ThreadPoolExecutor)executor).awaitTermination(0, TimeUnit.SECONDS);
+        if (executorService != null) {
+            executorService.shutdown();
+            executorService.awaitTermination(0, TimeUnit.SECONDS);
         }
         ServiceHelper.stopServices(processors);
     }
 
     protected void doStart() throws Exception {
-        shutdown.set(false);
-        if (executor != null && executor instanceof ThreadPoolExecutor) {
-            ((ThreadPoolExecutor)executor).setRejectedExecutionHandler(new RejectedExecutionHandler() {
-                public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
-                    ProcessCall call = (ProcessCall)runnable;
-                    call.exchange.setException(new RejectedExecutionException());
-                    call.callback.done(false);
-                }
-            });
-        }
         ServiceHelper.startServices(processors);
     }
     
@@ -282,10 +310,6 @@
         return aggregationStrategy;
     }
 
-    public Executor getExecutor() {
-        return executor;
-    }
-
     public boolean isParallelProcessing() {
         return isParallelProcessing;
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Wed May  6 12:23:17 2009
@@ -22,7 +22,7 @@
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -50,8 +50,8 @@
     }
 
     public Splitter(Expression expression, Processor destination, AggregationStrategy aggregationStrategy,
-                    boolean parallelProcessing, Executor executor, boolean streaming) {
-        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executor, streaming);
+                    boolean parallelProcessing, ExecutorService executorService, boolean streaming) {
+        super(Collections.singleton(destination), aggregationStrategy, parallelProcessing, executorService, streaming);
 
         this.expression = expression;
         notNull(expression, "expression");

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java?rev=772172&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java Wed May  6 12:23:17 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.GroupedExchange;
+
+/**
+ * Aggregate all exchanges into a single combined {@link org.apache.camel.impl.GroupedExchange} holding all
+ * the exchanges gathered.
+ * <p/>
+ * On each call to {@link #aggregate(Exchange, Exchange)}: when the old exchange is not already a
+ * {@link org.apache.camel.impl.GroupedExchange}, a new group is created from it and the old exchange is
+ * added to that group as its first member; otherwise the old exchange is reused as the group. The new
+ * exchange is then added to the group and the group is returned. When the returned group is passed back
+ * as the old exchange on the next call, the same group instance keeps collecting members.
+ * <p/>
+ * NOTE(review): the old exchange is handed to the <tt>GroupedExchange</tt> constructor without a null
+ * check; confirm that callers never invoke this strategy with a <tt>null</tt> old exchange.
+ *
+ * @version $Revision$
+ */
+public class GroupedExchangeAggregationStrategy implements AggregationStrategy {
+
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        GroupedExchange answer;
+        if (!(oldExchange instanceof GroupedExchange)) {
+            answer = new GroupedExchange(oldExchange);
+            answer.addExchange(oldExchange);
+        } else {
+            answer = (GroupedExchange) oldExchange;
+        }
+
+        answer.addExchange(newExchange);
+        return answer;
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedExchangeAggregationStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MultiCastAggregatorTest.java Wed May  6 12:23:17 2009
@@ -71,7 +71,7 @@
                 // START SNIPPET: example
                 // The message will be sent parallelly to the endpoints
                 from("direct:parallel")
-                    .multicast(new BodyOutAggregatingStrategy(), true).executor(tpExecutor)
+                    .multicast(new BodyOutAggregatingStrategy(), true).executorService(tpExecutor)
                         .to("direct:x", "direct:y", "direct:z");
                 // Multicast the message in a sequential way
                 from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java?rev=772172&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java Wed May  6 12:23:17 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests a multicast configured with both <tt>parallelProcessing()</tt> and <tt>streaming()</tt>.
+ * <p/>
+ * The route multicasts to <tt>direct:a</tt>, which applies <tt>delay(100)</tt> before setting the body to
+ * <tt>A</tt>, and to <tt>direct:b</tt>, which sets the body to <tt>B</tt> with no delay. The inline
+ * aggregation strategy appends the IN body of the new exchange to the IN body of the old exchange, and
+ * the test expects <tt>mock:result</tt> to receive <tt>BA</tt>, i.e. the reply from <tt>direct:b</tt>
+ * aggregated ahead of the delayed reply from <tt>direct:a</tt>.
+ * <p/>
+ * NOTE(review): the expected order relies on the delay being long enough for <tt>direct:b</tt> to finish
+ * first; presumably the value 100 is in milliseconds, confirm against the DSL.
+ *
+ * @version $Revision$
+ */
+public class MulticastParallelStreamingTest extends ContextTestSupport {
+
+    public void testMulticastParallelStreaming() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("BA");
+
+        template.sendBody("direct:start", "Hello");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast(new AggregationStrategy() {
+                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                            String body = oldExchange.getIn().getBody(String.class);
+                            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
+                            return oldExchange;
+                        }
+                    }).parallelProcessing().streaming()
+                        .to("direct:a", "direct:b")
+                    // use end to indicate end of multicast route
+                    .end()
+                    .to("mock:result");
+
+                from("direct:a").delay(100).setBody(constant("A"));
+
+                from("direct:b").setBody(constant("B"));
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java?rev=772172&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java Wed May  6 12:23:17 2009
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Tests a multicast configured with <tt>parallelProcessing()</tt> but without streaming.
+ * <p/>
+ * The route multicasts to <tt>direct:a</tt>, which applies <tt>delay(100)</tt> before setting the body to
+ * <tt>A</tt>, and to <tt>direct:b</tt>, which sets the body to <tt>B</tt> with no delay. The inline
+ * aggregation strategy appends the IN body of the new exchange to the IN body of the old exchange, and
+ * the test expects <tt>mock:result</tt> to receive <tt>AB</tt>, i.e. the reply from <tt>direct:a</tt>
+ * comes first in the aggregated body even though it is the delayed branch.
+ * <p/>
+ * NOTE(review): presumably the non-streaming mode aggregates replies in the order the endpoints are
+ * listed in <tt>to(...)</tt> rather than in completion order; confirm against the multicast processor.
+ *
+ * @version $Revision$
+ */
+public class MulticastParallelTest extends ContextTestSupport {
+
+    public void testMulticastParallel() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("AB");
+
+        template.sendBody("direct:start", "Hello");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast(new AggregationStrategy() {
+                            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                                String body = oldExchange.getIn().getBody(String.class);
+                                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
+                                return oldExchange;
+                            }
+                        })
+                        .parallelProcessing().to("direct:a", "direct:b")
+                    // use end to indicate end of multicast route
+                    .end()
+                    .to("mock:result");
+
+                from("direct:a").delay(100).setBody(constant("A"));
+
+                from("direct:b").setBody(constant("B"));
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithCustomThreadPoolExecutorTest.java Wed May  6 12:23:17 2009
@@ -32,7 +32,7 @@
     protected ThreadPoolExecutor customThreadPoolExecutor = new ThreadPoolExecutor(8, 16, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
 
     public void testSplitterWithCustomThreadPoolExecutor() throws Exception {
-        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) getSplitter().getExecutor();
+        ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) getSplitter().getExecutorService();
         // this should be sufficient as core pool size is the only thing I changed from the default
         assertTrue(threadPoolExecutor.getCorePoolSize() == customThreadPoolExecutor.getCorePoolSize());
         assertTrue(threadPoolExecutor.getMaximumPoolSize() == customThreadPoolExecutor.getMaximumPoolSize());
@@ -71,7 +71,7 @@
             public void configure() {
                 from("direct:parallel-custom-pool")
                     .split(body().tokenize(",")).parallelProcessing(true)
-                    .executor(customThreadPoolExecutor).to("mock:result");
+                    .executorService(customThreadPoolExecutor).to("mock:result");
             }
         };
     }

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/xml/SpringXmlRouteBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/xml/SpringXmlRouteBuilderTest.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/xml/SpringXmlRouteBuilderTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/xml/SpringXmlRouteBuilderTest.java Wed May  6 12:23:17 2009
@@ -56,14 +56,6 @@
     }
 
     @Override
-    protected List<Route> buildRouteWithInterceptor() {
-        List<Route> answer = getRoutesFromContext("org/apache/camel/spring/xml/buildRouteWithInterceptor.xml");
-        interceptor1 = (DelegateProcessor) applicationContext.getBean("interceptor1");
-        interceptor2 = (DelegateProcessor) applicationContext.getBean("interceptor2");
-        return answer;
-    }
-
-    @Override
     protected List<Route> buildSimpleRouteWithHeaderPredicate() {
         return getRoutesFromContext("org/apache/camel/spring/xml/buildSimpleRouteWithHeaderPredicate.xml");
     }

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml Wed May  6 12:23:17 2009
@@ -26,7 +26,7 @@
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
     <route>
       <from uri="direct:parallel"/>
-      <multicast strategyRef="bodyOutAggregatorStrategy" parallelProcessing="true" threadPoolRef="mySingleThreadExcutor">
+      <multicast strategyRef="bodyOutAggregatorStrategy" parallelProcessing="true" executorServiceRef="mySingleThreadExcutor">
          <to uri="direct:x"/>
          <to uri="direct:y"/>
          <to uri="direct:z"/>

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/splitterWithCustomThreadPoolExecutor.xml Wed May  6 12:23:17 2009
@@ -26,7 +26,7 @@
   <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
     <route>
       <from uri="direct:parallel-custom-pool"/>
-      <split threadPoolExecutorRef="threadPoolExecutor">
+      <split executorServiceRef="threadPoolExecutor">
         <xpath>/invoice/lineItems</xpath>
         <to uri="mock:result"/>
       </split>

Modified: camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java
URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java (original)
+++ camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/LoanBroker.java Wed May  6 12:23:17 2009
@@ -98,7 +98,7 @@
             // Set the aggregation strategy for aggregating the out message            
             .multicast(new BankResponseAggregationStrategy().setAggregatingOutMessage(true))
                 // Send out the request to three different banks in parallel
-                .parallelProcessing(true).to("jms:queue2:bank1", "jms:queue2:bank2", "jms:queue2:bank3");
+                .parallelProcessing().to("jms:queue2:bank1", "jms:queue2:bank2", "jms:queue2:bank3");
         
         // Each bank processor will process the message and put the response message back
         from("jms:queue2:bank1").process(new Bank("bank1"));

Modified: camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java
URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java?rev=772172&r1=772171&r2=772172&view=diff
==============================================================================
--- camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java (original)
+++ camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/LoanBroker.java Wed May  6 12:23:17 2009
@@ -96,7 +96,7 @@
                 .multicast(new BankResponseAggregationStrategy())
                     // Camel will create a thread pool with the size of the send to endpoints
                     // for sending the message in parallel
-                    .parallelProcessing(true)
+                    .parallelProcessing()
                     .to(Constants.BANK1_URI, Constants.BANK2_URI, Constants.BANK3_URI);
 
     //END SNIPPET: dsl



Mime
View raw message