activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r520860 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/builder/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/ test/java/org/apache/...
Date Wed, 21 Mar 2007 12:33:36 GMT
Author: jstrachan
Date: Wed Mar 21 05:33:32 2007
New Revision: 520860

URL: http://svn.apache.org/viewvc?view=rev&rev=520860
Log:
added support for DeadLetterChannel, Multicast and Pipeline patterns along with adding an error handler to the RouteBuilder so folks can configure the dead letter policy and so forth

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java   (with props)
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Wed Mar 21 05:33:32 2007
@@ -40,6 +40,12 @@
     E createExchange();
 
     /**
+     * Creates a new exchange for communicating with this exchange using the given exchange to pre-populate the values
+     * of the headers and messages
+     */
+    E createExchange(E exchange);
+
+    /**
      * Called by the container to Activate the endpoint.  Once activated,
      * the endpoint will start delivering inbound message exchanges
      * that are received to the specified processor.

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Mar 21 05:33:32 2007
@@ -83,4 +83,11 @@
      * destination
      */
     Exchange copy();
+
+    /**
+     * Copies the data into this exchange from the given exchange
+     *
+     * #param source is the source from which headers and messages will be copied 
+     */
+    void copyFrom(Exchange source);
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java Wed Mar 21 05:33:32 2007
@@ -18,6 +18,7 @@
 
 import org.apache.camel.Expression;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 
 /**
  * Base class for implementation inheritance
@@ -26,6 +27,14 @@
  */
 public abstract class BuilderSupport<E extends Exchange> {
 
+    private ErrorHandlerBuilder<E> errorHandlerBuilder;
+                                                    
+    protected BuilderSupport() {
+    }
+
+    // Builder methods
+    //-------------------------------------------------------------------------
+
     /**
      * Returns a predicate and value builder for headers on an exchange
      */
@@ -66,5 +75,28 @@
         return new ValueBuilder<E>(expression);
     }
 
+
+    // Properties
+    //-------------------------------------------------------------------------
+
+    protected BuilderSupport(BuilderSupport<E> parent) {
+        if (parent.errorHandlerBuilder != null) {
+            this.errorHandlerBuilder = parent.errorHandlerBuilder.copy();
+        }
+    }
+
+    public ErrorHandlerBuilder<E> getErrorHandlerBuilder() {
+        if (errorHandlerBuilder == null) {
+            errorHandlerBuilder = new DeadLetterChannelBuilder<E>();
+        }
+        return errorHandlerBuilder;
+    }
+
+    /**
+     * Sets the error handler to use with processors created by this builder
+     */
+    public void setErrorHandlerBuilder(ErrorHandlerBuilder<E> errorHandlerBuilder) {
+        this.errorHandlerBuilder = errorHandlerBuilder;
+    }
 
 }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,128 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.Expression;
