camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r768186 [1/2] - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/builder/ camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/management/ camel-core/...
Date Fri, 24 Apr 2009 06:54:56 GMT
Author: davsclaus
Date: Fri Apr 24 06:54:55 2009
New Revision: 768186

URL: http://svn.apache.org/viewvc?rev=768186&view=rev
Log:
CAMEL-1562, CAMEL-1555: Introduced Channel to act as a delegate between each processor in the route (work in progress). Fixed the JMX instrumentor so that it does not affect the route whether it is enabled or not.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java   (with props)
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java   (with props)
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java   (with props)
Removed:
    camel/trunk/components/camel-spring/src/main/resources/META-INF/services/org/apache/camel/spring/
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/debug/
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultErrorHandlerWrappingStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ErrorHandlerWrappingStrategy.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationWithHandlePipelineAndExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexWithNestedErrorHandlerRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionRetryUntilTest.java
    camel/trunk/components/camel-jetty/src/test/java/org/apache/camel/component/jetty/JettySimplifiedHandle404Test.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/config/ErrorHandlerTest.java

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java?rev=768186&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java Fri Apr 24 06:54:55 2009
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel;
+
+import java.util.List;
+
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.RouteContext;
+
+/**
+ * Channel acts as a channel between {@link Processor}s in the route graph.
+ * <p/>
+ * The channel is responsible for routing the {@link Exchange} to the next {@link Processor} in the route graph.
+ *
+ * @version $Revision$
+ */
+public interface Channel extends Processor {
+
+    // TODO: The method names of this interface are not 100% settled yet
+    // some methods should maybe be moved to DefaultChannel only as they are mostly used for testing purposes
+    // and we should add methods to traverse the channels
+    // and maybe a channel registry
+
+    /**
+     * Sets the processor that the channel should route the {@link Exchange} to.
+     *
+     * @param output  the next output
+     */
+    void setNextProcessor(Processor output);
+
+    /**
+     * Sets the {@link org.apache.camel.processor.ErrorHandler} that the Channel uses.
+     *
+     * @param errorHandler the error handler
+     */
+    void setErrorHandler(Processor errorHandler);
+
+    Processor getErrorHandler();
+
+    /**
+     * Adds a {@link org.apache.camel.spi.InterceptStrategy} to apply to each {@link Exchange} before
+     * it is routed to the next {@link Processor}.
+     *
+     * @param strategy  the intercept strategy
+     */
+    void addInterceptStrategy(InterceptStrategy strategy);
+
+    /**
+     * Adds a list of {@link org.apache.camel.spi.InterceptStrategy} to apply to each {@link Exchange} before
+     * it is routed to the next {@link Processor}.
+     *
+     * @param strategy  list of strategies
+     */
+    void addInterceptStrategies(List<InterceptStrategy> strategy);
+
+    /**
+     * Initializes the channel.
+     *
+     * @param outputDefinition  the route definition the {@link Channel} represents
+     * @param routeContext      the route context
+     * @throws Exception is thrown if some error occurred
+     */
+    void initChannel(ProcessorDefinition outputDefinition, RouteContext routeContext) throws Exception;
+
+    /**
+     * Gets the wrapped output that at runtime should be delegated to.
+     *
+     * @return the output delegated to
+     */
+    Processor getOutput();
+
+    /**
+     * Gets the original next {@link Processor} that is not wrapped.
+     *
+     * @return  the next processor
+     */
+    Processor getNextProcessor();
+
+    boolean hasInterceptorStrategy(Class type);
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/Channel.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilderRef.java Fri Apr 24 06:54:55 2009
@@ -93,7 +93,7 @@
                         // otherwise we could recursive loop forever (triggered by createErrorHandler method)
                         handler = new DefaultErrorHandlerBuilder();
                         // inherit the error handlers from the other as they are to be shared
-                        // this is needed by camel-spring when using non error handler configured
+                        // this is needed by camel-spring when no error handler has been explicitly configured
                         handler.setErrorHandlers(other.getErrorHandlers());
                     }
                 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultErrorHandlerWrappingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultErrorHandlerWrappingStrategy.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultErrorHandlerWrappingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultErrorHandlerWrappingStrategy.java Fri Apr 24 06:54:55 2009
@@ -26,6 +26,7 @@
  * The default error handler wrapping strategy used when JMX is disabled.
  *
  * @version $Revision$
+ * @deprecated is replaced by {@link org.apache.camel.Channel}
  */
 public class DefaultErrorHandlerWrappingStrategy implements ErrorHandlerWrappingStrategy {
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/InstrumentationInterceptStrategy.java Fri Apr 24 06:54:55 2009
@@ -51,6 +51,8 @@
         if (counter != null) {
             InstrumentationProcessor wrapper = new InstrumentationProcessor(counter);
             wrapper.setProcessor(target);
+            // remove to not double wrap it
+            registeredCounters.remove(processorDefinition);
             return wrapper;
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/LoadBalanceDefinition.java Fri Apr 24 06:54:55 2009
@@ -120,7 +120,7 @@
         LoadBalancer loadBalancer = LoadBalancerDefinition.getLoadBalancer(routeContext, loadBalancerType, ref);
         for (ProcessorDefinition processorType : getOutputs()) {            
             Processor processor = processorType.createProcessor(routeContext);
-            processor = processorType.wrapProcessorInInterceptors(routeContext, processor);
+            processor = wrapProcessor(routeContext, processor);
             loadBalancer.addProcessor(processor);
         }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/MulticastDefinition.java Fri Apr 24 06:54:55 2009
@@ -150,10 +150,4 @@
         this.executor = executor;        
     }
     
-    @Override
-    protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {        
-        //CAMEL-1193 now we need to wrap the multicast processor with the interceptors
-        //Current we wrap the StreamCachingInterceptor by default
-        return super.wrapProcessorInInterceptors(routeContext, new StreamCachingInterceptor(target));        
-    }
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/OnExceptionDefinition.java Fri Apr 24 06:54:55 2009
@@ -119,19 +119,18 @@
         return parentPolicy;
     }
 
