camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1137592 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/dataset/ camel-core/src/main/java/org/apache/camel/component/mock/ camel-core/src/main/java/org/apache/camel/impl/ ca...
Date Mon, 20 Jun 2011 10:48:39 GMT
Author: davsclaus
Date: Mon Jun 20 10:48:38 2011
New Revision: 1137592

URL: http://svn.apache.org/viewvc?rev=1137592&view=rev
Log:
CAMEL-1817: Moved defensive copy of Exchange from Pipeline to RedeliveryErrorHandler. CAMEL-4117: Fixed issue with redelivery the passed in Exchange could contain state from the previous attempts. CAMEL-4121: Improved doc on using InOnly/InOut DSL and how its related to SetExchangePattern.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastRedeliverTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ProcessorMutateExchangeRedeliverTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListRedeliverTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipRedeliverTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnExceptionOnRedeliveryTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SetExchangePatternTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateHeadersTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsGetHeaderKeyFormatIssueWithContentTypeHeaderTest.java
    camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzRouteTest.java
    camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ExchangePatternTest.scala
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithFileLocalOnExceptionTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java Mon Jun 20 10:48:38 2011
@@ -85,6 +85,15 @@ public interface Channel extends AsyncPr
     void initChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception;
 
     /**
+     * Post initializes the channel.
+     *
+     * @param outputDefinition  the route definition the {@link Channel} represents
+     * @param routeContext      the route context
+     * @throws Exception is thrown if some error occurred
+     */
+    void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception;
+
+    /**
      * If the initialized output definition contained outputs (children) then we need to
      * set the child so we can leverage fine grained tracing
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Mon Jun 20 10:48:38 2011
@@ -183,7 +183,7 @@ public class DataSetEndpoint extends Moc
     //-------------------------------------------------------------------------
 
     @Override
-    protected void performAssertions(Exchange actual) throws Exception {
+    protected void performAssertions(Exchange actual, Exchange copy) throws Exception {
         int receivedCount = receivedCounter.incrementAndGet();
         long index = receivedCount - 1;
         Exchange expected = createExchange(index);
@@ -191,13 +191,13 @@ public class DataSetEndpoint extends Moc
         // now lets assert that they are the same
         if (log.isDebugEnabled()) {
             log.debug("Received message: {} (DataSet index={}) = {}",
-                    new Object[]{index, actual.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), actual});
+                    new Object[]{index, copy.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class), copy});
         }
 
-        assertMessageExpected(index, expected, actual);
+        assertMessageExpected(index, expected, copy);
 
         if (reporter != null) {
-            reporter.process(actual);
+            reporter.process(copy);
         }
 
         if (consumeDelay > 0) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java Mon Jun 20 10:48:38 2011
@@ -43,13 +43,13 @@ import org.apache.camel.Message;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.builder.ProcessorBuilder;
 import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.impl.InterceptSendToEndpoint;
 import org.apache.camel.spi.BrowsableEndpoint;
 import org.apache.camel.util.CamelContextHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ExpressionComparator;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
@@ -984,7 +984,9 @@ public class MockEndpoint extends Defaul
             if (reporter != null) {
                 reporter.process(exchange);
             }
-            performAssertions(exchange);
+            // copy the exchange so the mock stores the copy and not the actual exchange
+            Exchange copy = ExchangeHelper.createCopy(exchange, true);
+            performAssertions(exchange, copy);
         } catch (Throwable e) {
             // must catch java.lang.Throwable as AssertionException extends java.lang.Error
             failures.add(e);
@@ -996,8 +998,15 @@ public class MockEndpoint extends Defaul
         }
     }
 
-    protected void performAssertions(Exchange exchange) throws Exception {
-        Message in = exchange.getIn();
+    /**
+     * Performs the assertions on the incoming exchange.
+     *
+     * @param exchange   the actual exchange
+     * @param copy       a copy of the exchange (only store this)
+     * @throws Exception can be thrown if something went wrong
+     */
+    protected void performAssertions(Exchange exchange, Exchange copy) throws Exception {
+        Message in = copy.getIn();
         Object actualBody = in.getBody();
 
         if (headerName != null) {
@@ -1005,7 +1014,7 @@ public class MockEndpoint extends Defaul
         }
 
         if (propertyName != null) {
-            actualProperty = exchange.getProperty(propertyName);
+            actualProperty = copy.getProperty(propertyName);
         }
 
         if (expectedBodyValues != null) {
@@ -1021,23 +1030,25 @@ public class MockEndpoint extends Defaul
 
         // let counter be 0 index-based in the logs
         if (LOG.isDebugEnabled()) {
-            String msg = getEndpointUri() + " >>>> " + counter + " : " + exchange + " with body: " + actualBody;
-            if (exchange.getIn().hasHeaders()) {
-                msg += " and headers:" + exchange.getIn().getHeaders();
+            String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody;
+            if (copy.getIn().hasHeaders()) {
+                msg += " and headers:" + copy.getIn().getHeaders();
             }
             LOG.debug(msg);
         }
         ++counter;
 
         // record timestamp when exchange was received
-        exchange.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());
-        receivedExchanges.add(exchange);
+        copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());
+        receivedExchanges.add(copy);
 
         Processor processor = processors.get(getReceivedCounter()) != null
                 ? processors.get(getReceivedCounter()) : defaultProcessor;
 
         if (processor != null) {
             try {
+                // must process the incoming exchange and NOT the copy as the idea
+                // is the end user can manipulate the exchange
                 processor.process(exchange);
             } catch (Exception e) {
                 // set exceptions on exchange so we can throw exceptions to simulate errors

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultUnitOfWork.java Mon Jun 20 10:48:38 2011
@@ -83,6 +83,8 @@ public class DefaultUnitOfWork implement
             this.originalInMessage = exchange.getIn().copy();
         }
 
+        // TODO: Optimize to only copy if useOriginalMessage has been enabled
+
         // mark the creation time when this Exchange was created
         if (exchange.getProperty(Exchange.CREATED_TIMESTAMP) == null) {
             exchange.setProperty(Exchange.CREATED_TIMESTAMP, new Date());

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Jun 20 10:48:38 2011
@@ -276,7 +276,10 @@ public abstract class ProcessorDefinitio
             wrapChannelInErrorHandler(channel, routeContext);
         }
 
+        // do post init at the end
+        channel.postInitChannel(defn, routeContext);
         log.trace("{} wrapped in Channel: {}", defn, channel);
+
         return channel;
     }
 
@@ -632,7 +635,6 @@ public abstract class ProcessorDefinitio
         return (Type) this;
     }
 