+import org.apache.camel.processor.RedeliveryPolicy;
+import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.RecipientList;
+
+/**
+ * A builder of a <a href="http://activemq.apache.org/camel/dead-letter-channel.html">Dead Letter Channel</a>
+ * 
+ * @version $Revision$
+ */
+public class DeadLetterChannelBuilder<E extends Exchange> extends BuilderSupport<E> implements ErrorHandlerBuilder<E> {
+    private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+    private ProcessorFactory<E> deadLetterFactory;
+    private Processor<E> defaultDeadLetterEndpoint;
+    private Expression<E> defaultDeadLetterEndpointExpression;
+    private String defaultDeadLetterEndpointUri = "log:org.apache.camel.DeadLetterChannel:error";
+
+    public DeadLetterChannelBuilder() {
+    }
+
+    public DeadLetterChannelBuilder(ProcessorFactory<E> deadLetterFactory) {
+        this.deadLetterFactory = deadLetterFactory;
+    }
+
+    public ErrorHandlerBuilder<E> copy() {
+        DeadLetterChannelBuilder<E> answer = new DeadLetterChannelBuilder<E>(deadLetterFactory);
+        answer.setRedeliveryPolicy(getRedeliveryPolicy().copy());
+        return answer;
+    }
+
+    public Processor<E> createErrorHandler(Processor<E> processor) {
+        Processor<E> deadLetter = getDeadLetterFactory().createProcessor();
+        return new DeadLetterChannel<E>(processor, deadLetter, getRedeliveryPolicy());
+    }
+
+    public RedeliveryPolicy getRedeliveryPolicy() {
+        return redeliveryPolicy;
+    }
+
+    /**
+     * Sets the redelivery policy
+     */
+    public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
+        this.redeliveryPolicy = redeliveryPolicy;
+    }
+
+    public ProcessorFactory<E> getDeadLetterFactory() {
+        if (deadLetterFactory == null) {
+            deadLetterFactory = new ProcessorFactory<E>() {
+                public Processor<E> createProcessor() {
+                    return getDefaultDeadLetterEndpoint();
+                }
+            };
+        }
+        return deadLetterFactory;
+    }
+
+    /**
+     * Sets the default dead letter queue factory
+     */
+    public void setDeadLetterFactory(ProcessorFactory<E> deadLetterFactory) {
+        this.deadLetterFactory = deadLetterFactory;
+    }
+
+     public Processor<E> getDefaultDeadLetterEndpoint() {
+         if (defaultDeadLetterEndpoint == null) {
+            defaultDeadLetterEndpoint = new RecipientList<E>(getDefaultDeadLetterEndpointExpression());
+         }
+         return defaultDeadLetterEndpoint;
+     }
+
+     /**
+      * Sets the default dead letter endpoint used
+      */
+     public void setDefaultDeadLetterEndpoint(Processor<E> defaultDeadLetterEndpoint) {
+         this.defaultDeadLetterEndpoint = defaultDeadLetterEndpoint;
+     }
+
+    public Expression<E> getDefaultDeadLetterEndpointExpression() {
+        if (defaultDeadLetterEndpointExpression == null) {
+            defaultDeadLetterEndpointExpression = ExpressionBuilder.constantExpression(getDefaultDeadLetterEndpointUri());
+        }
+        return defaultDeadLetterEndpointExpression;
+    }
+
+    /**
+     * Sets the expression used to decide the dead letter channel endpoint for an exchange
+     * if no factory is provided via {@link #setDeadLetterFactory(ProcessorFactory)}
+     */
+    public void setDefaultDeadLetterEndpointExpression(Expression<E> defaultDeadLetterEndpointExpression) {
+        this.defaultDeadLetterEndpointExpression = defaultDeadLetterEndpointExpression;
+    }
+
+    public String getDefaultDeadLetterEndpointUri() {
+        return defaultDeadLetterEndpointUri;
+    }
+
+    /**
+     * Sets the default dead letter endpoint URI used if no factory is provided via {@link #setDeadLetterFactory(ProcessorFactory)}
+     * and no expression is provided via {@link #setDefaultDeadLetterEndpointExpression(Expression)}
+     *
+     * @param defaultDeadLetterEndpointUri the default URI if no deadletter factory or expression is provided
+     */
+    public void setDefaultDeadLetterEndpointUri(String defaultDeadLetterEndpointUri) {
+        this.defaultDeadLetterEndpointUri = defaultDeadLetterEndpointUri;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,36 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+/**
+ * @version $Revision$
+ */
+public interface ErrorHandlerBuilder<E extends Exchange> {
+    /**
+     * Creates a copy of this builder
+     */
+    ErrorHandlerBuilder<E> copy();
+
+    /**
+     * Creates the error handler interceptor
+     */
+    Processor<E> createErrorHandler(Processor<E> processor);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java Wed Mar 21 05:33:32 2007
@@ -81,7 +81,7 @@
 
             @Override
             public String toString() {
-                return "Body";
+                return "body";
             }
         };
     }