-    public void
-    addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
+    public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
         setHandledFromExpressionType(routeContext);
         setRetryUntilFromExpressionType(routeContext);
-        // lets attach a processor to an error handler
-        errorHandler = routeContext.createProcessor(this);
-        ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
-        builder.addErrorHandlers(this);
-
         // lookup onRedelivery if ref is provided
         if (ObjectHelper.isNotEmpty(onRedeliveryRef)) {
-            onRedelivery = routeContext.lookup(onRedeliveryRef, Processor.class);
+            setOnRedelivery(routeContext.lookup(onRedeliveryRef, Processor.class));
         }
+
+        // lets attach this on exception to the route error handler
+        errorHandler = routeContext.createProcessor(this);
+        ErrorHandlerBuilder builder = routeContext.getRoute().getErrorHandlerBuilder();
+        builder.addErrorHandlers(this);
     }
 
     @Override

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Fri Apr 24 06:54:55 2009
@@ -21,10 +21,8 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Executor;
 
 import javax.xml.bind.annotation.XmlAccessType;
@@ -32,8 +30,8 @@
 import javax.xml.bind.annotation.XmlAttribute;
 import javax.xml.bind.annotation.XmlTransient;
 
-import org.apache.camel.CamelContext;
 import org.apache.camel.CamelException;
+import org.apache.camel.Channel;
 import org.apache.camel.Endpoint;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Expression;
@@ -45,18 +43,17 @@
 import org.apache.camel.builder.ErrorHandlerBuilderRef;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.builder.ProcessorBuilder;
-import org.apache.camel.management.InstrumentationProcessor;
 import org.apache.camel.model.dataformat.DataFormatDefinition;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
+import org.apache.camel.processor.DefaultChannel;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
 import org.apache.camel.spi.IdempotentRepository;
-import org.apache.camel.spi.InterceptStrategy;
 import org.apache.camel.spi.Policy;
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TransactedPolicy;
@@ -100,12 +97,90 @@
     }
 
     /**
-     * Wraps the child processor in whatever necessary interceptors and error
-     * handlers
+     * Wraps the child processor in whatever necessary interceptors and error handlers
      */
     public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
-        processor = wrapProcessorInInterceptors(routeContext, processor);
-        return wrapInErrorHandler(routeContext, processor);
+        // dont double wrap
+        if (processor instanceof Channel) {
+            return processor;
+        }
+        return wrapChannel(routeContext, processor);
+    }
+
+    private Processor wrapChannel(RouteContext routeContext, Processor processor) throws Exception {
+        // put a channel in between this and each output to control the route flow logic
+        Channel channel = createChannel(routeContext);
+        channel.setNextProcessor(processor);
+
+        // add interceptor strategies to the channel
+        channel.addInterceptStrategies(routeContext.getCamelContext().getInterceptStrategies());
+        channel.addInterceptStrategies(routeContext.getInterceptStrategies());
+
+        // init the channel
+        channel.initChannel(this, routeContext);
+
+        // set the error handler, must be done after init as we can set the error handler as first in the chain
+        // error handlers are only skipped for try .. catch .. finally routes
+        if (this instanceof OnExceptionDefinition) {
+            // also use error handler for on exception
+            Processor errorHandler = getErrorHandlerBuilder().createErrorHandler(routeContext, channel.getOutput());
+            channel.setErrorHandler(errorHandler);
+            return channel;
+        } else if (this instanceof TryDefinition || this instanceof CatchDefinition || this instanceof FinallyDefinition) {
+            // TODO: special error handler, where we have a local error handler with only the catch definitions
+            return channel;
+        } else {
+            // regular definition so add the error handler
+            Processor errorHandler = getErrorHandlerBuilder().createErrorHandler(routeContext, channel.getOutput());
+            channel.setErrorHandler(errorHandler);
+            return channel;
+        }
+    }
+
+    /**
+     * Creates a new instance of some kind of composite processor which defaults
+     * to using a {@link Pipeline} but derived classes could change the behaviour
+     */
+    protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
+        return new Pipeline(list);
+    }
+
+    /**
+     * Creates a new instance of the {@link Channel}.
+     */
+    protected Channel createChannel(RouteContext routeContext) {
+        return new DefaultChannel();
+    }
+
+    protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition> outputs) throws Exception {
+        List<Processor> list = new ArrayList<Processor>();
+        for (ProcessorDefinition output : outputs) {
+            Processor processor = output.createProcessor(routeContext);
+            // if the ProceedType/StopType create processor is null we keep on going
+            if ((output instanceof ProceedDefinition || output instanceof StopDefinition || output instanceof Channel) && processor == null) {
+                continue;
+            }
+
+            Processor channel = wrapChannel(routeContext, processor);
+            list.add(channel);
+        }
+
+        // if more than one output wrap them in a composite processor else just keep it as is
+        Processor processor = null;
+        if (!list.isEmpty()) {
+            if (list.size() == 1) {
+                processor = list.get(0);
+            } else {
+                processor = createCompositeProcessor(routeContext, list);
+            }
+        }
+
+        return processor;
+    }
+
+    public void clearOutput() {
+        getOutputs().clear();
+        blocks.clear();
     }
 
     // Fluent API
@@ -1946,37 +2021,6 @@
     }
 
     /**
-     * A strategy method which allows derived classes to wrap the child
-     * processor in some kind of interceptor
-     *
-     * @param routeContext the route context
-     * @param target       the processor which can be wrapped
-     * @return the original processor or a new wrapped interceptor
-     * @throws Exception can be thrown in case of error
-     */
-    protected Processor wrapProcessorInInterceptors(RouteContext routeContext, Processor target) throws Exception {
-        ObjectHelper.notNull(target, "target", this);
-
-        if (target instanceof InstrumentationProcessor) {
-            // do not double wrap instrumentation
-            return target;
-        }
-
-        List<InterceptStrategy> strategies = new ArrayList<InterceptStrategy>();
-        CamelContext camelContext = routeContext.getCamelContext();
-        strategies.addAll(camelContext.getInterceptStrategies());
-        strategies.addAll(routeContext.getInterceptStrategies());
-        // TODO: Order the strategies, eg using Comparable so we can have Tracer near the real processor
-        for (InterceptStrategy strategy : strategies) {
-            if (strategy != null) {
-                target = strategy.wrapProcessorInInterceptors(this, target);
-            }
-        }
-
-        return target;
-    }
-
-    /**
      * A strategy method to allow newly created processors to be wrapped in an
      * error handler.
      */