-
     /**
      * Sends the exchange to the given endpoint
      *
@@ -647,6 +649,8 @@ public abstract class ProcessorDefinitio
     
     /**
      * Sends the exchange with certain exchange pattern to the given endpoint
+     * <p/>
+     * Notice the existing MEP is preserved
      *
      * @param pattern the pattern to use for the message exchange
      * @param uri  the endpoint to send to
@@ -657,10 +661,11 @@ public abstract class ProcessorDefinitio
         addOutput(new ToDefinition(uri, pattern));
         return (Type) this;
     }   
-    
 
     /**
      * Sends the exchange with certain exchange pattern to the given endpoint
+     * <p/>
+     * Notice the existing MEP is preserved
      *
      * @param pattern the pattern to use for the message exchange
      * @param endpoint  the endpoint to send to
@@ -686,7 +691,6 @@ public abstract class ProcessorDefinitio
         return (Type) this;
     }
 
-
     /**
      * Sends the exchange to a list of endpoints
      *
@@ -714,10 +718,11 @@ public abstract class ProcessorDefinitio
         }
         return (Type) this;
     }
-    
-    
+
     /**
      * Sends the exchange to a list of endpoints
+     * <p/>
+     * Notice the existing MEP is preserved
      *
      * @param pattern the pattern to use for the message exchanges
      * @param uris  list of endpoints to send to
@@ -733,6 +738,8 @@ public abstract class ProcessorDefinitio
 
     /**
      * Sends the exchange to a list of endpoints
+     * <p/>
+     * Notice the existing MEP is preserved
      *
      * @param pattern the pattern to use for the message exchanges
      * @param endpoints  list of endpoints to send to
@@ -763,7 +770,9 @@ public abstract class ProcessorDefinitio
 
     /**
      * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
-     * set the ExchangePattern {@link ExchangePattern} into the exchange
+     * set the {@link ExchangePattern} into the {@link Exchange}.
+     * <p/>
+     * The pattern set on the {@link Exchange} will
      *
      * @param exchangePattern  instance of {@link ExchangePattern}
      * @return the builder
@@ -778,9 +787,10 @@ public abstract class ProcessorDefinitio
      * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
      * set the exchange's ExchangePattern {@link ExchangePattern} to be InOnly
      *
-     *
      * @return the builder
+     * @deprecated use {@link #setExchangePattern(org.apache.camel.ExchangePattern)} instead
      */
+    @Deprecated
     public Type inOnly() {
         return setExchangePattern(ExchangePattern.InOnly);
     }
@@ -789,6 +799,8 @@ public abstract class ProcessorDefinitio
      * Sends the message to the given endpoint using an
      * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param uri The endpoint uri which is used for sending the exchange
      * @return the builder
@@ -801,6 +813,8 @@ public abstract class ProcessorDefinitio
      * Sends the message to the given endpoint using an
      * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 
      * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param endpoint The endpoint which is used for sending the exchange
      * @return the builder
@@ -809,11 +823,12 @@ public abstract class ProcessorDefinitio
         return to(ExchangePattern.InOnly, endpoint);
     }
 
-
     /**
      * Sends the message to the given endpoints using an
      * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param uris  list of endpoints to send to
      * @return the builder
@@ -822,11 +837,12 @@ public abstract class ProcessorDefinitio
         return to(ExchangePattern.InOnly, uris);
     }
 
-
     /**
      * Sends the message to the given endpoints using an
      * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param endpoints  list of endpoints to send to
      * @return the builder
@@ -839,6 +855,8 @@ public abstract class ProcessorDefinitio
      * Sends the message to the given endpoints using an
      * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param endpoints  list of endpoints to send to
      * @return the builder
@@ -847,14 +865,14 @@ public abstract class ProcessorDefinitio
         return to(ExchangePattern.InOnly, endpoints);
     }
 
-
     /**
      * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
      * set the exchange's ExchangePattern {@link ExchangePattern} to be InOut
      *
-     *
      * @return the builder
+     * @deprecated use {@link #setExchangePattern(org.apache.camel.ExchangePattern)} instead
      */