@@ -98,7 +98,7 @@
 
             @Override
             public String toString() {
-                return "BodyAs[" + type.getName() + "]";
+                return "bodyAs[" + type.getName() + "]";
             }
         };
     }
@@ -114,7 +114,7 @@
 
             @Override
             public String toString() {
-                return "OutBody";
+                return "outBody";
             }
         };
     }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Wed Mar 21 05:33:32 2007
@@ -20,11 +20,14 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.processor.InterceptorProcessor;
+import org.apache.camel.processor.MulticastProcessor;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Collection;
 
 /**
  * @version $Revision$
@@ -52,6 +55,23 @@
         return getBuilder().endpoint(uri);
     }
 
+    public List<Endpoint<E>> endpoints(String... uris) {
+        List<Endpoint<E>> endpoints = new ArrayList<Endpoint<E>>();
+        for (String uri : uris) {
+            endpoints.add(endpoint(uri));
+        }
+        return endpoints;
+    }
+
+    public List<Endpoint<E>> endpoints(Endpoint<E>... uris) {
+        List<Endpoint<E>> endpoints = new ArrayList<Endpoint<E>>();
+        for (Endpoint<E> uri : uris) {
+            endpoints.add(uri);
+        }
+        return endpoints;
+    }
+
+
     /**
      * Sends the exchange to the given endpoint URI
      */
@@ -70,29 +90,52 @@
 
 
     /**
-     * Sends the exchange to the given endpoint URI
+     * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
      */
     public ProcessorFactory<E> to(String... uris) {
-        ProcessorFactory<E> answer = null;
-        for (String uri : uris) {
-            answer = to(endpoint(uri));
-        }
-        return answer;
+        return to(endpoints(uris));
     }
 
     /**
-     * Sends the exchange to the given endpoint
+     * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern
      */
     public ProcessorFactory<E> to(Endpoint<E>... endpoints) {
-        ProcessorFactory<E> answer = null;
-        for (Endpoint<E> endpoint : endpoints) {
-            answer = to(endpoint);          
-        }
-        return answer;
+        return to(endpoints(endpoints));
     }
 
 
     /**
+     * Sends the exchange to a list of endpoint using the {@link MulticastProcessor} pattern
+     */
+    public ProcessorFactory<E> to(Collection<Endpoint<E>> endpoints) {
+        return addProcessBuilder(new MulticastBuilder<E>(this, endpoints));
+    }
+
+    /**
+     * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
+     * and for request/response the output of one endpoint will be the input of the next endpoint
+     */
+    public ProcessorFactory<E> pipeline(String... uris) {
+        return pipeline(endpoints(uris));
+    }
+
+    /**
+     * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
+     * and for request/response the output of one endpoint will be the input of the next endpoint
+     */
+    public ProcessorFactory<E> pipeline(Endpoint<E>... endpoints) {
+        return pipeline(endpoints(endpoints));
+    }
+
+    /**
+     * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn
+     * and for request/response the output of one endpoint will be the input of the next endpoint
+     */
+    public ProcessorFactory<E> pipeline(Collection<Endpoint<E>> endpoints) {
+        return addProcessBuilder(new PipelineBuilder<E>(this, endpoints));
+    }
+
+    /**
      * Adds the custom processor to this destination
      */
     public ConstantProcessorBuilder<E> process(Processor<E> processor) {
@@ -161,8 +204,9 @@
         return from;
     }
 