@@ -2015,46 +2059,4 @@
         }
     }
 
-    /**
-     * Creates a new instance of some kind of composite processor which defaults
-     * to using a {@link Pipeline} but derived classes could change the
-     * behaviour
-     */
-    protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) {
-        return new Pipeline(list);
-    }
-
-    protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition> outputs)
-        throws Exception {
-        List<Processor> list = new ArrayList<Processor>();
-        for (ProcessorDefinition output : outputs) {
-            Processor processor = output.createProcessor(routeContext);
-            // if the ProceedType/StopType create processor is null we keep on going
-            if ((output instanceof ProceedDefinition || output instanceof StopDefinition) && processor == null) {
-                continue;
-            }
-            processor = output.wrapProcessorInInterceptors(routeContext, processor);
-
-            ProcessorDefinition currentProcessor = this;
-            if (!(currentProcessor instanceof OnExceptionDefinition || currentProcessor instanceof TryDefinition)) {
-                processor = output.wrapInErrorHandler(routeContext, processor);
-            }
-
-            list.add(processor);
-        }
-        Processor processor = null;
-        if (!list.isEmpty()) {
-            if (list.size() == 1) {
-                processor = list.get(0);
-            } else {
-                processor = createCompositeProcessor(routeContext, list);
-            }
-        }
-        return processor;
-    }
-
-    public void clearOutput() {
-        getOutputs().clear();
-        blocks.clear();
-    }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Fri Apr 24 06:54:55 2009
@@ -45,6 +45,8 @@
  */
 public class DeadLetterChannel extends ErrorHandlerSupport implements AsyncProcessor {
 
+    // TODO: The code in this class will be much easier when the AsyncProcessor is removed
+
     // we can use a single shared static timer for async redeliveries
     private static final Timer REDELIVER_TIMER = new Timer("Camel DeadLetterChannel Redeliver Timer", true);
     private final Processor deadLetter;

Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java?rev=768186&view=auto
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java (added)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java Fri Apr 24 06:54:55 2009
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Channel;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.model.ProcessorDefinition;
+import org.apache.camel.spi.InterceptStrategy;
+import org.apache.camel.spi.RouteContext;
+import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ServiceHelper;
+
+/**
+ * DefaultChannel is the default {@link Channel}.
+ * <p/>
+ * The current implementation is just a composite containing the interceptors and error handler
+ * that beforehand was added to the route graph directly.
+ * <br/>
+ * With this {@link Channel} we can in the future implement better strategies for routing the
+ * {@link Exchange} in the route graph, as we have a {@link Channel} between each and every node
+ * in the graph.
+ *
+ * @version $Revision$
+ */
+public class DefaultChannel extends DelegateProcessor implements AsyncProcessor, Channel {
+
+    private final List<InterceptStrategy> interceptors = new ArrayList<InterceptStrategy>();
+    private Processor errorHandler;
+    // nextProcessor is the original (unwrapped) output
+    private Processor nextProcessor;
+    // output is nextProcessor wrapped by the interceptors, assigned in initChannel
+    private Processor output;
+
+    public void setNextProcessor(Processor output) {
+        this.nextProcessor = output;
+    }
+
+    public Processor getOutput() {
+        // the errorHandler is already decorated with interceptors
+        // so it contains the entire chain of processors, so we can safely use it directly as output
+        // if no error handler is provided we can use the output directly
+        return errorHandler != null ? errorHandler : output;
+    }
+
+    public Processor getNextProcessor() {
+        return nextProcessor;
+    }
+
+    public boolean hasInterceptorStrategy(Class type) {
+        for (InterceptStrategy strategy : interceptors) {
+            if (type.isInstance(strategy)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public void setErrorHandler(Processor errorHandler) {
+        this.errorHandler = errorHandler;
+    }
+
+    public Processor getErrorHandler() {
+        return errorHandler;
+    }
+
+    public void addInterceptStrategy(InterceptStrategy strategy) {
+        interceptors.add(strategy);
+    }
+
+    public void addInterceptStrategies(List<InterceptStrategy> strategies) {
+        interceptors.addAll(strategies);
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+        ServiceHelper.startServices(errorHandler, output);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(output, errorHandler);
+        super.doStop();
+    }
+
+    public void initChannel(ProcessorDefinition outputDefinition, RouteContext routeContext) throws Exception {
+        // TODO: Support ordering of interceptors
+
+        // wrap the output with the interceptors
+        Processor target = nextProcessor;
+        for (InterceptStrategy strategy : interceptors) {
+            target = strategy.wrapProcessorInInterceptors(outputDefinition, target);
+        }
+
+        // sets the delegate to our wrapped output
+        output = target;
+        setProcessor(target);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        Processor processor = getOutput();
+
+        if (processor instanceof AsyncProcessor) {
+            return ((AsyncProcessor) processor).process(exchange, callback);
+        } else if (processor != null) {
+            try {
+                processor.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
+        }
+
+        callback.done(true);
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        // just output the next processor as all the interceptors and error handler is just too verbose
+        return "Channel[" + nextProcessor + "]";
+    }
+
+}

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DefaultChannel.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ErrorHandlerWrappingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ErrorHandlerWrappingStrategy.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ErrorHandlerWrappingStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ErrorHandlerWrappingStrategy.java Fri Apr 24 06:54:55 2009
@@ -24,6 +24,7 @@
  * provide custom logic to wrap a processor with error handler
  *
  * @version $Revision$
+ * @deprecated is replaced by {@link org.apache.camel.Channel}
  */
 public interface ErrorHandlerWrappingStrategy {
 
@@ -37,6 +38,7 @@
      * @param target the processor to be wrapped
      * @return processor wrapped with an interceptor or not wrapped
      * @throws Exception can be thrown
+     * @deprecated is replaced by {@link org.apache.camel.Channel}
      */
     Processor wrapProcessorInErrorHandler(ProcessorDefinition processorType, Processor target) throws Exception;
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/TestSupport.java Fri Apr 24 06:54:55 2009
@@ -362,6 +362,27 @@
     }
 
     /**
+     * Drills through any wrapping DelegateProcessor or DelegateAsyncProcessor objects
+     * around the given processor and returns the first Channel that is found.
+     * <p/>
+     * Returns null if no channel is found, which includes being given a null processor.
+     */
+    protected Channel unwrapChannel(Processor processor) {
+        while (true) {
+            if (processor instanceof Channel) {
+                return (Channel) processor; // found it, stop drilling
+            }
+            if (processor instanceof DelegateAsyncProcessor) {
+                processor = ((DelegateAsyncProcessor)processor).getProcessor(); // step into the wrapped processor
+            } else if (processor instanceof DelegateProcessor) {
+                processor = ((DelegateProcessor)processor).getProcessor(); // step into the wrapped processor
+            } else {
+                return null; // neither a Channel nor one of the two delegate wrappers (or null)
+            }
+        }
+    }
+
+    /**
      * Recursively delete a directory, useful to zapping test data
      *
      * @param file the directory to be deleted

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/BuilderWithScopesTest.java Fri Apr 24 06:54:55 2009
@@ -236,10 +236,14 @@
     protected RouteBuilder createTryCatchNoEnd() {
         return new RouteBuilder() {
             public void configure() {
-                errorHandler(deadLetterChannel("mock:error").delay(0));
-                
-                from("direct:a").doTry().process(validator).process(toProcessor)
-                    .doCatch(ValidationException.class).process(orderProcessor).process(orderProcessor3); // continuation of the handle clause
+                from("direct:a")
+                    .doTry()
+                        .process(validator)
+                        .process(toProcessor)
+                    .doCatch(ValidationException.class)
+                        .process(orderProcessor)
+                        .process(orderProcessor3)
+                    .end();
             }
         };
     }
@@ -263,7 +267,7 @@
 
     public void testRouteWithTryCatchNoEndWithUncaughtException() throws Exception {
         ArrayList<String> expected = new ArrayList<String>();
-        expected.addAll(Collections.nCopies(6, "VALIDATE"));
+        expected.add("VALIDATE");
 
         runTest(createTryCatchNoEnd(), expected);
     }
@@ -271,8 +275,6 @@
     protected RouteBuilder createTryCatchEnd() {
         return new RouteBuilder() {
             public void configure() {
-                errorHandler(deadLetterChannel("mock:error").delay(0));
-
                 from("direct:a").doTry().process(validator).process(toProcessor)
                     .doCatch(ValidationException.class).process(orderProcessor).end().process(orderProcessor3);
             }
@@ -299,7 +301,7 @@
 
     public void testRouteWithTryCatchEndWithUncaughtException() throws Exception {
         ArrayList<String> expected = new ArrayList<String>();
-        expected.addAll(Collections.nCopies(6, "VALIDATE"));
+        expected.add("VALIDATE");
 
         runTest(createTryCatchEnd(), expected);
     }
@@ -307,8 +309,6 @@
     protected RouteBuilder createTryCatchFinallyNoEnd() {
         return new RouteBuilder() {
             public void configure() {
-                errorHandler(deadLetterChannel("mock:error").delay(0).maximumRedeliveries(1));
-
                 from("direct:a").doTry().process(validator).process(toProcessor)
                     .doCatch(ValidationException.class).process(orderProcessor).doFinally()
                     .process(orderProcessor2).process(orderProcessor3); // continuation of the finallyBlock clause
@@ -341,10 +341,6 @@
         expected.add("VALIDATE");
         expected.add("INVOKED2");
         expected.add("INVOKED3");
-        // exchange should be processed twice for an uncaught exception and maximumRedeliveries(1)
-        expected.add("VALIDATE");
-        expected.add("INVOKED2");
-        expected.add("INVOKED3");
 
         runTest(createTryCatchFinallyNoEnd(), expected);
     }
@@ -352,8 +348,6 @@
     protected RouteBuilder createTryCatchFinallyEnd() {
         return new RouteBuilder() {
             public void configure() {
-                errorHandler(deadLetterChannel().maximumRedeliveries(1).delay(0));
-                
                 from("direct:a").doTry().process(validator).process(toProcessor)
                     .doCatch(ValidationException.class).process(orderProcessor).doFinally()
                     .process(orderProcessor2).end().process(orderProcessor3);
@@ -385,10 +379,6 @@
         ArrayList<String> expected = new ArrayList<String>();
         expected.add("VALIDATE");
         expected.add("INVOKED2");
-        // exchange should be processed twice for an uncaught exception and maximumRedeliveries(1)
-        expected.add("VALIDATE");
-        expected.add("INVOKED2");
-        // orderProcessor3 will not be invoked past end() with an uncaught exception
 
         runTest(createTryCatchFinallyEnd(), expected);
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ContextErrorHandlerTest.java Fri Apr 24 06:54:55 2009
@@ -18,13 +18,11 @@
 
 import java.util.List;
 
-import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.TestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
 import org.apache.camel.processor.DeadLetterChannel;
 import org.apache.camel.processor.LoggingErrorHandler;
@@ -77,10 +75,10 @@
 
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
             Processor processor = consumerRoute.getProcessor();
-            processor = unwrap(processor);
-            LoggingErrorHandler loggingProcessor = assertIsInstanceOf(LoggingErrorHandler.class, processor);
-            processor = unwrap(loggingProcessor.getOutput());
-            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, processor);
+
+            Channel channel = unwrapChannel(processor);
+            assertIsInstanceOf(LoggingErrorHandler.class, channel.getErrorHandler());
+            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, channel.getNextProcessor());
             log.debug("Found sendProcessor: " + sendProcessor);
         }
     }
@@ -100,9 +98,9 @@
 
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
             Processor processor = consumerRoute.getProcessor();
-            processor = unwrap(processor);
 
-            DeadLetterChannel deadLetterChannel = assertIsInstanceOf(DeadLetterChannel.class, processor);
+            Channel channel = unwrapChannel(processor);
+            DeadLetterChannel deadLetterChannel = assertIsInstanceOf(DeadLetterChannel.class, channel.getErrorHandler());
 
             RedeliveryPolicy redeliveryPolicy = deadLetterChannel.getRedeliveryPolicy();
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/ErrorHandlerTest.java Fri Apr 24 06:54:55 2009
@@ -18,20 +18,17 @@
 
 import java.util.List;
 
+import org.apache.camel.Channel;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.TestSupport;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.management.InstrumentationProcessor;
-import org.apache.camel.management.JmxSystemPropertyKeys;
 import org.apache.camel.processor.DeadLetterChannel;
-import org.apache.camel.processor.DefaultErrorHandler;
 import org.apache.camel.processor.FilterProcessor;
 import org.apache.camel.processor.LoggingErrorHandler;
 import org.apache.camel.processor.RedeliveryPolicy;
 import org.apache.camel.processor.SendProcessor;
-import org.apache.camel.processor.interceptor.StreamCachingInterceptor;
 
 /**
  * @version $Revision$
@@ -58,12 +55,12 @@
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
 
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-            Processor processor = consumerRoute.getProcessor();
-            processor = unwrap(processor);
-            LoggingErrorHandler loggingProcessor = assertIsInstanceOf(LoggingErrorHandler.class, processor);
-            processor = unwrap(loggingProcessor.getOutput());
+            Channel channel = unwrapChannel(consumerRoute.getProcessor());
+
+            assertIsInstanceOf(LoggingErrorHandler.class, channel.getErrorHandler());
+
+            Processor processor = unwrap(channel.getNextProcessor());
             SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, processor);
-            log.debug("Found sendProcessor: " + sendProcessor);
         }
     }
 
@@ -87,40 +84,6 @@
 
         List<Route> list = getRouteList(builder);
         assertEquals("Number routes created" + list, 2, list.size());
-        for (Route route : list) {
-            Endpoint key = route.getEndpoint();
-            String endpointUri = key.getEndpointUri();
-            EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-            Processor processor = unwrap(consumerRoute.getProcessor());
-
-            SendProcessor sendProcessor = null;
-            if (endpointUri.equals("seda:a")) {
-                LoggingErrorHandler loggingProcessor = assertIsInstanceOf(LoggingErrorHandler.class,
-                                                                          processor);
-                Processor outputProcessor = loggingProcessor.getOutput();
-                if (Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                    StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, outputProcessor);
-                    sendProcessor = assertIsInstanceOf(SendProcessor.class, cache.getProcessor());
-                } else {
-                    InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, outputProcessor);
-                    StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, interceptor.getProcessor());
-                    sendProcessor = assertIsInstanceOf(SendProcessor.class, cache.getProcessor());
-                }
-            } else {
-                assertEquals("From endpoint", "seda:b", endpointUri);
-                DefaultErrorHandler defaultErrorHandler = assertIsInstanceOf(DefaultErrorHandler.class, processor);
-                Processor outputProcessor = defaultErrorHandler.getOutput();
-                if (Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                    StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, outputProcessor);
-                    sendProcessor = assertIsInstanceOf(SendProcessor.class, cache.getProcessor());
-                } else {
-                    InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, outputProcessor);
-                    StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, interceptor.getProcessor());
-                    sendProcessor = assertIsInstanceOf(SendProcessor.class, cache.getProcessor());
-                }
-            }
-            log.debug("For " + endpointUri + " using: " + sendProcessor);
-        }
     }
 
     public void testConfigureDeadLetterChannel() throws Exception {
@@ -145,7 +108,7 @@
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
             Processor processor = unwrap(consumerRoute.getProcessor());
 
-            assertIsInstanceOf(DeadLetterChannel.class, processor);
+            assertIsInstanceOf(SendProcessor.class, processor);
         }
     }
 
@@ -172,10 +135,9 @@
 
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
             Processor processor = consumerRoute.getProcessor();
-            processor = unwrap(processor);
-
-            DeadLetterChannel deadLetterChannel = assertIsInstanceOf(DeadLetterChannel.class, processor);
+            Channel channel = unwrapChannel(processor);
 
+            DeadLetterChannel deadLetterChannel = assertIsInstanceOf(DeadLetterChannel.class, channel.getErrorHandler());
             RedeliveryPolicy redeliveryPolicy = deadLetterChannel.getRedeliveryPolicy();
 
             assertEquals("getMaximumRedeliveries()", 2, redeliveryPolicy.getMaximumRedeliveries());
@@ -199,25 +161,10 @@
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
             EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
-            Processor processor = unwrap(consumerRoute.getProcessor());
-
-            LoggingErrorHandler loggingProcessor = assertIsInstanceOf(LoggingErrorHandler.class, processor);
-
-            if (Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, loggingProcessor.getOutput());
-                processor = cache.getProcessor();
-            } else {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, loggingProcessor.getOutput());
-                StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, interceptor.getProcessor());
-                processor = cache.getProcessor();
-            }
-
-            FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, processor);
-            LoggingErrorHandler logging = assertIsInstanceOf(LoggingErrorHandler.class, filterProcessor.getProcessor());
-            StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, logging.getOutput());
-            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, cache.getProcessor());
+            Channel channel = unwrapChannel(consumerRoute.getProcessor());
 
-            log.debug("Found sendProcessor: " + sendProcessor);
+            assertIsInstanceOf(LoggingErrorHandler.class, channel.getErrorHandler());
+            assertIsInstanceOf(FilterProcessor.class, channel.getNextProcessor());
         }
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java Fri Apr 24 06:54:55 2009
@@ -20,6 +20,7 @@
 import java.util.List;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Channel;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -28,8 +29,6 @@
 import org.apache.camel.TestSupport;
 import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.management.InstrumentationProcessor;
-import org.apache.camel.management.JmxSystemPropertyKeys;
 import org.apache.camel.processor.ChoiceProcessor;
 import org.apache.camel.processor.DeadLetterChannel;
 import org.apache.camel.processor.DelegateProcessor;
@@ -88,15 +87,11 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
 
-            SendProcessor sendProcessor;
-            if (Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                sendProcessor = assertIsInstanceOf(SendProcessor.class, processor);
-            } else {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                sendProcessor = assertIsInstanceOf(SendProcessor.class, interceptor.getProcessor());
-            }
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
+
+            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, channel.getNextProcessor());
             assertEquals("Endpoint URI", "seda:b", sendProcessor.getDestination().getEndpointUri());
         }
     }
@@ -123,16 +118,12 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
 
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
 
-            FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, processor);
-            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class,
-                    unwrapErrorHandler(filterProcessor.getProcessor()));
+            FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, channel.getNextProcessor());
+            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, unwrapChannel(filterProcessor).getNextProcessor());
             assertEquals("Endpoint URI", "seda:b", sendProcessor.getDestination().getEndpointUri());
         }
     }
@@ -160,24 +151,21 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
 
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
 
-            ChoiceProcessor choiceProcessor = assertIsInstanceOf(ChoiceProcessor.class, processor);
+            ChoiceProcessor choiceProcessor = assertIsInstanceOf(ChoiceProcessor.class, channel.getNextProcessor());
             List<FilterProcessor> filters = choiceProcessor.getFilters();
             assertEquals("Should be two when clauses", 2, filters.size());
 
             FilterProcessor filter1 = filters.get(0);
-            assertSendTo(filter1.getProcessor(), "seda:b");
+            assertSendTo(unwrapChannel(filter1.getProcessor()).getNextProcessor(), "seda:b");
 
             FilterProcessor filter2 = filters.get(1);
-            assertSendTo(filter2.getProcessor(), "seda:c");
+            assertSendTo(unwrapChannel(filter2.getProcessor()).getNextProcessor(), "seda:c");
 
-            assertSendTo(choiceProcessor.getOtherwise(), "seda:d");
+            assertSendTo(unwrapChannel(choiceProcessor.getOtherwise()).getNextProcessor(), "seda:d");
         }
     }
 
@@ -207,13 +195,10 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
 
-            assertEquals("Should be called with my processor", myProcessor, processor);
+            assertEquals("Should be called with my processor", myProcessor, channel.getNextProcessor());
         }
     }
 
@@ -239,14 +224,12 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
-            FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, processor);
-            assertEquals("Should be called with my processor", myProcessor,
-                         unwrapErrorHandler(filterProcessor.getProcessor()));
+
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
+
+            FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, channel.getNextProcessor());
+            assertEquals("Should be called with my processor", myProcessor, unwrapChannel(filterProcessor.getProcessor()).getNextProcessor());
         }
     }
 
@@ -272,17 +255,16 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
-            // take off the InstrumentationProcessor
-            processor = unwrapDelegateProcessor(processor);
-            // take off the StreamCacheInterceptor
-            processor = unwrapDelegateProcessor(processor);
-            MulticastProcessor multicastProcessor = assertIsInstanceOf(MulticastProcessor.class, processor);
+
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
+
+            MulticastProcessor multicastProcessor = assertIsInstanceOf(MulticastProcessor.class, channel.getNextProcessor());
             List<Processor> endpoints = new ArrayList<Processor>(multicastProcessor.getProcessors());
             assertEquals("Should have 2 endpoints", 2, endpoints.size());
 
-            assertSendToProcessor(endpoints.get(0), "seda:tap");
-            assertSendToProcessor(endpoints.get(1), "seda:b");
+            assertSendToProcessor(unwrapChannel(endpoints.get(0)).getNextProcessor(), "seda:tap");
+            assertSendToProcessor(unwrapChannel(endpoints.get(1)).getNextProcessor(), "seda:b");
         }
     }
 
@@ -314,16 +296,16 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
 
-            Pipeline line = assertIsInstanceOf(Pipeline.class, processor);
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+
+            Pipeline line = assertIsInstanceOf(Pipeline.class, unwrap(consumer.getProcessor()));
             assertEquals(3, line.getProcessors().size());
             // last should be our seda
+
             List<Processor> processors = new ArrayList<Processor>(line.getProcessors());
-            processor = unwrapErrorHandler(processors.get(2));
-            processor = unwrapDelegateProcessor(processor);
-            assertIsInstanceOf(SendProcessor.class, processor);
-            assertSendTo(processor, "seda:d");
+            Processor sendTo = assertIsInstanceOf(SendProcessor.class, unwrapChannel(processors.get(2)).getNextProcessor());
+            assertSendTo(sendTo, "seda:d");
         }
     }
 
@@ -385,12 +367,11 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
-            RecipientList p1 = assertIsInstanceOf(RecipientList.class, processor);
+
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
+
+            assertIsInstanceOf(RecipientList.class, channel.getNextProcessor());
         }
     }
 
@@ -417,12 +398,10 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
-            Splitter p1 = assertIsInstanceOf(Splitter.class, processor);
+
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
+            assertIsInstanceOf(Splitter.class, channel.getNextProcessor());
         }
     }
 
@@ -450,18 +429,15 @@
         for (Route route : routes) {
             Endpoint key = route.getEndpoint();
             assertEquals("From endpoint", "seda:a", key.getEndpointUri());
-            Processor processor = getProcessorWithoutErrorHandler(route);
-            if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-                InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, processor);
-                processor = interceptor.getProcessor();
-            }
 
-            IdempotentConsumer idempotentConsumer = assertIsInstanceOf(IdempotentConsumer.class, processor);
+            EventDrivenConsumerRoute consumer = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+            Channel channel = unwrapChannel(consumer.getProcessor());
+
+            IdempotentConsumer idempotentConsumer = assertIsInstanceOf(IdempotentConsumer.class, channel.getNextProcessor());
             assertEquals("messageIdExpression", "header(myMessageId)", idempotentConsumer.getMessageIdExpression().toString());
 
             assertIsInstanceOf(MemoryIdempotentRepository.class, idempotentConsumer.getIdempotentRepository());
-            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class,
-                                                             unwrapErrorHandler(idempotentConsumer.getNextProcessor()));
+            SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, unwrapChannel(idempotentConsumer.getNextProcessor()).getNextProcessor());
             assertEquals("Endpoint URI", "seda:b", sendProcessor.getDestination().getEndpointUri());
         }
     }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java?rev=768186&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java Fri Apr 24 06:54:55 2009
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class ChannelTest extends ContextTestSupport {
+
+    private static int counter;
+
+    @Override
+    protected void setUp() throws Exception {
+        disableJMX();
+        super.setUp();
+    }
+
+    public void testChannel() throws Exception {
+        counter = 0;
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(2);
+
+        template.sendBody("direct:start", "Hello World");
+        template.sendBody("direct:start", "Bye World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+                from("direct:start")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            if (counter++ < 1) {
+                                throw new IllegalArgumentException("Damn");
+                            }
+                        }
+                    }).to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ChannelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DefaultErrorHandlerTest.java Fri Apr 24 06:54:55 2009
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.Channel;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -41,8 +42,9 @@
 
         // there should be a default error handler in front of each processor in this pipeline
         for (Processor child : pipeline.getProcessors()) {
-            DefaultErrorHandler errorHandler = assertIsInstanceOf(DefaultErrorHandler.class, child);
-            assertNotNull(errorHandler);
+            Channel channel = assertIsInstanceOf(Channel.class, child);
+            assertNotNull("There should be an error handler", channel.getErrorHandler());
+            assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler());
         }
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ResequencerTest.java Fri Apr 24 06:54:55 2009
@@ -18,16 +18,15 @@
 
 import java.util.List;
 
+import org.apache.camel.Channel;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
-import org.apache.camel.Processor;
 import org.apache.camel.Route;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.management.InstrumentationProcessor;
 import org.apache.camel.management.JmxSystemPropertyKeys;
-import org.apache.camel.processor.interceptor.StreamCachingInterceptor;
+import org.apache.camel.processor.interceptor.StreamCaching;
 
 /**
  * @version $Revision$
@@ -66,12 +65,7 @@
 
     public void testBatchResequencerTypeWithJmx() throws Exception {
         System.setProperty(JmxSystemPropertyKeys.DISABLED, "true");
-
-        List<Route> list = getRouteList(createRouteBuilder());
-        assertEquals("Number of routes created: " + list, 1, list.size());
-
-        Route route = list.get(0);
-        assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+        testBatchResequencerTypeWithoutJmx();
     }
 
     public void testBatchResequencerTypeWithoutJmx() throws Exception {
@@ -81,17 +75,11 @@
         Route route = list.get(0);
         EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
 
-        Processor processor = unwrap(consumerRoute.getProcessor());
-
-        DefaultErrorHandler defaultErrorHandler = assertIsInstanceOf(DefaultErrorHandler.class, processor);
-
-        Processor outputProcessor = defaultErrorHandler.getOutput();
-        InstrumentationProcessor interceptor = assertIsInstanceOf(InstrumentationProcessor.class, outputProcessor);
-
-        outputProcessor = interceptor.getProcessor();
+        Channel channel = unwrapChannel(consumerRoute.getProcessor());
+        assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler());
+        assertTrue("Should have stream caching", channel.hasInterceptorStrategy(StreamCaching.class));
 
-        StreamCachingInterceptor cache = assertIsInstanceOf(StreamCachingInterceptor.class, outputProcessor);
-        assertIsInstanceOf(Resequencer.class, cache.getProcessor());
+        assertIsInstanceOf(Resequencer.class, channel.getNextProcessor());
     }
 
 }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java?rev=768186&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java Fri Apr 24 06:54:55 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * @version $Revision$
+ */
+public class SimpleMockTest extends ContextTestSupport {
+
+    public void testSimple() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").to("log:foo").to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SimpleMockTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/StreamResequencerTest.java Fri Apr 24 06:54:55 2009
@@ -18,6 +18,7 @@
 
 import java.util.List;
 
+import org.apache.camel.Channel;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -27,15 +28,13 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.impl.EventDrivenConsumerRoute;
-import org.apache.camel.management.InstrumentationProcessor;
-import org.apache.camel.management.JmxSystemPropertyKeys;
 
 public class StreamResequencerTest extends ContextTestSupport {
 
     protected MockEndpoint resultEndpoint;
 
     protected void sendBodyAndHeader(String endpointUri, final Object body,
-                                   final String headerName, final Object headerValue) {
+                                     final String headerName, final Object headerValue) {
         template.send(endpointUri, new Processor() {
             public void process(Exchange exchange) {
                 Message in = exchange.getIn();
@@ -108,25 +107,12 @@
         assertEquals("Number of routes created: " + list, 1, list.size());
 
         Route route = list.get(0);
-        EventDrivenConsumerRoute consumerRoute =
-            assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
+        EventDrivenConsumerRoute consumerRoute = assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
 
-        Processor processor = unwrap(consumerRoute.getProcessor());
+        Channel channel = unwrapChannel(consumerRoute.getProcessor());
 
-        DefaultErrorHandler deadLetterChannel = assertIsInstanceOf(DefaultErrorHandler.class, processor);
-        Processor outputProcessor = deadLetterChannel.getOutput();
-        if (!Boolean.getBoolean(JmxSystemPropertyKeys.DISABLED)) {
-            InstrumentationProcessor interceptor =
-                assertIsInstanceOf(InstrumentationProcessor.class, outputProcessor);
-            outputProcessor = interceptor.getProcessor();
-        }
-
-        // we are not interested in any other delegate processors in the route (e.g. stream caching)
-        while (outputProcessor instanceof DelegateProcessor) {
-            outputProcessor = ((DelegateProcessor) outputProcessor).getProcessor();
-        }
-
-        assertIsInstanceOf(StreamResequencer.class, outputProcessor);
+        assertIsInstanceOf(DefaultErrorHandler.class, channel.getErrorHandler());
+        assertIsInstanceOf(StreamResequencer.class, channel.getNextProcessor());
     }
     
     private static class Sender extends Thread {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationFinallyBlockNoCatchTest.java Fri Apr 24 06:54:55 2009
@@ -20,7 +20,6 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.model.TryDefinition;
 
 /**
  * No catch blocks but handle all should work
@@ -31,16 +30,13 @@
     protected Processor validator = new MyValidator();
     protected MockEndpoint validEndpoint;
     protected MockEndpoint allEndpoint;
+    protected MockEndpoint deadEndpoint;
 
     public void testValidMessage() throws Exception {
         validEndpoint.expectedMessageCount(1);
         allEndpoint.expectedMessageCount(1);
 
-        try {
-            template.sendBodyAndHeader("direct:start", "<valid/>", "foo", "bar");
-        } catch (Exception e) {
-            // expected
-        }
+        template.sendBodyAndHeader("direct:start", "<valid/>", "foo", "bar");
 
         assertMockEndpointsSatisfied();
     }
@@ -48,8 +44,11 @@
     public void testInvalidMessage() throws Exception {
         validEndpoint.expectedMessageCount(0);
         
-        // allEndpoint receives 1 + 5 messages, ordinary (1 attempt) and redelivery (5 attempts) is involved
-        allEndpoint.expectedMessageCount(1 + 5);
+        // allEndpoint should only receive 1 when the message is being moved to the dead letter queue
+        allEndpoint.expectedMessageCount(1);
+
+        // regular error handler is disabled for try .. catch .. finally
+        deadEndpoint.expectedMessageCount(0);
 
         try {
             template.sendBodyAndHeader("direct:start", "<invalid/>", "foo", "notMatchedHeaderValue");
@@ -66,18 +65,21 @@
 
         validEndpoint = resolveMandatoryEndpoint("mock:valid", MockEndpoint.class);
         allEndpoint = resolveMandatoryEndpoint("mock:all", MockEndpoint.class);
+        deadEndpoint = resolveMandatoryEndpoint("mock:dead", MockEndpoint.class);
     }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                // use little delay to run unit test fast
-                errorHandler(deadLetterChannel().delay(25));
+                // use dead letter channel that supports redeliveries
+                errorHandler(deadLetterChannel("mock:dead").delay(0).maximumRedeliveries(3).logStackTrace(false));
 
-                TryDefinition tryType = from("direct:start").doTry().
-                        process(validator).
-                        to("mock:valid");
-                tryType.doFinally().to("mock:all");
+                from("direct:start")
+                    .doTry()
+                        .process(validator)
+                        .to("mock:valid")
+                    .doFinally()
+                        .to("mock:all");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationTest.java Fri Apr 24 06:54:55 2009
@@ -89,11 +89,11 @@
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:start").
-                        doTry().
-                        process(validator).
-                        to("mock:valid").
-                        doCatch(ValidationException.class).to("mock:invalid");
+                from("direct:start")
+                    .doTry()
+                        .process(validator).to("mock:valid")
+                    .doCatch(ValidationException.class)
+                        .to("mock:invalid");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationWithHandlePipelineAndExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationWithHandlePipelineAndExceptionTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationWithHandlePipelineAndExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ValidationWithHandlePipelineAndExceptionTest.java Fri Apr 24 06:54:55 2009
@@ -31,10 +31,15 @@
             public void configure() {
                 errorHandler(deadLetterChannel("mock:error").delay(0).maximumRedeliveries(3));
 
-                onException(ValidationException.class).to("mock:invalid");
+                onException(ValidationException.class).to("mock:outer");
 
-                from("direct:start").doTry().process(validator).to("mock:valid").doCatch(
-                        ValidationException.class).process(validator);
+                from("direct:start")
+                    .doTry()
+                        .process(validator)
+                        .to("mock:valid")
+                    .doCatch(ValidationException.class)
+                        .to("mock:invalid")
+                        .process(validator);
             }
         };
     }

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java?rev=768186&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java Fri Apr 24 06:54:55 2009
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.onexception;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class ErrorOccuredInOnExceptionRoute extends ContextTestSupport {
+
+    public void testErrorInOnException() throws Exception {
+        getMockEndpoint("mock:onFunc").expectedMessageCount(1);
+        getMockEndpoint("mock:doneFunc").expectedMessageCount(0);
+        getMockEndpoint("mock:tech").expectedMessageCount(1);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // TODO: Should also work with DLC
+                // will be possible when we remove the AsyncProcessor so the processing logic
+                // is much easier to deal with
+                // errorHandler(deadLetterChannel("mock:dead").disableRedelivery());
+
+                onException(MyTechnicalException.class)
+                    .handled(true)
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            // System.out.println("tech");
+                        }
+                    })
+                    .to("mock:tech");
+
+                onException(MyFunctionalException.class)
+                    .handled(true)
+                    .to("mock:onFunc")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                // System.out.println("func");
+                                throw new MyTechnicalException("Tech error");
+                            }
+                        })
+                    .to("mock:doneFunc");
+
+                // in this regular route the processing failed
+                from("direct:start")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws Exception {
+                            throw new MyFunctionalException("Func error");
+                        }
+                    });
+            }
+        };
+
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/ErrorOccuredInOnExceptionRoute.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexRouteTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexRouteTest.java Fri Apr 24 06:54:55 2009
@@ -107,7 +107,8 @@
             public void configure() throws Exception {
                 // START SNIPPET: e1
                 // global error handler
-                errorHandler(deadLetterChannel("mock:error"));
+                // as it's based on a unit test we do not have any delays between redeliveries and do not log the stack trace
+                errorHandler(deadLetterChannel("mock:error").delay(0).logStackTrace(false));
 
                 // shared for both routes
                 onException(MyTechnicalException.class).handled(true).maximumRedeliveries(2).to("mock:tech.error");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexWithNestedErrorHandlerRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexWithNestedErrorHandlerRouteTest.java?rev=768186&r1=768185&r2=768186&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexWithNestedErrorHandlerRouteTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/onexception/OnExceptionComplexWithNestedErrorHandlerRouteTest.java Fri Apr 24 06:54:55 2009
@@ -46,7 +46,8 @@
             @Override
             public void configure() throws Exception {
                 // global error handler
-                errorHandler(deadLetterChannel("mock:error"));
+                // as it's based on a unit test we do not have any delays between redeliveries and do not log the stack trace
+                errorHandler(deadLetterChannel("mock:error").delay(0).logStackTrace(false));
 
                 // shared for both routes
                 onException(MyTechnicalException.class).handled(true).maximumRedeliveries(2).to("mock:tech.error");



Mime
View raw message