+    @Deprecated
     public Type inOut() {
         return setExchangePattern(ExchangePattern.InOut);
     }
@@ -863,6 +881,8 @@ public abstract class ProcessorDefinitio
      * Sends the message to the given endpoint using an
      * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param uri The endpoint uri which is used for sending the exchange
      * @return the builder
@@ -871,11 +891,12 @@ public abstract class ProcessorDefinitio
         return to(ExchangePattern.InOut, uri);
     }
 
-
     /**
      * Sends the message to the given endpoint using an
      * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param endpoint The endpoint which is used for sending the exchange
      * @return the builder
@@ -888,6 +909,8 @@ public abstract class ProcessorDefinitio
      * Sends the message to the given endpoints using an
      * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param uris  list of endpoints to send to
      * @return the builder
@@ -896,11 +919,12 @@ public abstract class ProcessorDefinitio
         return to(ExchangePattern.InOut, uris);
     }
 
-
     /**
      * Sends the message to the given endpoints using an
      * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param endpoints  list of endpoints to send to
      * @return the builder
@@ -913,6 +937,8 @@ public abstract class ProcessorDefinitio
      * Sends the message to the given endpoints using an
      * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
      * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
+     * <p/>
+     * Notice the existing MEP is restored after the message has been sent to the given endpoint.
      *
      * @param endpoints  list of endpoints to send to
      * @return the builder
@@ -1570,7 +1596,6 @@ public abstract class ProcessorDefinitio
      * @param expression  to decide the destinations
      * @param uriDelimiter  is the delimiter that will be used to split up
      *                      the list of URIs in the routing slip.
-     * 
      * @return the builder
      */
     public RoutingSlipDefinition<Type> routingSlip(Expression expression, String uriDelimiter) {
@@ -1589,7 +1614,6 @@ public abstract class ProcessorDefinitio
      * The route slip will be evaluated <i>once</i>, use {@link #dynamicRouter()} if you need even more dynamic routing.
      *
      * @param expression  to decide the destinations
-     * 
      * @return the builder
      */
     public RoutingSlipDefinition<Type> routingSlip(Expression expression) {
@@ -2171,6 +2195,7 @@ public abstract class ProcessorDefinitio
 
     /**
      * Pushes the given block on the stack as current block
+     *
      * @param block  the block
      */
     void pushBlock(Block block) {
@@ -2179,6 +2204,7 @@ public abstract class ProcessorDefinitio
 
     /**
      * Pops the block off the stack as current block
+     *
      * @return the block
      */
     Block popBlock() {
@@ -2549,7 +2575,6 @@ public abstract class ProcessorDefinitio
         return (Type) this;
     }
 
-
     /**
      * Adds a processor which sets the exchange property
      *
@@ -2724,7 +2749,7 @@ public abstract class ProcessorDefinitio
      * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
      * <p/>
      * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
-     * to obatin the additional data, where as pollEnrich uses a polling consumer.
+     * to obtain the additional data, where as pollEnrich uses a polling consumer.
      *
      * @param resourceRef            Reference of resource endpoint for obtaining additional data.
      * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
@@ -2813,7 +2838,7 @@ public abstract class ProcessorDefinitio
      * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
      * <p/>
      * The difference between this and {@link #enrich(String)} is that this uses a consumer
-     * to obatin the additional data, where as enrich uses a producer.
+     * to obtain the additional data, where as enrich uses a producer.
      * <p/>
      * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
      * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
@@ -2836,7 +2861,7 @@ public abstract class ProcessorDefinitio
      * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
      * <p/>
      * The difference between this and {@link #enrich(String)} is that this uses a consumer
-     * to obatin the additional data, where as enrich uses a producer.
+     * to obtain the additional data, where as enrich uses a producer.
      * <p/>
      * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
      * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Mon Jun 20 10:48:38 2011
@@ -32,6 +32,7 @@ import org.apache.camel.Service;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.processor.interceptor.StreamCaching;
 import org.apache.camel.processor.interceptor.TraceFormatter;
 import org.apache.camel.processor.interceptor.TraceInterceptor;
 import org.apache.camel.processor.interceptor.Tracer;
@@ -89,6 +90,8 @@ public class DefaultChannel extends Serv
         // the errorHandler is already decorated with interceptors
         // so it contain the entire chain of processors, so we can safely use it directly as output
         // if no error handler provided we use the output
+        // TODO: Camel 3.0 we should determine the output dynamically at runtime instead of having the
+        // the error handlers, interceptors, etc. woven in at design time
         return errorHandler != null ? errorHandler : output;
     }
 
@@ -204,6 +207,10 @@ public class DefaultChannel extends Serv
             if (strategy instanceof Tracer) {
                 continue;
             }
+            // skip stream caching as it must be wrapped as outer most, which we do later
+            if (strategy instanceof StreamCaching) {
+                continue;
+            }
             // use the fine grained definition (eg the child if available). Its always possible to get back to the parent
             Processor wrapped = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), targetOutputDef, target, next);
             if (!(wrapped instanceof AsyncProcessor)) {
@@ -227,6 +234,21 @@ public class DefaultChannel extends Serv
         output = target;
     }
 
+    @Override
+    public void postInitChannel(ProcessorDefinition<?> outputDefinition, RouteContext routeContext) throws Exception {
+        for (InterceptStrategy strategy : interceptors) {
+            // apply stream caching at the end as it should be outer most
+            if (strategy instanceof StreamCaching) {
+                if (errorHandler != null) {
+                    errorHandler = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, errorHandler, null);
+                } else {
+                    output = strategy.wrapProcessorInInterceptors(routeContext.getCamelContext(), outputDefinition, output, null);
+                }
+                break;
+            }
+        }
+    }
+
     private InterceptStrategy getOrCreateTracer() {
         InterceptStrategy tracer = Tracer.getTracer(camelContext);
         if (tracer == null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ExchangePatternProcessor.java Mon Jun 20 10:48:38 2011
@@ -35,7 +35,7 @@ public class ExchangePatternProcessor im
     }
    
     public void process(Exchange exchange) throws Exception {
-        exchange.setPattern(exchangePattern);        
+        exchange.setPattern(exchangePattern);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Mon Jun 20 10:48:38 2011
@@ -25,7 +25,6 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ExchangeHelper;
@@ -157,20 +156,14 @@ public class Pipeline extends MulticastP
      * @return a new exchange
      */
     protected Exchange createNextExchange(Exchange previousExchange) {
-        Exchange answer = new DefaultExchange(previousExchange);
-        // we must use the same id as this is a snapshot strategy where Camel copies a snapshot
-        // before processing the next step in the pipeline, so we have a snapshot of the exchange
-        // just before. This snapshot is used if Camel should do redeliveries (re try) using
-        // DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
-        // exchange being routed.
-        answer.setExchangeId(previousExchange.getExchangeId());
-
-        answer.getProperties().putAll(previousExchange.getProperties());
+        Exchange answer = previousExchange;
 
         // now lets set the input of the next exchange to the output of the
         // previous message if it is not null
-        answer.setIn(previousExchange.hasOut() 
-            ? previousExchange.getOut().copy() : previousExchange.getIn().copy());
+        if (answer.hasOut()) {
+            answer.setIn(answer.getOut());
+            answer.setOut(null);
+        }
         return answer;
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java Mon Jun 20 10:48:38 2011
@@ -67,6 +67,7 @@ public abstract class RedeliveryErrorHan
      * Contains the current redelivery data
      */
     protected class RedeliveryData {
+        Exchange original;
         boolean sync = true;
         int redeliveryCounter;
         long redeliveryDelay;
@@ -103,7 +104,7 @@ public abstract class RedeliveryErrorHan
 
         public Boolean call() throws Exception {
             // prepare for redelivery
-            prepareExchangeForRedelivery(exchange);
+            prepareExchangeForRedelivery(exchange, data);
 
             // letting onRedeliver be executed at first
             deliverToOnRedeliveryProcessor(exchange, data);
@@ -206,11 +207,20 @@ public abstract class RedeliveryErrorHan
         return processErrorHandler(exchange, callback, new RedeliveryData());
     }
 
+    protected Exchange defensiveCopyExchange(Exchange exchange) {
+        // TODO: Optimize to only copy if redelivery is possible/enabled
+        return ExchangeHelper.createCopy(exchange, true);
+    }
+
     /**
      * Process the exchange using redelivery error handling.
      */
     protected boolean processErrorHandler(final Exchange exchange, final AsyncCallback callback, final RedeliveryData data) {
 
+        // do a defensive copy of the original Exchange, which is needed for redelivery so we can ensure the
+        // original Exchange is being redelivered, and not a mutated Exchange
+        data.original = defensiveCopyExchange(exchange);
+
         // use looping to have redelivery attempts
         while (true) {
 
@@ -298,7 +308,7 @@ public abstract class RedeliveryErrorHan
                 }
 
                 // prepare for redelivery
-                prepareExchangeForRedelivery(exchange);
+                prepareExchangeForRedelivery(exchange, data);
 
                 // letting onRedeliver be executed
                 deliverToOnRedeliveryProcessor(exchange, data);
@@ -502,8 +512,13 @@ public abstract class RedeliveryErrorHan
     protected void prepareExchangeForContinue(Exchange exchange, RedeliveryData data) {
         Exception caught = exchange.getException();
 
-        // continue is a kind of redelivery so reuse the logic to prepare
-        prepareExchangeForRedelivery(exchange);
+        // we continue so clear any exceptions
+        exchange.setException(null);
+        // clear rollback flags
+        exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
+        // reset cached streams so they can be read again
+        MessageHelper.resetStreamCache(exchange.getIn());
+
         // its continued then remove traces of redelivery attempted and caught exception
         exchange.getIn().removeHeader(Exchange.REDELIVERED);
         exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
@@ -520,15 +535,37 @@ public abstract class RedeliveryErrorHan
         logFailedDelivery(false, false, true, exchange, msg, data, null);
     }
 
-    protected void prepareExchangeForRedelivery(Exchange exchange) {
+    protected void prepareExchangeForRedelivery(Exchange exchange, RedeliveryData data) {
         // okay we will give it another go so clear the exception so we can try again
         exchange.setException(null);
 
         // clear rollback flags
         exchange.setProperty(Exchange.ROLLBACK_ONLY, null);
 
+        // TODO: We may want to store these as state on RedelieryData so we keep them in case end user messes with Exchange
+        // and then put these on the exchange when doing a redelivery / fault processor
+
+        // preserve these headers
+        Integer redeliveryCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_COUNTER, Integer.class);
+        Integer redeliveryMaxCounter = exchange.getIn().getHeader(Exchange.REDELIVERY_MAX_COUNTER, Integer.class);
+        Boolean redelivered = exchange.getIn().getHeader(Exchange.REDELIVERED, Boolean.class);
+
+        // we are redelivering so copy from original back to exchange
+        exchange.getIn().copyFrom(data.original.getIn());
+        exchange.setOut(null);
         // reset cached streams so they can be read again
         MessageHelper.resetStreamCache(exchange.getIn());
+
+        // put back headers
+        if (redeliveryCounter != null) {
+            exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, redeliveryCounter);
+        }
+        if (redeliveryMaxCounter != null) {
+            exchange.getIn().setHeader(Exchange.REDELIVERY_MAX_COUNTER, redeliveryMaxCounter);
+        }
+        if (redelivered != null) {
+            exchange.getIn().setHeader(Exchange.REDELIVERED, redelivered);
+        }
     }
 
     protected void handleException(Exchange exchange, RedeliveryData data) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SendProcessor.java Mon Jun 20 10:48:38 2011
@@ -76,12 +76,21 @@ public class SendProcessor extends Servi
             throw new IllegalStateException("SendProcessor has not been started: " + this);
         }
 
+        // we should preserve existing MEP so remember old MEP
+        // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
+        final ExchangePattern existingPattern = exchange.getPattern();
+
         // send the exchange to the destination using a producer
         producerCache.doInProducer(destination, exchange, pattern, new ProducerCallback<Exchange>() {
             public Exchange doInProducer(Producer producer, Exchange exchange, ExchangePattern pattern) throws Exception {
                 exchange = configureExchange(exchange, pattern);
                 log.debug(">>>> {} {}", destination, exchange);
-                producer.process(exchange);
+                try {
+                    producer.process(exchange);
+                } finally {
+                    // restore previous MEP
+                    exchange.setPattern(existingPattern);
+                }
                 return exchange;
             }
         });
@@ -92,12 +101,24 @@ public class SendProcessor extends Servi
             throw new IllegalStateException("SendProcessor has not been started: " + this);
         }
 
+        // we should preserve existing MEP so remember old MEP
+        // if you want to permanently to change the MEP then use .setExchangePattern in the DSL
+        final ExchangePattern existingPattern = exchange.getPattern();
+
         // send the exchange to the destination using a producer
         return producerCache.doInAsyncProducer(destination, exchange, pattern, callback, new AsyncProducerCallback() {
-            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, Exchange exchange, ExchangePattern pattern, AsyncCallback callback) {
-                exchange = configureExchange(exchange, pattern);
+            public boolean doInAsyncProducer(Producer producer, AsyncProcessor asyncProducer, final Exchange exchange,
+                                             ExchangePattern pattern, final AsyncCallback callback) {
+                final Exchange target = configureExchange(exchange, pattern);
                 log.debug(">>>> {} {}", destination, exchange);
-                return AsyncProcessorHelper.process(asyncProducer, exchange, callback);
+                return AsyncProcessorHelper.process(asyncProducer, target, new AsyncCallback() {
+                    public void done(boolean doneSync) {
+                        // restore previous MEP
+                        target.setPattern(existingPattern);
+                        // signal we are done
+                        callback.done(doneSync);
+                    }
+                });
             }
         });
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Mon Jun 20 10:48:38 2011
@@ -36,6 +36,7 @@ import org.apache.camel.NoSuchHeaderExce
 import org.apache.camel.NoSuchPropertyException;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConverter;
+import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.spi.UnitOfWork;
 
 /**
@@ -192,10 +193,16 @@ public final class ExchangeHelper {
      * @return the copy
      */
     public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
-        Exchange copy = exchange.copy();
+        Exchange copy = new DefaultExchange(exchange);
         if (preserveExchangeId) {
+            // must preserve exchange id
             copy.setExchangeId(exchange.getExchangeId());
         }
+        copy.getProperties().putAll(exchange.getProperties());
+        copy.setIn(exchange.getIn().copy());
+        if (exchange.hasOut()) {
+            copy.setOut(exchange.getOut().copy());
+        }
         return copy;
     }
 
@@ -212,6 +219,18 @@ public final class ExchangeHelper {
         //  TODO: merge logic with that of copyResultsPreservePattern()
         // --------------------------------------------------------------------
 
+        if (result == source) {
+            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
+            // and the result is not failed
+            if (result.getPattern() == ExchangePattern.InOptionalOut) {
+                // keep as is
+            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
+                // copy IN to OUT as we expect a OUT response
+                result.getOut().copyFrom(source.getIn());
+            }
+            return;
+        }
+
         if (result != source) {
             result.setException(source.getException());
             if (source.hasOut()) {
@@ -257,8 +276,15 @@ public final class ExchangeHelper {
         //  TODO: merge logic with that of copyResults()
         // --------------------------------------------------------------------
 
-        if (source == result) {
-            // no need to copy
+        if (result == source) {
+            // we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
+            // and the result is not failed
+            if (result.getPattern() == ExchangePattern.InOptionalOut) {
+                // keep as is
+            } else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
+                // copy IN to OUT as we expect a OUT response
+                result.getOut().copyFrom(source.getIn());
+            }
             return;
         }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnExceptionOnRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnExceptionOnRedeliveryTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnExceptionOnRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnExceptionOnRedeliveryTest.java Mon Jun 20 10:48:38 2011
@@ -34,7 +34,7 @@ public class DeadLetterChannelOnExceptio
 
     public void testGlobalOnRedelivery() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World123");
+        mock.expectedBodiesReceived("Hello World3");
 
         template.sendBody("direct:start", "Hello World");
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java Mon Jun 20 10:48:38 2011
@@ -31,7 +31,7 @@ public class DeadLetterChannelOnRedelive
 
     public void testOnExceptionAlterMessageBeforeRedelivery() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World123");
+        mock.expectedBodiesReceived("Hello World3");
 
         template.sendBody("direct:start", "Hello World");
 
@@ -40,7 +40,7 @@ public class DeadLetterChannelOnRedelive
 
     public void testOnExceptionAlterMessageWithHeadersBeforeRedelivery() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedBodiesReceived("Hello World123");
+        mock.expectedBodiesReceived("Hello World3");
         mock.expectedHeaderReceived("foo", "123");
 
         template.sendBodyAndHeader("direct:start", "Hello World", "foo", "123");

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastRedeliverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastRedeliverTest.java?rev=1137592&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastRedeliverTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastRedeliverTest.java Mon Jun 20 10:48:38 2011
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class MulticastRedeliverTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOk() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+
+        template.sendBody("direct:test1", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testThrowExceptionAtA() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:test2", "Hello World");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    public void testThrowExceptionAtB() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:test3", "Hello World");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // try to redeliver up till 3 times
+                errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:test1")
+                    .multicast().stopOnException()
+                        .to("mock:a").to("mock:b");
+
+                from("direct:test2")
+                    .multicast().stopOnException()
+                        .to("mock:a").to("direct:a").to("mock:b");
+
+                from("direct:test3")
+                    .multicast().stopOnException()
+                        .to("mock:a").to("mock:b").to("direct:b").to("mock:c");
+
+                from("direct:a")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+
+                from("direct:b")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            // mutate IN body
+                            exchange.getOut().setBody("Bye World");
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+            }
+        };
+    }
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ProcessorMutateExchangeRedeliverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ProcessorMutateExchangeRedeliverTest.java?rev=1137592&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ProcessorMutateExchangeRedeliverTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ProcessorMutateExchangeRedeliverTest.java Mon Jun 20 10:48:38 2011
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class ProcessorMutateExchangeRedeliverTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testRedeliverA() throws Exception {
+        counter = 0;
+
+        try {
+            template.sendBody("direct:a", "Hello World");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    public void testRedeliverB() throws Exception {
+        counter = 0;
+
+        try {
+            template.sendBody("direct:b", "Hello World");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // try to redeliver up till 3 times
+                errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:a")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            // mutate IN body
+                            exchange.getIn().setBody("Bye World");
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+
+                from("direct:b")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            // mutate OUT body
+                            exchange.getOut().setBody("Bye World");
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+            }
+        };
+    }
+
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListRedeliverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListRedeliverTest.java?rev=1137592&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListRedeliverTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListRedeliverTest.java Mon Jun 20 10:48:38 2011
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class RecipientListRedeliverTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOk() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "mySlip", "mock:a,mock:b");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testThrowExceptionAtA() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "mySlip", "mock:a,direct:a,mock:b");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    public void testThrowExceptionAtB() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "mySlip", "mock:a,mock:b,direct:b,mock:c");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // try to redeliver up till 3 times
+                errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start")
+                    .recipientList(header("mySlip")).stopOnException();
+
+                from("direct:a")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+
+                from("direct:b")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            // mutate IN body
+                            exchange.getIn().setBody("Bye World");
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+            }
+        };
+    }
+}

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RollbackTest.java Mon Jun 20 10:48:38 2011
@@ -66,8 +66,8 @@ public class RollbackTest extends Contex
         assertIsInstanceOf(RollbackExchangeException.class, out.getException());
         assertEquals("Should be marked as rollback", true, out.isRollbackOnly());
         // should not try to redeliver if exchange was marked as rollback only