-    public void addProcessBuilder(ProcessorFactory<E> processFactory) {
+    public ProcessorFactory<E> addProcessBuilder(ProcessorFactory<E> processFactory) {
         processFactories.add(processFactory);
+        return processFactory;
     }
 
     public void addProcessor(Processor<E> processor) {
@@ -173,7 +217,7 @@
         List<Processor<E>> answer = new ArrayList<Processor<E>>();
 
         for (ProcessorFactory<E> processFactory : processFactories) {
-            Processor<E> processor = processFactory.createProcessor();
+            Processor<E> processor = makeProcessor(processFactory);
             if (processor == null) {
                 throw new IllegalArgumentException("No processor created for processBuilder: " + processFactory);
             }
@@ -188,6 +232,14 @@
         else {
             return new CompositeProcessor<E>(answer);
         }
+    }
+
+    /**
+     * Creates the processor and wraps it in any necessary interceptors and error handlers
+     */
+    protected Processor<E> makeProcessor(ProcessorFactory<E> processFactory) {
+        Processor<E> processor = processFactory.createProcessor();
+        return getErrorHandlerBuilder().createErrorHandler(processor);
     }
 
     public List<Processor<E>> getProcessors() {

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,44 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.MulticastProcessor;
+
+import java.util.Collection;
+
+/**
+ * A builder for the {@link MulticastProcessor} pattern
+ *
+ * @version $Revision$
+ */
+public class MulticastBuilder<E extends Exchange> extends FromBuilder<E> {
+    private final Collection<Endpoint<E>> endpoints;
+
+    public MulticastBuilder(FromBuilder<E> parent, Collection<Endpoint<E>> endpoints) {
+        super(parent);
+        this.endpoints = endpoints;
+    }
+
+    @Override
+    public Processor<E> createProcessor() {
+        return new MulticastProcessor<E>(endpoints);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,45 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.processor.MulticastProcessor;
+import org.apache.camel.processor.Pipeline;
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+
+import java.util.Collection;
+
+/**
+ * A builder for the {@link Pipeline} pattern
+ *
+ * @version $Revision$
+ */
+public class PipelineBuilder<E extends Exchange> extends FromBuilder<E> {
+    private final Collection<Endpoint<E>> endpoints;
+
+    public PipelineBuilder(FromBuilder<E> parent, Collection<Endpoint<E>> endpoints) {
+        super(parent);
+        this.endpoints = endpoints;
+    }
+
+    @Override
+    public Processor<E> createProcessor() {
+        return new Pipeline<E>(endpoints);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java Wed Mar 21 05:33:32 2007
@@ -38,6 +38,11 @@
             public boolean evaluate(E exchange) {
                 return left.evaluate(exchange) && right.evaluate(exchange);
             }
+
+            @Override
+            public String toString() {
+                return "(" + left + ") and (" + right + ")";
+            }
         };
     }
 
@@ -51,10 +56,14 @@
             public boolean evaluate(E exchange) {
                 return left.evaluate(exchange) || right.evaluate(exchange);
             }
+
+            @Override
+            public String toString() {
+                return "(" + left + ") or (" + right + ")";
+            }
         };
     }
 
-
     public static <E extends Exchange> Predicate<E> isEqualTo(final Expression<E> left, final Expression<E> right) {
         notNull(left, "left");
         notNull(right, "right");
@@ -65,6 +74,11 @@
                 Object value2 = right.evaluate(exchange);
                 return ObjectHelper.equals(value1, value2);
             }
+
+            @Override
+            public String toString() {
+                return left + " == " + right;
+            }
         };
     }
 
@@ -78,6 +92,11 @@
                 Object value2 = right.evaluate(exchange);
                 return !ObjectHelper.equals(value1, value2);
             }
+
+            @Override
+            public String toString() {
+                return left + " != " + right;
+            }
         };
     }
 
@@ -91,6 +110,11 @@
                 Object value2 = right.evaluate(exchange);
                 return ObjectHelper.compare(value1, value2) < 0;
             }
+
+            @Override
+            public String toString() {
+                return left + " < " + right;
+            }
         };
     }
 
@@ -104,6 +128,11 @@
                 Object value2 = right.evaluate(exchange);
                 return ObjectHelper.compare(value1, value2) <= 0;
             }
+
+            @Override
+            public String toString() {
+                return left + " <= " + right;
+            }
         };
     }
 
@@ -117,6 +146,11 @@
                 Object value2 = right.evaluate(exchange);
                 return ObjectHelper.compare(value1, value2) > 0;
             }
