camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r956902 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/async/ test/resources/
Date Tue, 22 Jun 2010 14:11:06 GMT
Author: davsclaus
Date: Tue Jun 22 14:11:05 2010
New Revision: 956902

URL: http://svn.apache.org/viewvc?rev=956902&view=rev
Log:
CAMEL-2838: Multicast, Recipient List and Splitter EIPs now support async routing engine.
Work in progress.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
  (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
  (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java
      - copied, changed from r956800, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java
    camel/trunk/camel-core/src/test/resources/log4j.properties

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultTracedRouteNodes.java Tue Jun 22 14:11:05 2010
@@ -74,7 +74,9 @@ public class DefaultTracedRouteNodes imp
     }
 
     public void popBlock() {
-        routeNodes.pop();
+        if (!routeNodes.isEmpty()) {
+            routeNodes.pop();
+        }
     }
 
     public void pushBlock() {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Tue Jun 22 14:11:05 2010
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -28,6 +29,8 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
@@ -37,9 +40,11 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.ErrorHandlerBuilder;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TracedRouteNodes;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -59,44 +64,55 @@ import static org.apache.camel.util.Obje
  * @see Pipeline
  * @version $Revision$
  */
-public class MulticastProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
+public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
 
     private static final transient Log LOG = LogFactory.getLog(MulticastProcessor.class);
 
     /**
      * Class that represent each step in the multicast route to do
      */
-    static final class ProcessorExchangePair {
+    static final class DefaultProcessorExchangePair implements ProcessorExchangePair {
+        private final int index;
         private final Processor processor;
         private final Processor prepared;
         private final Exchange exchange;
 
-        /**
-         * Private constructor as you must use the static creator
-         * {@link org.apache.camel.processor.MulticastProcessor#createProcessorExchangePair(org.apache.camel.Processor,
-         *        org.apache.camel.Exchange)} which prepares the processor before its ready to be used.
-         *
-         * @param processor  the original processor
-         * @param prepared   the prepared processor
-         * @param exchange   the exchange
-         */
-        private ProcessorExchangePair(Processor processor, Processor prepared, Exchange exchange) {
+        private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) {
+            this.index = index;
             this.processor = processor;
             this.prepared = prepared;
             this.exchange = exchange;
         }
 
-        public Processor getProcessor() {
-            return processor;
+        public int getIndex() {
+            return index;
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        public Producer getProducer() {
+            if (processor instanceof Producer) {
+                return (Producer) processor;
+            }
+            return null;
         }
 
-        public Processor getPrepared() {
+        public Processor getProcessor() {
             return prepared;
         }
 
-        public Exchange getExchange() {
-            return exchange;
+        public void begin() {
+            // noop
+            LOG.trace("ProcessorExchangePair #" + index + " begin: " + exchange);
+        }
+
+        public void done() {
+            // noop
+            LOG.trace("ProcessorExchangePair #" + index + " done: " + exchange);
         }
+
     }
 
     private final CamelContext camelContext;
@@ -118,7 +134,6 @@ public class MulticastProcessor extends 
     public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy,
                               boolean parallelProcessing, ExecutorService executorService, boolean streaming, boolean stopOnException) {
         notNull(camelContext, "camelContext");
-        notNull(processors, "processors");
         this.camelContext = camelContext;
         this.processors = processors;
         this.aggregationStrategy = aggregationStrategy;
@@ -143,33 +158,51 @@ public class MulticastProcessor extends 
     }
 
     public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        boolean sync = true;
+
         final AtomicExchange result = new AtomicExchange();
-        final Iterable<ProcessorExchangePair> pairs = createProcessorExchangePairs(exchange);
+        final Iterable<ProcessorExchangePair> pairs;
 
         // multicast uses fine grained error handling on the output processors
         // so use try .. catch to cater for this
         try {
+            pairs = createProcessorExchangePairs(exchange);
             if (isParallelProcessing()) {
                 // ensure an executor is set when running in parallel
                 ObjectHelper.notNull(executorService, "executorService", this);
-                doProcessParallel(result, pairs, isStreaming());
+                doProcessParallel(exchange, result, pairs, isStreaming(), callback);
             } else {
-                doProcessSequential(result, pairs);
+                sync = doProcessSequential(exchange, result, pairs, callback);
+            }
+
+            if (!sync) {
+                // the remainder of the multicast will be completed async
+                // so we break out now, then the callback will be invoked which then continue routing from where we left here
+                return false;
             }
 
+            // copy results back to the original exchange
             if (result.get() != null) {
                 ExchangeHelper.copyResults(exchange, result.get());
             }
-        } catch (Exception e) {
+        } catch (Throwable e) {
             // multicast uses error handling on its output processors and they have tried to redeliver
             // so we shall signal back to the other error handlers that we are exhausted and they should not
             // also try to redeliver as we will then do that twice
             exchange.setProperty(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE);
             exchange.setException(e);
         }
+
+        callback.done(true);
+        return true;
     }
 
-    protected void doProcessParallel(final AtomicExchange result, Iterable<ProcessorExchangePair> pairs, boolean streaming) throws InterruptedException, ExecutionException {
+    protected void doProcessParallel(final Exchange original, final AtomicExchange result, Iterable<ProcessorExchangePair> pairs,
+                                     boolean streaming, final AsyncCallback callback) throws InterruptedException, ExecutionException {
         final CompletionService<Exchange> completion;
         final AtomicBoolean running = new AtomicBoolean(true);
 
@@ -183,9 +216,9 @@ public class MulticastProcessor extends 
 
         final AtomicInteger total = new AtomicInteger(0);
 
-        for (ProcessorExchangePair pair : pairs) {
-            final Processor processor = pair.getProcessor();
-            final Processor prepared = pair.getPrepared();
+        final Iterator<ProcessorExchangePair> it = pairs.iterator();
+        while (it.hasNext()) {
+            final ProcessorExchangePair pair = it.next();
             final Exchange subExchange = pair.getExchange();
             updateNewExchange(subExchange, total.intValue(), pairs);
 
@@ -196,7 +229,7 @@ public class MulticastProcessor extends 
                         return subExchange;
                     }
 
-                    doProcess(processor, prepared, subExchange);
+                    doProcess(original, result, it, pair, callback);
 
                     // should we stop in case of an exception occurred during processing?
                     if (stopOnException && subExchange.getException() != null) {
@@ -228,16 +261,28 @@ public class MulticastProcessor extends 
         }
     }
 
-    protected void doProcessSequential(AtomicExchange result, Iterable<ProcessorExchangePair> pairs) throws Exception {
+    protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception {
         int total = 0;
+        Iterator<ProcessorExchangePair> it = pairs.iterator();
 
-        for (ProcessorExchangePair pair : pairs) {
-            Processor processor = pair.getProcessor();
-            Processor prepared = pair.getPrepared();
+        while (it.hasNext()) {
+            ProcessorExchangePair pair = it.next();
             Exchange subExchange = pair.getExchange();
             updateNewExchange(subExchange, total, pairs);
 
-            doProcess(processor, prepared, subExchange);
+            boolean sync = doProcess(original, result, it, pair, callback);
+            if (!sync) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Processing exchangeId: " + pair.getExchange().getExchangeId() + " is continued being processed asynchronously");
+                }
+                // the remainder of the multicast will be completed async
+                // so we break out now, then the callback will be invoked which then continue routing from where we left here
+                return false;
+            }
+
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Processing exchangeId: " + pair.getExchange().getExchangeId() + " is continued being processed synchronously");
+            }
 
             // should we stop in case of an exception occurred during processing?
             if (stopOnException && subExchange.getException() != null) {
@@ -257,14 +302,23 @@ public class MulticastProcessor extends 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Done sequential processing " + total + " exchanges");
         }
+
+        return true;
     }
 
-    private void doProcess(Processor processor, Processor prepared, Exchange exchange) {
+    private boolean doProcess(final Exchange original, final AtomicExchange result, final Iterator<ProcessorExchangePair> it,
+                              final ProcessorExchangePair pair, final AsyncCallback callback) {
+        boolean sync = true;
+
+        final Exchange exchange = pair.getExchange();
+        Processor processor = pair.getProcessor();
+        Producer producer = pair.getProducer();
+
         TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null;
 
         // compute time taken if sending to another endpoint
         StopWatch watch = null;
-        if (processor instanceof Producer) {
+        if (producer != null) {
             watch = new StopWatch();
         }
 
@@ -275,21 +329,86 @@ public class MulticastProcessor extends 
             }
 
             // let the prepared process it
-            prepared.process(exchange);
-        } catch (Exception e) {
-            exchange.setException(e);
+            AsyncProcessor async = AsyncProcessorTypeConverter.convert(processor);
+            pair.begin();
+            sync = async.process(exchange, new AsyncCallback() {
+                public void done(boolean doneSync) {
+                    pair.done();
+
+                    // we only have to handle async completion of the routing slip
+                    if (doneSync) {
+                        return;
+                    }
+
+                    // TODO: total number
+                    // continue processing the multicast asynchronously
+                    Exchange subExchange = exchange;
+                    int total = 0;
+
+                    while (it.hasNext()) {
+
+                        if (stopOnException && exchange.getException() != null) {
+                            // wrap in exception to explain where it failed
+                            exchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
+                            callback.done(false);
+                            return;
+                        }
+
+                        if (aggregationStrategy != null) {
+                            doAggregate(result, subExchange);
+                        }
+
+                        if (it.hasNext()) {
+                            // prepare and run the next
+                            ProcessorExchangePair pair = it.next();
+                            subExchange = pair.getExchange();
+                            updateNewExchange(subExchange, total, null);
+                            boolean sync = doProcess(original, result, it, pair, callback);
+
+                            if (!sync) {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("Processing exchangeId: " + original.getExchangeId() + " is continued being processed asynchronously");
+                                }
+                                return;
+                            }
+
+                            total++;
+                        }
+                    }
+
+                    // remember to test for stop on exception and aggregate before copying back results
+                    if (stopOnException && exchange.getException() != null) {
+                        // wrap in exception to explain where it failed
+                        exchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException()));
+                        callback.done(false);
+                        return;
+                    }
+
+                    if (aggregationStrategy != null) {
+                        doAggregate(result, subExchange);
+                    }
+
+                    // copy results back to the original exchange
+                    if (result.get() != null) {
+                        ExchangeHelper.copyResults(original, result.get());
+                    }
+                    callback.done(false);
+                }
+            });
         } finally {
             // pop the block so by next round we have the same staring point and thus the tracing looks accurate
             if (traced != null) {
                 traced.popBlock();
             }
-            if (processor instanceof Producer) {
+            if (producer != null) {
                 long timeTaken = watch.stop();
-                Endpoint endpoint = ((Producer) processor).getEndpoint();
+                Endpoint endpoint = producer.getEndpoint();
                 // emit event that the exchange was sent to the endpoint
                 EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken);
             }
         }
+
+        return sync;
     }
 
     /**
@@ -314,9 +433,10 @@ public class MulticastProcessor extends 
     protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception {
         List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size());
 
+        int index = 0;
         for (Processor processor : processors) {
             Exchange copy = exchange.copy();
-            result.add(createProcessorExchangePair(processor, copy));
+            result.add(createProcessorExchangePair(index++, processor, copy));
         }
 
         return result;
@@ -332,7 +452,7 @@ public class MulticastProcessor extends 
      * @param exchange   the exchange
      * @return prepared for use
      */
-    protected static ProcessorExchangePair createProcessorExchangePair(Processor processor, Exchange exchange) {
+    protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange) {
         Processor prepared = processor;
 
         // set property which endpoint we send to
@@ -355,7 +475,7 @@ public class MulticastProcessor extends 
             }
         }
 
-        return new ProcessorExchangePair(processor, prepared, exchange);
+        return new DefaultProcessorExchangePair(index, processor, prepared, exchange);
     }
 
     protected void doStart() throws Exception {
@@ -369,7 +489,7 @@ public class MulticastProcessor extends 
         ServiceHelper.stopServices(processors);
     }
 
-    private static void setToEndpoint(Exchange exchange, Processor processor) {
+    protected static void setToEndpoint(Exchange exchange, Processor processor) {
         if (processor instanceof Producer) {
             Producer producer = (Producer) processor;
             exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java?rev=956902&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java Tue Jun 22 14:11:05 2010
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+
+/**
+ * Exchange pair to be executed by {@link org.apache.camel.processor.MulticastProcessor}.
+ *
+ * @version $Revision$
+ */
+public interface ProcessorExchangePair {
+
+    int getIndex();
+
+    Exchange getExchange();
+
+    Producer getProducer();
+
+    Processor getProcessor();
+
+    void begin();
+
+    void done();
+
+}
\ No newline at end of file

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ProcessorExchangePair.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Tue Jun 22 14:11:05 2010
@@ -23,16 +23,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
-import org.apache.camel.Processor;
-import org.apache.camel.Producer;
 import org.apache.camel.impl.ProducerCache;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
@@ -49,7 +50,7 @@ import static org.apache.camel.util.Obje
  *
  * @version $Revision$
  */
-public class RecipientList extends ServiceSupport implements Processor {
+public class RecipientList extends ServiceSupport implements AsyncProcessor {
     private static final transient Log LOG = LogFactory.getLog(RecipientList.class);
     private final CamelContext camelContext;
     private ProducerCache producerCache;
@@ -93,53 +94,40 @@ public class RecipientList extends Servi
     }
 
     public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
         if (!isStarted()) {
             throw new IllegalStateException("RecipientList has not been started: " + this);
         }
 
         Object recipientList = expression.evaluate(exchange, Object.class);
-        sendToRecipientList(exchange, recipientList);
+        return sendToRecipientList(exchange, recipientList, callback);
+    }
+
+    public boolean sendToRecipientList(Exchange exchange, Object routingSlip) {
+        // this method is invoked from @RecipientList so we bridge with an empty callback
+        // TODO: Have @RecipientList support async out of the box
+        return sendToRecipientList(exchange, routingSlip, new AsyncCallback() {
+            public void done(boolean doneSync) {
+                // noop
+            }
+        });
     }
 
     /**
      * Sends the given exchange to the recipient list
      */
-    public void sendToRecipientList(Exchange exchange, Object recipientList) throws Exception {
+    public boolean sendToRecipientList(Exchange exchange, Object recipientList, AsyncCallback callback) {
         Iterator<Object> iter = ObjectHelper.createIterator(recipientList, delimiter);
 
-        // we should acquire and release the producers we need so we can leverage the producer
-        // cache to the fullest
-        Map<Endpoint, Producer> producers = new LinkedHashMap<Endpoint, Producer>();
-        try {
-            List<Processor> processors = new ArrayList<Processor>();
-            while (iter.hasNext()) {
-                Object recipient = iter.next();
-                try {
-                    Endpoint endpoint = resolveEndpoint(exchange, recipient);
-                    // acquire producer which we then release later
-                    Producer producer = producerCache.acquireProducer(endpoint);
-                    processors.add(producer);
-                    producers.put(endpoint, producer);
-                } catch (Exception e) {
-                    if (isIgnoreInvalidEndpoints()) {
-                        LOG.info("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
-                    } else {
-                        throw e;
-                    }
-                }
-            }
+        RecipientListProcessor rlp = new RecipientListProcessor(exchange.getContext(), producerCache, iter, getAggregationStrategy(),
+                                                                isParallelProcessing(), getExecutorService(), false, isStopOnException());
+        rlp.setIgnoreInvalidEndpoints(isIgnoreInvalidEndpoints());
 
-            MulticastProcessor mp = new MulticastProcessor(exchange.getContext(), processors, getAggregationStrategy(),
-                                                           isParallelProcessing(), getExecutorService(), false, isStopOnException());
-
-            // now let the multicast process the exchange
-            mp.process(exchange);
-        } finally {
-            // and release the producers back to the producer cache
-            for (Map.Entry<Endpoint, Producer> entry : producers.entrySet()) {
-                producerCache.releaseProducer(entry.getKey(), entry.getValue());
-            }
-        }
+        // now let the multicast process the exchange
+        return rlp.process(exchange, callback);
     }
 
     protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
@@ -162,7 +150,7 @@ public class RecipientList extends Servi
     protected void doStop() throws Exception {
         ServiceHelper.stopService(producerCache);
     }
-    
+
     public boolean isIgnoreInvalidEndpoints() {
         return ignoreInvalidEndpoints;
     }

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java?rev=956902&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java Tue Jun 22 14:11:05 2010
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.builder.ErrorHandlerBuilder;
+import org.apache.camel.impl.ProducerCache;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Implements a dynamic <a
+ * href="http://camel.apache.org/recipient-list.html">Recipient List</a>
+ * pattern where the list of actual endpoints to send a message exchange to are
+ * dependent on some dynamic expression.
+ * <p/>
+ * This implementation is a specialized {@link org.apache.camel.processor.MulticastProcessor}
which is based
+ * on recipient lists. This implementation has to handle the fact that the processors are not known at design time
+ * but are evaluated at runtime from the dynamic recipient list. Therefore this implementation has to look up
+ * endpoints at runtime and create producers which act as the processors for the multicast processor which
+ * runs under the hood. This implementation also supports the asynchronous routing engine, which makes the code
+ * trickier.
+ *
+ * @version $Revision$
+ */
+public class RecipientListProcessor extends MulticastProcessor {
+
+    private static final transient Log LOG = LogFactory.getLog(RecipientListProcessor.class);
+    private final Iterator<Object> iter;
+    private boolean ignoreInvalidEndpoints;
+    private ProducerCache producerCache;
+
+    /**
+     * Class that represents each step in the recipient list.
+     * <p/>
+     * This implementation ensures the provided producer is released back to the producer cache when
+     * it is done using it.
+     */
+    static final class RecipientProcessorExchangePair implements ProcessorExchangePair {
+        private final int index;
+        private final Endpoint endpoint;
+        private final Producer producer;
+        private Processor prepared;
+        private final Exchange exchange;
+        private final ProducerCache producerCache;
+
+        private RecipientProcessorExchangePair(int index, ProducerCache producerCache, Endpoint
endpoint, Producer producer,
+                                               Processor prepared, Exchange exchange) {
+            this.index = index;
+            this.producerCache = producerCache;
+            this.endpoint = endpoint;
+            this.producer = producer;
+            this.prepared = prepared;
+            this.exchange = exchange;
+        }
+
+        public int getIndex() {
+            return index;
+        }
+
+        public Exchange getExchange() {
+            return exchange;
+        }
+
+        public Producer getProducer() {
+            return producer;
+        }
+
+        public Processor getProcessor() {
+            return prepared;
+        }
+
+        public void begin() {
+            // we have already acquired and prepared the producer, so there is nothing to do here besides trace logging
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("RecipientProcessorExchangePair #" + index + " begin: " + exchange);
+            }
+        }
+
+        public void done() {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("RecipientProcessorExchangePair #" + index + " done: " + exchange);
+            }
+            // when we are done we should release the producer back to the producer cache
+            try {
+                producerCache.releaseProducer(endpoint, producer);
+            } catch (Exception e) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Error releasing producer: " + producer + ". This exception
will be ignored.", e);
+                }
+            }
+        }
+
+    }
+
+    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache,
Iterator<Object> iter) {
+        super(camelContext, null);
+        this.producerCache = producerCache;
+        this.iter = iter;
+    }
+
+    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache,
Iterator<Object> iter, AggregationStrategy aggregationStrategy) {
+        super(camelContext, null, aggregationStrategy);
+        this.producerCache = producerCache;
+        this.iter = iter;
+    }
+
+    public RecipientListProcessor(CamelContext camelContext, ProducerCache producerCache,
Iterator<Object> iter, AggregationStrategy aggregationStrategy,
+                                  boolean parallelProcessing, ExecutorService executorService,
boolean streaming, boolean stopOnException) {
+        super(camelContext, null, aggregationStrategy, parallelProcessing, executorService,
streaming, stopOnException);
+        this.producerCache = producerCache;
+        this.iter = iter;
+    }
+
+    public boolean isIgnoreInvalidEndpoints() {
+        return ignoreInvalidEndpoints;
+    }
+
+    public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
+        this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
+    }
+
+    @Override
+    protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange
exchange) throws Exception {
+        // here we iterate the recipient lists and create the exchange pair for each of those
+        List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>();
+
+        // first we must look up the endpoint and acquire the producer which can send to the endpoint
+        int index = 0;
+        while (iter.hasNext()) {
+            Object recipient = iter.next();
+            Endpoint endpoint;
+            Producer producer;
+            try {
+                endpoint = resolveEndpoint(exchange, recipient);
+                producer = producerCache.acquireProducer(endpoint);
+            } catch (Exception e) {
+                if (isIgnoreInvalidEndpoints()) {
+                    LOG.info("Endpoint uri is invalid: " + recipient + ". This exception
will be ignored.", e);
+                    continue;
+                } else {
+                    // failure so break out
+                    throw e;
+                }
+            }
+
+            // then create the exchange pair
+            Exchange copy = exchange.copy();
+            result.add(createProcessorExchangePair(index++, endpoint, producer, copy));
+        }
+
+        return result;
+    }
+
+    /**
+     * This logic is similar to MulticastProcessor but we have to return a RecipientProcessorExchangePair
instead
+     */
+    protected ProcessorExchangePair createProcessorExchangePair(int index, Endpoint endpoint,
Producer producer, Exchange exchange) {
+        Processor prepared = producer;
+
+        // set property which endpoint we send to
+        setToEndpoint(exchange, prepared);
+
+        // rework error handling to support fine grained error handling
+        if (exchange.getUnitOfWork() != null && exchange.getUnitOfWork().getRouteContext()
!= null) {
+            // wrap the producer in error handler so we have fine grained error handling
on
+            // the output side instead of the input side
+            // this is needed to support redelivery on that output alone and not doing redelivery
+            // for the entire multicast block again which will start from scratch again
+            RouteContext routeContext = exchange.getUnitOfWork().getRouteContext();
+            ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
+            // create error handler (create error handler directly to keep it light weight,
+            // instead of using ProcessorDefinition.wrapInErrorHandler)
+            try {
+                prepared = builder.createErrorHandler(routeContext, prepared);
+            } catch (Exception e) {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+
+        return new RecipientProcessorExchangePair(index, producerCache, endpoint, producer,
prepared, exchange);
+    }
+
+    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
+        // trim strings as end users might have added spaces between separators
+        if (recipient instanceof String) {
+            recipient = ((String) recipient).trim();
+        }
+        return ExchangeHelper.resolveEndpoint(exchange, recipient);
+    }
+
+    protected void doStart() throws Exception {
+        super.doStart();
+        if (producerCache == null) {
+            producerCache = new ProducerCache(this, getCamelContext());
+            // add it as a service so we can manage it
+            getCamelContext().addService(producerCache);
+        }
+        ServiceHelper.startService(producerCache);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(producerCache);
+        super.doStop();
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientListProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RoutingSlip.java Tue Jun
22 14:11:05 2010
@@ -268,7 +268,7 @@ public class RoutingSlip extends Service
                 exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
                 boolean sync = asyncProducer.process(exchange, new AsyncCallback() {
                     public void done(boolean doneSync) {
-                        // we only have to handle async completion of the pipeline
+                        // we only have to handle async completion of the routing slip
                         if (doneSync) {
                             return;
                         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Tue Jun
22 14:11:05 2010
@@ -23,6 +23,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
@@ -43,7 +45,7 @@ import static org.apache.camel.util.Obje
  *
  * @version $Revision$
  */
-public class Splitter extends MulticastProcessor implements Processor, Traceable {
+public class Splitter extends MulticastProcessor implements AsyncProcessor, Traceable {
     private final Expression expression;
 
     public Splitter(CamelContext camelContext, Expression expression, Processor destination,
AggregationStrategy aggregationStrategy) {
@@ -70,8 +72,8 @@ public class Splitter extends MulticastP
     }
 
     @Override
-    public void process(Exchange exchange) throws Exception {
-        AggregationStrategy strategy = getAggregationStrategy();
+    public boolean process(Exchange exchange, final AsyncCallback callback) {
+        final AggregationStrategy strategy = getAggregationStrategy();
 
         // if original aggregation strategy then store exchange
         // on it as the original exchange
@@ -81,11 +83,14 @@ public class Splitter extends MulticastP
             original.setOriginal(exchange);
         }
 
-        super.process(exchange);
-
-        if (original != null) {
-            // and remove the reference when we are done (due to thread local stuff)
-            original.setOriginal(null);
+        // TODO: we will lose the original in the async routing engine when it returns false
+        try {
+            return super.process(exchange, callback);
+        } finally {
+            if (original != null) {
+                // and remove the reference when we are done (due to thread local stuff)
+                original.setOriginal(null);
+            }
         }
     }
 
@@ -108,6 +113,8 @@ public class Splitter extends MulticastP
             public Iterator iterator() {
                 return new Iterator() {
 
+                    private int index;
+
                     public boolean hasNext() {
                         return iterator.hasNext();
                     }
@@ -121,7 +128,7 @@ public class Splitter extends MulticastP
                             Message in = newExchange.getIn();
                             in.setBody(part);
                         }
-                        return createProcessorExchangePair(getProcessors().iterator().next(),
newExchange);
+                        return createProcessorExchangePair(index++, getProcessors().iterator().next(),
newExchange);
                     }
 
                     public void remove() {
@@ -141,6 +148,8 @@ public class Splitter extends MulticastP
         } else {
             result = new ArrayList<ProcessorExchangePair>();
         }
+
+        int index = 0;
         Iterator<Object> iter = ObjectHelper.createIterator(value);
         while (iter.hasNext()) {
             Object part = iter.next();
@@ -151,7 +160,7 @@ public class Splitter extends MulticastP
                 Message in = newExchange.getIn();
                 in.setBody(part);
             }
-            result.add(createProcessorExchangePair(getProcessors().iterator().next(), newExchange));
+            result.add(createProcessorExchangePair(index++, getProcessors().iterator().next(),
newExchange));
         }
         return result;
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
Tue Jun 22 14:11:05 2010
@@ -39,8 +39,7 @@ public class AsyncEndpointRecipientList3
 
         assertMockEndpointsSatisfied();
 
-        // use same threads as its recipient list, and direct is the last in the recipient
list
-        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java
(from r956800, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java&r1=956800&r2=956902&rev=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList3Test.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientList4Test.java
Tue Jun 22 14:11:05 2010
@@ -24,7 +24,7 @@ import org.apache.camel.builder.RouteBui
 /**
  * @version $Revision$
  */
-public class AsyncEndpointRecipientList3Test extends ContextTestSupport {
+public class AsyncEndpointRecipientList4Test extends ContextTestSupport {
 
     private static String beforeThreadName;
     private static String afterThreadName;
@@ -39,8 +39,7 @@ public class AsyncEndpointRecipientList3
 
         assertMockEndpointsSatisfied();
 
-        // use same threads as its recipient list, and direct is the last in the recipient
list
-        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override
@@ -58,7 +57,7 @@ public class AsyncEndpointRecipientList3
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .recipientList(constant("async:Hi Camel,direct:foo"));
+                        .recipientList(constant("async:Hi Camel,async:Hi World,direct:foo"));
 
                 from("direct:foo")
                         .process(new Processor() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListTest.java
Tue Jun 22 14:11:05 2010
@@ -37,8 +37,7 @@ public class AsyncEndpointRecipientListT
         String reply = template.requestBody("direct:start", "Hello Camel", String.class);
         assertEquals("Bye Camel", reply);
 
-        // should use same threads (recipient list is not async supported yet)
-        assertTrue("Should use same threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
+        assertFalse("Should use different threads", beforeThreadName.equalsIgnoreCase(afterThreadName));
     }
 
     @Override

Modified: camel/trunk/camel-core/src/test/resources/log4j.properties
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=956902&r1=956901&r2=956902&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/resources/log4j.properties (original)
+++ camel/trunk/camel-core/src/test/resources/log4j.properties Tue Jun 22 14:11:05 2010
@@ -28,6 +28,9 @@ log4j.logger.org.apache.activemq.spring=
 #log4j.logger.org.apache.camel.component.mock=DEBUG
 #log4j.logger.org.apache.camel.component.file=TRACE
 #log4j.logger.org.apache.camel.processor.Pipeline=TRACE
+#log4j.logger.org.apache.camel.processor.MulticastProcessor=TRACE
+#log4j.logger.org.apache.camel.processor.RecipientList=TRACE
+#log4j.logger.org.apache.camel.processor.RecipientListProcessor=TRACE
 #log4j.logger.org.apache.camel.processor.RoutingSlip=TRACE
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
 log4j.logger.org.apache.camel.impl.converter=WARN



Mime
View raw message