-        assertEquals(0, out.getOut().getHeader(Exchange.REDELIVERY_COUNTER));
-        assertEquals(false, out.getOut().getHeader(Exchange.REDELIVERED));
+        assertEquals(0, out.getIn().getHeader(Exchange.REDELIVERY_COUNTER));
+        assertEquals(false, out.getIn().getHeader(Exchange.REDELIVERED));
     }
 
     @Override

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipRedeliverTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipRedeliverTest.java?rev=1137592&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipRedeliverTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RoutingSlipRedeliverTest.java Mon Jun 20 10:48:38 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class RoutingSlipRedeliverTest extends ContextTestSupport {
+
+    private static int counter;
+
+    public void testOk() throws Exception {
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+
+        template.sendBodyAndHeader("direct:start", "Hello World", "mySlip", "mock:a,mock:b");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testThrowExceptionAtA() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "mySlip", "mock:a,direct:a,mock:b");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    public void testThrowExceptionAtB() throws Exception {
+        counter = 0;
+
+        getMockEndpoint("mock:a").expectedMessageCount(1);
+        getMockEndpoint("mock:b").expectedMessageCount(1);
+        getMockEndpoint("mock:c").expectedMessageCount(0);
+
+        try {
+            template.sendBodyAndHeader("direct:start", "Hello World", "mySlip", "mock:a,mock:b,direct:b,mock:c");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(1 + 3, counter); // first call + 3 redeliveries
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // try to redeliver up till 3 times
+                errorHandler(defaultErrorHandler().maximumRedeliveries(3).redeliveryDelay(0));
+
+                from("direct:start")
+                    .routingSlip(header("mySlip"));
+
+                from("direct:a")
+                    .to("log:a")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+
+                from("direct:b")
+                    .to("log:b")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws Exception {
+                            // should be same input body
+                            assertEquals("Hello World", exchange.getIn().getBody());
+                            assertFalse("Should not have OUT", exchange.hasOut());
+                            assertNull(exchange.getException());
+
+                            // mutate IN body
+                            exchange.getIn().setBody("Bye World");
+
+                            counter++;
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
+            }
+        };
+    }
+}

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SetExchangePatternTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SetExchangePatternTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SetExchangePatternTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SetExchangePatternTest.java Mon Jun 20 10:48:38 2011
@@ -17,7 +17,9 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
@@ -55,6 +57,48 @@ public class SetExchangePatternTest exte
         assertMessageReceivedWithPattern("direct:testSetExchangePatternInOnly", ExchangePattern.InOnly);
     }
 