+
+            @Override
+            public String toString() {
+                return left + " > " + right;
+            }
         };
     }
 
@@ -130,6 +164,11 @@
                 Object value2 = right.evaluate(exchange);
                 return ObjectHelper.compare(value1, value2) >= 0;
             }
+
+            @Override
+            public String toString() {
+                return left + " >= " + right;
+            }
         };
     }
 
@@ -142,6 +181,11 @@
                 Object value = expression.evaluate(exchange);
                 return type.isInstance(value);
             }
+
+            @Override
+            public String toString() {
+                return expression + " instanceof " + type.getName();
+            }
         };
     }
 
@@ -153,6 +197,11 @@
                 Object value = expression.evaluate(exchange);
                 return value == null;
             }
+
+            @Override
+            public String toString() {
+                return expression + " == null";
+            }
         };
     }
 
@@ -164,6 +213,11 @@
                 Object value = expression.evaluate(exchange);
                 return value != null;
             }
+
+            @Override
+            public String toString() {
+                return expression + " != null";
+            }
         };
     }
 
@@ -177,8 +231,11 @@
                 Object value2 = right.evaluate(exchange);
                 return ObjectHelper.contains(value1, value2);
             }
+
+            @Override
+            public String toString() {
+                return left + ".contains(" + right + ")";
+            }
         };
     }
-
-
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Wed Mar 21 05:33:32 2007
@@ -25,9 +25,11 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
+ * A default endpoint useful for implementation inheritence
+ * 
  * @version $Revision$
  */
-public abstract class DefaultEndpoint<E> implements Endpoint<E> {
+public abstract class DefaultEndpoint<E extends Exchange> implements Endpoint<E> {
     private String endpointUri;
     private CamelContext context;
     private Processor<E> inboundProcessor;
@@ -92,6 +94,13 @@
             activated.set(false);
             doDeactivate();
         }
+    }
+
+
+    public E createExchange(E exchange) {
+        E answer = createExchange();
+        answer.copyFrom(exchange);
+        return answer;
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Wed Mar 21 05:33:32 2007
@@ -41,17 +41,18 @@
 
     public Exchange copy() {
         Exchange exchange = newInstance();
-        if (exchange instanceof DefaultExchange) {
-            DefaultExchange defaultExchange = (DefaultExchange) exchange;
-            defaultExchange.setHeaders(getHeaders().copy());
-            defaultExchange.setIn(getIn().copy());
-            defaultExchange.setOut(getOut().copy());
-            defaultExchange.setFault(getFault().copy());
-            defaultExchange.setException(getException());
-        }
+        exchange.copyFrom(this);
         return exchange;
     }
 
+    public void copyFrom(Exchange exchange) {
+        setHeaders(exchange.getHeaders().copy());
+        setIn(exchange.getIn().copy());
+        setOut(exchange.getOut().copy());
+        setFault(exchange.getFault().copy());
+        setException(exchange.getException());
+    }
+
     public Exchange newInstance() {
         return new DefaultExchange(context);
     }
@@ -124,5 +125,4 @@
     protected Message createOutMessage() {
         return new DefaultMessage();
     }
-
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Wed Mar 21 05:33:32 2007
@@ -48,6 +48,11 @@
         this.redeliveryPolicy = redeliveryPolicy;
     }
 
+    @Override
+    public String toString() {
+        return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]";
+    }
+
     public void onExchange(E exchange) {
         int redeliveryCounter = 0;
         long redeliveryDelay = 0;
@@ -77,6 +82,21 @@
 
     // Properties
     //-------------------------------------------------------------------------
+
+    /**
+     * Returns the output processor
+     */
+    public Processor<E> getOutput() {
+        return output;
+    }
+
+    /**
+     * Returns the dead letter that message exchanges will be sent to if the redelivery attempts fail
+     */
+    public Processor<E> getDeadLetter() {
+        return deadLetter;
+    }
+
     public RedeliveryPolicy getRedeliveryPolicy() {
         return redeliveryPolicy;
     }

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+
+import java.util.Collection;
+
+/**
+ * Implements the Multicast pattern to send a message exchange to a number of endpoints, each endpoint receiving a copy of
+ * the message exchange.
+ *
+ * @version $Revision$
+ */
+public class MulticastProcessor<E extends Exchange> implements Processor<E> {
+    private Collection<Endpoint<E>> endpoints;
+
+    public MulticastProcessor(Collection<Endpoint<E>> endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    @Override
+    public String toString() {
+        return "Multicast" + endpoints;
+    }
+
+    public void onExchange(E exchange) {
+        for (Endpoint<E> endpoint : endpoints) {
+            E copy = copyExchangeStrategy(endpoint, exchange);
+            endpoint.onExchange(copy);
+        }
+    }
+
+    /**
+     * Returns the endpoints to multicast to
+     */
+    public Collection<Endpoint<E>> getEndpoints() {
+        return endpoints;
+    }
+
+    /**
+     * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
+     * {@link Pipeline} will not clone the exchange
+     *
+     * @param endpoint the endpoint that the exchange will be sent to
+     * @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
+     */
+    protected E copyExchangeStrategy(Endpoint<E> endpoint, E exchange) {
+        return endpoint.createExchange(exchange);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=auto&rev=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed Mar 21 05:33:32 2007
@@ -0,0 +1,86 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Processor;
+
+import java.util.Collection;
+
+/**
+ * Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working
+ * with request/response message exchanges.
+ *  
+ * @version $Revision$
+ */
+public class Pipeline<E extends Exchange> implements Processor<E> {
+    private Collection<Endpoint<E>> endpoints;
+
+    public Pipeline(Collection<Endpoint<E>> endpoints) {
+        this.endpoints = endpoints;
+    }
+
+    public void onExchange(E exchange) {
+        E nextExchange = exchange;
+        boolean first = true;
+        for (Endpoint<E> endpoint : endpoints) {
+            if (first) {
+                first = false;
+            }
+            else {
+                nextExchange = createNextExchange(endpoint, nextExchange);
+            }
+            endpoint.onExchange(nextExchange);
+        }
+    }
+
+    /**
+     * Strategy method to create the next exchange from the
+     *
+     * @param endpoint the endpoint the exchange will be sent to
+     * @param previousExchange the previous exchange
+     * @return a new exchange
+     */
+    protected E createNextExchange(Endpoint<E> endpoint, E previousExchange) {
+        E answer = endpoint.createExchange(previousExchange);
+
+        // now lets set the input of the next exchange to the output of the previous message if it is not null
+        Object output = previousExchange.getOut().getBody();
+        if (output != null) {
+            answer.getIn().setBody(output);
+        }
+        return answer;
+    }
+
+    /**
+     * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the
+     * {@link Pipeline} will not clone the exchange
+     *
+     * @param exchange
+     * @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned.
+     */
+    protected E copyExchangeStrategy(E exchange) {
+        return (E) exchange.copy();
+    }
+
+    @Override
+    public String toString() {
+        return "Pipeline" + endpoints;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed Mar 21 05:33:32 2007
@@ -18,6 +18,7 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.ExchangeHelper;
 import static org.apache.camel.util.ObjectHelper.notNull;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -56,19 +57,7 @@
         }
     }
 
-    @SuppressWarnings({"unchecked"})
     protected Endpoint<E> resolveEndpoint(E exchange, Object recipient) {
-        Endpoint<E> endpoint;
-        if (recipient instanceof Endpoint) {
-            endpoint = (Endpoint<E>) recipient;
-        }
-        else {
-            String uri = recipient.toString();
-            endpoint = (Endpoint<E>) exchange.getContext().resolveEndpoint(uri);
-            if (endpoint == null) {
-                throw new NoSuchEndpointException(uri);
-            }
-        }
-        return endpoint;
+        return ExchangeHelper.resolveEndpoint(exchange, recipient);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Wed Mar 21 05:33:32 2007
@@ -40,6 +40,11 @@
     public RedeliveryPolicy() {
     }
 
+    @Override
+    public String toString() {
+        return "RedeliveryPolicy[maximumRedeliveries=" + maximumRedeliveries + "]";
+    }
+
     public RedeliveryPolicy copy() {
         try {
             return (RedeliveryPolicy) clone();

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Wed Mar 21 05:33:32 2007
@@ -17,8 +17,9 @@
  */
 package org.apache.camel.util;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.Expression;
+import org.apache.camel.NoSuchEndpointException;
 
 /**
  * Some helper methods for working with {@link Exchange} objects
@@ -26,4 +27,30 @@
  * @version $Revision$
  */
 public class ExchangeHelper {
+
+    /**
+     * Attempts to resolve the endpoint for the given value
+     *
+     * @param exchange the message exchange being processed
+     * @param value the value which can be an {@link Endpoint} or an object which provides a String representation
+     * of an endpoint via {@link #toString()}
+     *
+     * @return the endpoint
+     * @throws NoSuchEndpointException if the endpoint cannot be resolved
+     */
+    @SuppressWarnings({"unchecked"})
+    public static <E extends Exchange> Endpoint<E> resolveEndpoint(E exchange, Object value) throws NoSuchEndpointException {
+        Endpoint<E> endpoint;
+        if (value instanceof Endpoint) {
+            endpoint = (Endpoint<E>) value;
+        }
+        else {
+            String uri = value.toString();
+            endpoint = (Endpoint<E>) exchange.getContext().resolveEndpoint(uri);
+            if (endpoint == null) {
+                throw new NoSuchEndpointException(uri);
+            }
+        }
+        return endpoint;
+    }
 }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java?view=diff&rev=520860&r1=520859&r2=520860
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java Wed Mar 21 05:33:32 2007
@@ -19,12 +19,13 @@
 import junit.framework.TestCase;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.ChoiceProcessor;
-import org.apache.camel.processor.CompositeProcessor;
 import org.apache.camel.processor.SendProcessor;
 import org.apache.camel.processor.FilterProcessor;
 import org.apache.camel.processor.InterceptorProcessor;
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.Splitter;
+import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.MulticastProcessor;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -68,7 +69,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a SendProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof SendProcessor);
             SendProcessor sendProcessor = (SendProcessor) processor;
@@ -76,7 +77,7 @@
         }
     }
 
-	protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
+    protected RouteBuilder<Exchange> buildSimpleRouteWithHeaderPredicate() {
 		// START SNIPPET: e2
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
@@ -98,17 +99,18 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a FilterProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof FilterProcessor);
             FilterProcessor filterProcessor = (FilterProcessor) processor;
 
-            SendProcessor sendProcessor = (SendProcessor) filterProcessor.getProcessor();
+            SendProcessor sendProcessor = (SendProcessor) unwrapErrorHandler(filterProcessor.getProcessor());
             assertEquals("Endpoint URI", "queue:b", sendProcessor.getDestination().getEndpointUri());
         }
     }
 
-	protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
+
+    protected RouteBuilder<Exchange> buildSimpleRouteWithChoice() {
 		// START SNIPPET: e3
         RouteBuilder<Exchange> builder = new RouteBuilder<Exchange>() {
             public void configure() {
@@ -133,7 +135,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a ChoiceProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof ChoiceProcessor);
             ChoiceProcessor<Exchange> choiceProcessor = (ChoiceProcessor<Exchange>) processor;
@@ -178,7 +180,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertEquals("Should be called with my processor", myProcessor, processor);
         }
@@ -207,11 +209,11 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a FilterProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof FilterProcessor);
             FilterProcessor filterProcessor = (FilterProcessor) processor;
-            assertEquals("Should be called with my processor", myProcessor, filterProcessor.getProcessor());
+            assertEquals("Should be called with my processor", myProcessor, unwrapErrorHandler(filterProcessor.getProcessor()));
         }
     }
 