+    public void testPreserveOldMEPInOut() throws Exception {
+        // the mock should get an InOut MEP
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:result").message(0).exchangePattern().isEqualTo(ExchangePattern.InOut);
+
+        // we send an InOnly
+        Exchange out = template.send("direct:testInOut", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOnly);
+            }
+        });
+
+        // the MEP should be preserved
+        assertNotNull(out);
+        assertEquals(ExchangePattern.InOnly, out.getPattern());
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testPreserveOldMEPInOnly() throws Exception {
+        // the mock should get an InOnly MEP
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+        getMockEndpoint("mock:result").message(0).exchangePattern().isEqualTo(ExchangePattern.InOnly);
+
+        // we send an InOut
+        Exchange out = template.send("direct:testInOnly", new Processor() {
+            @Override
+            public void process(Exchange exchange) throws Exception {
+                exchange.getIn().setBody("Hello World");
+                exchange.setPattern(ExchangePattern.InOut);
+            }
+        });
+
+        // the MEP should be preserved
+        assertNotNull(out);
+        assertEquals(ExchangePattern.InOut, out.getPattern());
+
+        assertMockEndpointsSatisfied();
+    }
+
 
     protected void assertMessageReceivedWithPattern(String sendUri, ExchangePattern expectedPattern) throws InterruptedException {
         ExchangePattern sendPattern;

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/enricher/EnricherTest.java Mon Jun 20 10:48:38 2011
@@ -95,7 +95,7 @@ public class EnricherTest extends Contex
         });
         assertEquals("bar", exchange.getIn().getHeader("foo"));
         assertEquals("test:blah", exchange.getIn().getBody());