@@ -238,18 +240,19 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
+
+            assertTrue("Processor should be a MulticastProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof MulticastProcessor);
+            MulticastProcessor<Exchange> multicastProcessor = (MulticastProcessor<Exchange>) processor;
 
-            assertTrue("Processor should be a CompositeProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof CompositeProcessor);
-            CompositeProcessor<Exchange> compositeProcessor = (CompositeProcessor<Exchange>) processor;
-            List<Processor<Exchange>> processors = new ArrayList<Processor<Exchange>>(compositeProcessor.getProcessors());
-            assertEquals("Should have 2 processors", 2, processors.size());
+            List<Endpoint<Exchange>> endpoints = new ArrayList<Endpoint<Exchange>>(multicastProcessor.getEndpoints());
+            assertEquals("Should have 2 endpoints", 2, endpoints.size());
 
-            assertSendTo(processors.get(0), "queue:tap");
-            assertSendTo(processors.get(1), "queue:b");
+            assertEndpointUri(endpoints.get(0), "queue:tap");
+            assertEndpointUri(endpoints.get(1), "queue:b");
         }
     }
-    
+
     protected RouteBuilder<Exchange> buildRouteWithInterceptor() {
 		interceptor1 = new InterceptorProcessor<Exchange>() {
         };
@@ -288,7 +291,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a interceptor1 but was: " + processor + " with type: " + processor.getClass().getName(), processor==interceptor1);
             InterceptorProcessor<Exchange> p1 = (InterceptorProcessor<Exchange>) processor;
@@ -311,7 +314,6 @@
         };
         // END SNIPPET: e7
 