-        assertFalse(exchange.hasOut());
+        assertTrue(exchange.hasOut());
         assertNull(exchange.getException());
     }
 

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateHeadersTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateHeadersTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateHeadersTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/ActiveMQPropagateHeadersTest.java Mon Jun 20 10:48:38 2011
@@ -19,20 +19,17 @@ package org.apache.camel.component.jms.i
 import java.util.Date;
 import java.util.List;
 import javax.jms.ConnectionFactory;
-import javax.jms.Message;
 
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
+import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.jms.CamelJmsTestHelper;
-import org.apache.camel.component.jms.JmsBinding;
-import org.apache.camel.component.jms.JmsMessage;
 import org.apache.camel.component.mock.AssertionClause;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
-import org.apache.camel.util.ExchangeHelper;
 import org.junit.Test;
 
 import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
@@ -84,14 +81,11 @@ public class ActiveMQPropagateHeadersTes
             public void configure() throws Exception {
                 from("activemq:test.a").process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
-                        // lets set the custom JMS headers using the JMS API
-                        assertNotNull(ExchangeHelper.getBinding(exchange, JmsBinding.class));
-                        JmsMessage in = (JmsMessage) exchange.getIn();
-                        assertNotNull(in);
-                        Message inMessage = in.getJmsMessage();
-                        inMessage.setJMSReplyTo(replyQueue);
-                        inMessage.setJMSCorrelationID(correlationID);
-                        inMessage.setJMSType(messageType);
+                        // set the JMS headers
+                        Message in = exchange.getIn();
+                        in.setHeader("JMSReplyTo", replyQueue);
+                        in.setHeader("JMSCorrelationID", correlationID);
+                        in.setHeader("JMSType", messageType);
                     }
                 // must set option to preserve message QoS as we send an InOnly but put a JMSReplyTo
                 // that does not work well on the consumer side, as it would assume it should send a reply

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsGetHeaderKeyFormatIssueWithContentTypeHeaderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsGetHeaderKeyFormatIssueWithContentTypeHeaderTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsGetHeaderKeyFormatIssueWithContentTypeHeaderTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/issues/JmsGetHeaderKeyFormatIssueWithContentTypeHeaderTest.java Mon Jun 20 10:48:38 2011
@@ -76,7 +76,6 @@ public class JmsGetHeaderKeyFormatIssueW
                             // do not mutate it
                             JmsMessage msg = assertIsInstanceOf(JmsMessage.class, exchange.getIn());
                             assertNotNull("javax.jms.Message should not be null", msg.getJmsMessage());
-                            assertEquals("Should NOT have been muated", false, msg.shouldCreateNewMessage());
                         }
                     })
                     .to("activemq:queue:copy", "mock:result");

Modified: camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzRouteTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzRouteTest.java (original)
+++ camel/trunk/components/camel-quartz/src/test/java/org/apache/camel/component/quartz/QuartzRouteTest.java Mon Jun 20 10:48:38 2011
@@ -16,11 +16,6 @@
  */
 package org.apache.camel.component.quartz;
 
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.test.junit4.CamelTestSupport;
@@ -41,21 +36,7 @@ public class QuartzRouteTest extends Cam
 
         // lets test the receive worked
         resultEndpoint.assertIsSatisfied();
-
-        List<Exchange> list = resultEndpoint.getReceivedExchanges();
-        for (Exchange exchange : list) {
-            Message in = exchange.getIn();
-            log.debug("Received: " + in + " with headers: " + in.getHeaders());
-            // should be quartz message
-            QuartzMessage qm = exchange.getIn(QuartzMessage.class);
-            assertNotNull(qm.getJobExecutionContext());
-
-            Iterator iterator = exchange.getIn().getBody(Iterator.class);
-            // the iterator should not have any values
-            assertFalse(iterator.hasNext());
-        }
     }
-    
 
     @Override
     protected RouteBuilder createRouteBuilder() {

Modified: camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ExchangePatternTest.scala
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ExchangePatternTest.scala?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ExchangePatternTest.scala (original)
+++ camel/trunk/components/camel-scala/src/test/scala/org/apache/camel/scala/dsl/ExchangePatternTest.scala Mon Jun 20 10:48:38 2011
@@ -37,7 +37,7 @@ class ExchangePatternTest extends ScalaT
     getMockEndpoint("mock:a").expectedMessageCount(1)
     getMockEndpoint("mock:a").expectedExchangePattern(InOnly)
     getMockEndpoint("mock:result").expectedMessageCount(1)
-    getMockEndpoint("mock:result").expectedExchangePattern(InOnly)
+    getMockEndpoint("mock:result").expectedExchangePattern(InOut)
 
     template.requestBody("direct:a", "Hello World")
 
@@ -48,14 +48,14 @@ class ExchangePatternTest extends ScalaT
     getMockEndpoint("mock:b").expectedMessageCount(1)
     getMockEndpoint("mock:b").expectedExchangePattern(InOut)
     getMockEndpoint("mock:result").expectedMessageCount(1)
-    getMockEndpoint("mock:result").expectedExchangePattern(InOut)
+    getMockEndpoint("mock:result").expectedExchangePattern(InOnly)
 
     template.sendBody("direct:b", "Hello World")
 
     assertMockEndpointsSatisfied
   }
 
-  def testRequstInOut() = {
+  def testRequestInOut() = {
     getMockEndpoint("mock:b").expectedMessageCount(1)
     getMockEndpoint("mock:b").expectedExchangePattern(InOut)
     getMockEndpoint("mock:result").expectedMessageCount(1)

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithFileLocalOnExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithFileLocalOnExceptionTest.java?rev=1137592&r1=1137591&r2=1137592&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithFileLocalOnExceptionTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/interceptor/TransactionalClientDataSourceTransactedWithFileLocalOnExceptionTest.java Mon Jun 20 10:48:38 2011
@@ -34,11 +34,11 @@ public class TransactionalClientDataSour
                     .setBody(constant("Elephant in Action")).beanRef("bookService");
 
                 from("file://target/transacted/fail?moveFailed=../failed")
-                    .transacted()
                     .onException(IllegalArgumentException.class)
                         .handled(false)
                         .to("mock:error")
                     .end()
+                    .transacted()
                     .setBody(constant("Tiger in Action")).beanRef("bookService")
                     .setBody(constant("Donkey in Action")).beanRef("bookService");
             }



Mime
View raw message