-
         Map<Endpoint<Exchange>, Processor<Exchange>> routeMap = builder.getRouteMap();
         System.out.println("Created map: " + routeMap);
 
@@ -320,7 +322,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             System.out.println("processor: " + processor);
             /* TODO
@@ -367,7 +369,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a RecipientList but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof RecipientList);
             RecipientList<Exchange> p1 = (RecipientList<Exchange>) processor;
@@ -396,7 +398,7 @@
         for (Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route : routes) {
             Endpoint<Exchange> key = route.getKey();
             assertEquals("From endpoint", "queue:a", key.getEndpointUri());
-            Processor processor = route.getValue();
+            Processor processor = getProcessorWithoutErrorHandler(route);
 
             assertTrue("Processor should be a Splitter but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof Splitter);
             Splitter<Exchange> p1 = (Splitter<Exchange>) processor;
@@ -404,9 +406,29 @@
     }
 
     protected void assertSendTo(Processor processor, String uri) {
+        processor = unwrapErrorHandler(processor);
+        
         assertTrue("Processor should be a SendProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof SendProcessor);
 
         SendProcessor sendProcessor = (SendProcessor) processor;
         assertEquals("Endpoint URI", uri, sendProcessor.getDestination().getEndpointUri());
+    }
+
+    /**
+     * By default routes should be wrapped in the {@link DeadLetterChannel} so lets unwrap that and return the actual processor
+     */
+    protected Processor<Exchange> getProcessorWithoutErrorHandler(Map.Entry<Endpoint<Exchange>, Processor<Exchange>> route) {
+        Processor<Exchange> processor = route.getValue();
+        return unwrapErrorHandler(processor);
+    }
+
+    protected Processor<Exchange> unwrapErrorHandler(Processor<Exchange> processor) {
+        assertTrue("Processor should be a DeadLetterChannel but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof DeadLetterChannel);
+        DeadLetterChannel deadLetter = (DeadLetterChannel) processor;
+        return deadLetter.getOutput();
+    }
+
+    protected void assertEndpointUri(Endpoint<Exchange> endpoint, String uri) {
+        assertEquals("Endoint uri for: " + endpoint, uri, endpoint.getEndpointUri());
     }
 }



Mime
View raw message