Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 37239 invoked from network); 21 Mar 2007 12:34:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 21 Mar 2007 12:34:00 -0000 Received: (qmail 28317 invoked by uid 500); 21 Mar 2007 12:34:08 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 28302 invoked by uid 500); 21 Mar 2007 12:34:08 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 28293 invoked by uid 99); 21 Mar 2007 12:34:08 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Mar 2007 05:34:08 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 21 Mar 2007 05:33:58 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 02E901A983E; Wed, 21 Mar 2007 05:33:38 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r520860 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/builder/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/ test/java/org/apache/... Date: Wed, 21 Mar 2007 12:33:36 -0000 To: commits@activemq.apache.org From: jstrachan@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070321123338.02E901A983E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: jstrachan Date: Wed Mar 21 05:33:32 2007 New Revision: 520860 URL: http://svn.apache.org/viewvc?view=rev&rev=520860 Log: added support for DeadLetterChannel, Multicast and Pipeline patterns along with adding an error handler to the RouteBuilder so folks can configure the dead letter policy and so forth Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (with props) activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (with props) activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (with props) activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (with props) activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (with props) activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (with props) Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Endpoint.java Wed Mar 21 05:33:32 2007 @@ -40,6 +40,12 @@ E createExchange(); /** + * Creates a new exchange for communicating with this exchange using the given exchange to pre-populate the values + * of the headers and messages + */ + E createExchange(E exchange); + + /** * Called by the container to Activate the endpoint. Once activated, * the endpoint will start delivering inbound message exchanges * that are received to the specified processor. Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Wed Mar 21 05:33:32 2007 @@ -83,4 +83,11 @@ * destination */ Exchange copy(); + + /** + * Copies the data into this exchange from the given exchange + * + * #param source is the source from which headers and messages will be copied + */ + void copyFrom(Exchange source); } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BuilderSupport.java Wed Mar 21 05:33:32 2007 @@ -18,6 +18,7 @@ import org.apache.camel.Expression; import org.apache.camel.Exchange; +import org.apache.camel.Processor; /** * Base class for implementation inheritance @@ -26,6 +27,14 @@ */ public abstract class BuilderSupport { + private ErrorHandlerBuilder errorHandlerBuilder; + + protected BuilderSupport() { + } + + // Builder methods + //------------------------------------------------------------------------- + /** * Returns a predicate and value builder for headers on an exchange */ @@ -66,5 +75,28 @@ return new ValueBuilder(expression); } + + // Properties + //------------------------------------------------------------------------- + + protected BuilderSupport(BuilderSupport parent) { + if (parent.errorHandlerBuilder != null) { + this.errorHandlerBuilder = parent.errorHandlerBuilder.copy(); + } + } + + public ErrorHandlerBuilder getErrorHandlerBuilder() { + if (errorHandlerBuilder == null) { + errorHandlerBuilder = new DeadLetterChannelBuilder(); + } + return errorHandlerBuilder; + } + + /** + * Sets the error handler to use with processors created by this builder + */ + public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) { + this.errorHandlerBuilder = errorHandlerBuilder; + } } Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?view=auto&rev=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java Wed Mar 21 05:33:32 2007 @@ -0,0 +1,128 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Expression; +import org.apache.camel.processor.RedeliveryPolicy; +import org.apache.camel.processor.DeadLetterChannel; +import org.apache.camel.processor.RecipientList; + +/** + * A builder of a Dead Letter Channel + * + * @version $Revision$ + */ +public class DeadLetterChannelBuilder extends BuilderSupport implements ErrorHandlerBuilder { + private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); + private ProcessorFactory deadLetterFactory; + private Processor defaultDeadLetterEndpoint; + private Expression defaultDeadLetterEndpointExpression; + private String defaultDeadLetterEndpointUri = "log:org.apache.camel.DeadLetterChannel:error"; + + public DeadLetterChannelBuilder() { + } + + public DeadLetterChannelBuilder(ProcessorFactory deadLetterFactory) { + this.deadLetterFactory = deadLetterFactory; + } + + public ErrorHandlerBuilder copy() { + DeadLetterChannelBuilder answer = new DeadLetterChannelBuilder(deadLetterFactory); + answer.setRedeliveryPolicy(getRedeliveryPolicy().copy()); + return answer; + } + + public Processor createErrorHandler(Processor processor) { + Processor deadLetter = getDeadLetterFactory().createProcessor(); + return new DeadLetterChannel(processor, deadLetter, getRedeliveryPolicy()); + } + + public RedeliveryPolicy getRedeliveryPolicy() { + return redeliveryPolicy; + } + + /** + * Sets the redelivery policy + */ + public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) { + this.redeliveryPolicy = redeliveryPolicy; + } + + public ProcessorFactory getDeadLetterFactory() { + if (deadLetterFactory == null) { + deadLetterFactory = new ProcessorFactory() { + public Processor createProcessor() { + return getDefaultDeadLetterEndpoint(); + } + }; + } + return deadLetterFactory; + } + + /** + * Sets the default dead letter queue factory + */ + public void setDeadLetterFactory(ProcessorFactory deadLetterFactory) { + this.deadLetterFactory = deadLetterFactory; + } + + public Processor getDefaultDeadLetterEndpoint() { + if (defaultDeadLetterEndpoint == null) { + defaultDeadLetterEndpoint = new RecipientList(getDefaultDeadLetterEndpointExpression()); + } + return defaultDeadLetterEndpoint; + } + + /** + * Sets the default dead letter endpoint used + */ + public void setDefaultDeadLetterEndpoint(Processor defaultDeadLetterEndpoint) { + this.defaultDeadLetterEndpoint = defaultDeadLetterEndpoint; + } + + public Expression getDefaultDeadLetterEndpointExpression() { + if (defaultDeadLetterEndpointExpression == null) { + defaultDeadLetterEndpointExpression = ExpressionBuilder.constantExpression(getDefaultDeadLetterEndpointUri()); + } + return defaultDeadLetterEndpointExpression; + } + + /** + * Sets the expression used to decide the dead letter channel endpoint for an exchange + * if no factory is provided via {@link #setDeadLetterFactory(ProcessorFactory)} + */ + public void setDefaultDeadLetterEndpointExpression(Expression defaultDeadLetterEndpointExpression) { + this.defaultDeadLetterEndpointExpression = defaultDeadLetterEndpointExpression; + } + + public String getDefaultDeadLetterEndpointUri() { + return defaultDeadLetterEndpointUri; + } + + /** + * Sets the default dead letter endpoint URI used if no factory is provided via {@link #setDeadLetterFactory(ProcessorFactory)} + * and no expression is provided via {@link #setDefaultDeadLetterEndpointExpression(Expression)} + * + * @param defaultDeadLetterEndpointUri the default URI if no deadletter factory or expression is provided + */ + public void setDefaultDeadLetterEndpointUri(String defaultDeadLetterEndpointUri) { + this.defaultDeadLetterEndpointUri = defaultDeadLetterEndpointUri; + } +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java?view=auto&rev=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java Wed Mar 21 05:33:32 2007 @@ -0,0 +1,36 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +/** + * @version $Revision$ + */ +public interface ErrorHandlerBuilder { + /** + * Creates a copy of this builder + */ + ErrorHandlerBuilder copy(); + + /** + * Creates the error handler interceptor + */ + Processor createErrorHandler(Processor processor); +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ErrorHandlerBuilder.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java Wed Mar 21 05:33:32 2007 @@ -81,7 +81,7 @@ @Override public String toString() { - return "Body"; + return "body"; } }; } @@ -98,7 +98,7 @@ @Override public String toString() { - return "BodyAs[" + type.getName() + "]"; + return "bodyAs[" + type.getName() + "]"; } }; } @@ -114,7 +114,7 @@ @Override public String toString() { - return "OutBody"; + return "outBody"; } }; } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Wed Mar 21 05:33:32 2007 @@ -20,11 +20,14 @@ import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.processor.InterceptorProcessor; +import org.apache.camel.processor.MulticastProcessor; +import org.apache.camel.processor.Pipeline; import org.apache.camel.Predicate; import org.apache.camel.Processor; import java.util.ArrayList; import java.util.List; +import java.util.Collection; /** * @version $Revision$ @@ -52,6 +55,23 @@ return getBuilder().endpoint(uri); } + public List> endpoints(String... uris) { + List> endpoints = new ArrayList>(); + for (String uri : uris) { + endpoints.add(endpoint(uri)); + } + return endpoints; + } + + public List> endpoints(Endpoint... uris) { + List> endpoints = new ArrayList>(); + for (Endpoint uri : uris) { + endpoints.add(uri); + } + return endpoints; + } + + /** * Sends the exchange to the given endpoint URI */ @@ -70,29 +90,52 @@ /** - * Sends the exchange to the given endpoint URI + * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern */ public ProcessorFactory to(String... uris) { - ProcessorFactory answer = null; - for (String uri : uris) { - answer = to(endpoint(uri)); - } - return answer; + return to(endpoints(uris)); } /** - * Sends the exchange to the given endpoint + * Sends the exchange to a list of endpoints using the {@link MulticastProcessor} pattern */ public ProcessorFactory to(Endpoint... endpoints) { - ProcessorFactory answer = null; - for (Endpoint endpoint : endpoints) { - answer = to(endpoint); - } - return answer; + return to(endpoints(endpoints)); } /** + * Sends the exchange to a list of endpoint using the {@link MulticastProcessor} pattern + */ + public ProcessorFactory to(Collection> endpoints) { + return addProcessBuilder(new MulticastBuilder(this, endpoints)); + } + + /** + * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn + * and for request/response the output of one endpoint will be the input of the next endpoint + */ + public ProcessorFactory pipeline(String... uris) { + return pipeline(endpoints(uris)); + } + + /** + * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn + * and for request/response the output of one endpoint will be the input of the next endpoint + */ + public ProcessorFactory pipeline(Endpoint... endpoints) { + return pipeline(endpoints(endpoints)); + } + + /** + * Creates a {@link Pipeline} of the list of endpoints so that the message will get processed by each endpoint in turn + * and for request/response the output of one endpoint will be the input of the next endpoint + */ + public ProcessorFactory pipeline(Collection> endpoints) { + return addProcessBuilder(new PipelineBuilder(this, endpoints)); + } + + /** * Adds the custom processor to this destination */ public ConstantProcessorBuilder process(Processor processor) { @@ -161,8 +204,9 @@ return from; } - public void addProcessBuilder(ProcessorFactory processFactory) { + public ProcessorFactory addProcessBuilder(ProcessorFactory processFactory) { processFactories.add(processFactory); + return processFactory; } public void addProcessor(Processor processor) { @@ -173,7 +217,7 @@ List> answer = new ArrayList>(); for (ProcessorFactory processFactory : processFactories) { - Processor processor = processFactory.createProcessor(); + Processor processor = makeProcessor(processFactory); if (processor == null) { throw new IllegalArgumentException("No processor created for processBuilder: " + processFactory); } @@ -188,6 +232,14 @@ else { return new CompositeProcessor(answer); } + } + + /** + * Creates the processor and wraps it in any necessary interceptors and error handlers + */ + protected Processor makeProcessor(ProcessorFactory processFactory) { + Processor processor = processFactory.createProcessor(); + return getErrorHandlerBuilder().createErrorHandler(processor); } public List> getProcessors() { Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java?view=auto&rev=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java Wed Mar 21 05:33:32 2007 @@ -0,0 +1,44 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.processor.MulticastProcessor; + +import java.util.Collection; + +/** + * A builder for the {@link MulticastProcessor} pattern + * + * @version $Revision$ + */ +public class MulticastBuilder extends FromBuilder { + private final Collection> endpoints; + + public MulticastBuilder(FromBuilder parent, Collection> endpoints) { + super(parent); + this.endpoints = endpoints; + } + + @Override + public Processor createProcessor() { + return new MulticastProcessor(endpoints); + } +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/MulticastBuilder.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java?view=auto&rev=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java Wed Mar 21 05:33:32 2007 @@ -0,0 +1,45 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.builder; + +import org.apache.camel.processor.MulticastProcessor; +import org.apache.camel.processor.Pipeline; +import org.apache.camel.Exchange; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; + +import java.util.Collection; + +/** + * A builder for the {@link Pipeline} pattern + * + * @version $Revision$ + */ +public class PipelineBuilder extends FromBuilder { + private final Collection> endpoints; + + public PipelineBuilder(FromBuilder parent, Collection> endpoints) { + super(parent); + this.endpoints = endpoints; + } + + @Override + public Processor createProcessor() { + return new Pipeline(endpoints); + } +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PipelineBuilder.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/PredicateBuilder.java Wed Mar 21 05:33:32 2007 @@ -38,6 +38,11 @@ public boolean evaluate(E exchange) { return left.evaluate(exchange) && right.evaluate(exchange); } + + @Override + public String toString() { + return "(" + left + ") and (" + right + ")"; + } }; } @@ -51,10 +56,14 @@ public boolean evaluate(E exchange) { return left.evaluate(exchange) || right.evaluate(exchange); } + + @Override + public String toString() { + return "(" + left + ") or (" + right + ")"; + } }; } - public static Predicate isEqualTo(final Expression left, final Expression right) { notNull(left, "left"); notNull(right, "right"); @@ -65,6 +74,11 @@ Object value2 = right.evaluate(exchange); return ObjectHelper.equals(value1, value2); } + + @Override + public String toString() { + return left + " == " + right; + } }; } @@ -78,6 +92,11 @@ Object value2 = right.evaluate(exchange); return !ObjectHelper.equals(value1, value2); } + + @Override + public String toString() { + return left + " != " + right; + } }; } @@ -91,6 +110,11 @@ Object value2 = right.evaluate(exchange); return ObjectHelper.compare(value1, value2) < 0; } + + @Override + public String toString() { + return left + " < " + right; + } }; } @@ -104,6 +128,11 @@ Object value2 = right.evaluate(exchange); return ObjectHelper.compare(value1, value2) <= 0; } + + @Override + public String toString() { + return left + " <= " + right; + } }; } @@ -117,6 +146,11 @@ Object value2 = right.evaluate(exchange); return ObjectHelper.compare(value1, value2) > 0; } + + @Override + public String toString() { + return left + " > " + right; + } }; } @@ -130,6 +164,11 @@ Object value2 = right.evaluate(exchange); return ObjectHelper.compare(value1, value2) >= 0; } + + @Override + public String toString() { + return left + " >= " + right; + } }; } @@ -142,6 +181,11 @@ Object value = expression.evaluate(exchange); return type.isInstance(value); } + + @Override + public String toString() { + return expression + " instanceof " + type.getName(); + } }; } @@ -153,6 +197,11 @@ Object value = expression.evaluate(exchange); return value == null; } + + @Override + public String toString() { + return expression + " == null"; + } }; } @@ -164,6 +213,11 @@ Object value = expression.evaluate(exchange); return value != null; } + + @Override + public String toString() { + return expression + " != null"; + } }; } @@ -177,8 +231,11 @@ Object value2 = right.evaluate(exchange); return ObjectHelper.contains(value1, value2); } + + @Override + public String toString() { + return left + ".contains(" + right + ")"; + } }; } - - } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultEndpoint.java Wed Mar 21 05:33:32 2007 @@ -25,9 +25,11 @@ import java.util.concurrent.atomic.AtomicBoolean; /** + * A default endpoint useful for implementation inheritence + * * @version $Revision$ */ -public abstract class DefaultEndpoint implements Endpoint { +public abstract class DefaultEndpoint implements Endpoint { private String endpointUri; private CamelContext context; private Processor inboundProcessor; @@ -92,6 +94,13 @@ activated.set(false); doDeactivate(); } + } + + + public E createExchange(E exchange) { + E answer = createExchange(); + answer.copyFrom(exchange); + return answer; } /** Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java Wed Mar 21 05:33:32 2007 @@ -41,17 +41,18 @@ public Exchange copy() { Exchange exchange = newInstance(); - if (exchange instanceof DefaultExchange) { - DefaultExchange defaultExchange = (DefaultExchange) exchange; - defaultExchange.setHeaders(getHeaders().copy()); - defaultExchange.setIn(getIn().copy()); - defaultExchange.setOut(getOut().copy()); - defaultExchange.setFault(getFault().copy()); - defaultExchange.setException(getException()); - } + exchange.copyFrom(this); return exchange; } + public void copyFrom(Exchange exchange) { + setHeaders(exchange.getHeaders().copy()); + setIn(exchange.getIn().copy()); + setOut(exchange.getOut().copy()); + setFault(exchange.getFault().copy()); + setException(exchange.getException()); + } + public Exchange newInstance() { return new DefaultExchange(context); } @@ -124,5 +125,4 @@ protected Message createOutMessage() { return new DefaultMessage(); } - } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java Wed Mar 21 05:33:32 2007 @@ -48,6 +48,11 @@ this.redeliveryPolicy = redeliveryPolicy; } + @Override + public String toString() { + return "DeadLetterChannel[" + output + ", " + deadLetter + ", " + redeliveryPolicy + "]"; + } + public void onExchange(E exchange) { int redeliveryCounter = 0; long redeliveryDelay = 0; @@ -77,6 +82,21 @@ // Properties //------------------------------------------------------------------------- + + /** + * Returns the output processor + */ + public Processor getOutput() { + return output; + } + + /** + * Returns the dead letter that message exchanges will be sent to if the redelivery attempts fail + */ + public Processor getDeadLetter() { + return deadLetter; + } + public RedeliveryPolicy getRedeliveryPolicy() { return redeliveryPolicy; } Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?view=auto&rev=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Wed Mar 21 05:33:32 2007 @@ -0,0 +1,68 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; + +import java.util.Collection; + +/** + * Implements the Multicast pattern to send a message exchange to a number of endpoints, each endpoint receiving a copy of + * the message exchange. + * + * @version $Revision$ + */ +public class MulticastProcessor implements Processor { + private Collection> endpoints; + + public MulticastProcessor(Collection> endpoints) { + this.endpoints = endpoints; + } + + @Override + public String toString() { + return "Multicast" + endpoints; + } + + public void onExchange(E exchange) { + for (Endpoint endpoint : endpoints) { + E copy = copyExchangeStrategy(endpoint, exchange); + endpoint.onExchange(copy); + } + } + + /** + * Returns the endpoints to multicast to + */ + public Collection> getEndpoints() { + return endpoints; + } + + /** + * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the + * {@link Pipeline} will not clone the exchange + * + * @param endpoint the endpoint that the exchange will be sent to + * @param exchange @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned. + */ + protected E copyExchangeStrategy(Endpoint endpoint, E exchange) { + return endpoint.createExchange(exchange); + } +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?view=auto&rev=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java (added) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java Wed Mar 21 05:33:32 2007 @@ -0,0 +1,86 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.Exchange; +import org.apache.camel.Endpoint; +import org.apache.camel.Processor; + +import java.util.Collection; + +/** + * Creates a Pipeline pattern where the output of the previous step is sent as input to the next step when working + * with request/response message exchanges. + * + * @version $Revision$ + */ +public class Pipeline implements Processor { + private Collection> endpoints; + + public Pipeline(Collection> endpoints) { + this.endpoints = endpoints; + } + + public void onExchange(E exchange) { + E nextExchange = exchange; + boolean first = true; + for (Endpoint endpoint : endpoints) { + if (first) { + first = false; + } + else { + nextExchange = createNextExchange(endpoint, nextExchange); + } + endpoint.onExchange(nextExchange); + } + } + + /** + * Strategy method to create the next exchange from the + * + * @param endpoint the endpoint the exchange will be sent to + * @param previousExchange the previous exchange + * @return a new exchange + */ + protected E createNextExchange(Endpoint endpoint, E previousExchange) { + E answer = endpoint.createExchange(previousExchange); + + // now lets set the input of the next exchange to the output of the previous message if it is not null + Object output = previousExchange.getOut().getBody(); + if (output != null) { + answer.getIn().setBody(output); + } + return answer; + } + + /** + * Strategy method to copy the exchange before sending to another endpoint. Derived classes such as the + * {@link Pipeline} will not clone the exchange + * + * @param exchange + * @return the current exchange if no copying is required such as for a pipeline otherwise a new copy of the exchange is returned. + */ + protected E copyExchangeStrategy(E exchange) { + return (E) exchange.copy(); + } + + @Override + public String toString() { + return "Pipeline" + endpoints; + } +} Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java ------------------------------------------------------------------------------ svn:keywords = Date Author Id Revision HeadURL Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Wed Mar 21 05:33:32 2007 @@ -18,6 +18,7 @@ package org.apache.camel.processor; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ExchangeHelper; import static org.apache.camel.util.ObjectHelper.notNull; import org.apache.camel.Exchange; import org.apache.camel.Processor; @@ -56,19 +57,7 @@ } } - @SuppressWarnings({"unchecked"}) protected Endpoint resolveEndpoint(E exchange, Object recipient) { - Endpoint endpoint; - if (recipient instanceof Endpoint) { - endpoint = (Endpoint) recipient; - } - else { - String uri = recipient.toString(); - endpoint = (Endpoint) exchange.getContext().resolveEndpoint(uri); - if (endpoint == null) { - throw new NoSuchEndpointException(uri); - } - } - return endpoint; + return ExchangeHelper.resolveEndpoint(exchange, recipient); } } Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryPolicy.java Wed Mar 21 05:33:32 2007 @@ -40,6 +40,11 @@ public RedeliveryPolicy() { } + @Override + public String toString() { + return "RedeliveryPolicy[maximumRedeliveries=" + maximumRedeliveries + "]"; + } + public RedeliveryPolicy copy() { try { return (RedeliveryPolicy) clone(); Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original) +++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Wed Mar 21 05:33:32 2007 @@ -17,8 +17,9 @@ */ package org.apache.camel.util; +import org.apache.camel.Endpoint; import org.apache.camel.Exchange; -import org.apache.camel.Expression; +import org.apache.camel.NoSuchEndpointException; /** * Some helper methods for working with {@link Exchange} objects @@ -26,4 +27,30 @@ * @version $Revision$ */ public class ExchangeHelper { + + /** + * Attempts to resolve the endpoint for the given value + * + * @param exchange the message exchange being processed + * @param value the value which can be an {@link Endpoint} or an object which provides a String representation + * of an endpoint via {@link #toString()} + * + * @return the endpoint + * @throws NoSuchEndpointException if the endpoint cannot be resolved + */ + @SuppressWarnings({"unchecked"}) + public static Endpoint resolveEndpoint(E exchange, Object value) throws NoSuchEndpointException { + Endpoint endpoint; + if (value instanceof Endpoint) { + endpoint = (Endpoint) value; + } + else { + String uri = value.toString(); + endpoint = (Endpoint) exchange.getContext().resolveEndpoint(uri); + if (endpoint == null) { + throw new NoSuchEndpointException(uri); + } + } + return endpoint; + } } Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java?view=diff&rev=520860&r1=520859&r2=520860 ============================================================================== --- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java (original) +++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/RouteBuilderTest.java Wed Mar 21 05:33:32 2007 @@ -19,12 +19,13 @@ import junit.framework.TestCase; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.processor.ChoiceProcessor; -import org.apache.camel.processor.CompositeProcessor; import org.apache.camel.processor.SendProcessor; import org.apache.camel.processor.FilterProcessor; import org.apache.camel.processor.InterceptorProcessor; import org.apache.camel.processor.RecipientList; import org.apache.camel.processor.Splitter; +import org.apache.camel.processor.DeadLetterChannel; +import org.apache.camel.processor.MulticastProcessor; import java.util.ArrayList; import java.util.List; @@ -68,7 +69,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a SendProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof SendProcessor); SendProcessor sendProcessor = (SendProcessor) processor; @@ -76,7 +77,7 @@ } } - protected RouteBuilder buildSimpleRouteWithHeaderPredicate() { + protected RouteBuilder buildSimpleRouteWithHeaderPredicate() { // START SNIPPET: e2 RouteBuilder builder = new RouteBuilder() { public void configure() { @@ -98,17 +99,18 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a FilterProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof FilterProcessor); FilterProcessor filterProcessor = (FilterProcessor) processor; - SendProcessor sendProcessor = (SendProcessor) filterProcessor.getProcessor(); + SendProcessor sendProcessor = (SendProcessor) unwrapErrorHandler(filterProcessor.getProcessor()); assertEquals("Endpoint URI", "queue:b", sendProcessor.getDestination().getEndpointUri()); } } - protected RouteBuilder buildSimpleRouteWithChoice() { + + protected RouteBuilder buildSimpleRouteWithChoice() { // START SNIPPET: e3 RouteBuilder builder = new RouteBuilder() { public void configure() { @@ -133,7 +135,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a ChoiceProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof ChoiceProcessor); ChoiceProcessor choiceProcessor = (ChoiceProcessor) processor; @@ -178,7 +180,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertEquals("Should be called with my processor", myProcessor, processor); } @@ -207,11 +209,11 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a FilterProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof FilterProcessor); FilterProcessor filterProcessor = (FilterProcessor) processor; - assertEquals("Should be called with my processor", myProcessor, filterProcessor.getProcessor()); + assertEquals("Should be called with my processor", myProcessor, unwrapErrorHandler(filterProcessor.getProcessor())); } } @@ -238,18 +240,19 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); + + assertTrue("Processor should be a MulticastProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof MulticastProcessor); + MulticastProcessor multicastProcessor = (MulticastProcessor) processor; - assertTrue("Processor should be a CompositeProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof CompositeProcessor); - CompositeProcessor compositeProcessor = (CompositeProcessor) processor; - List> processors = new ArrayList>(compositeProcessor.getProcessors()); - assertEquals("Should have 2 processors", 2, processors.size()); + List> endpoints = new ArrayList>(multicastProcessor.getEndpoints()); + assertEquals("Should have 2 endpoints", 2, endpoints.size()); - assertSendTo(processors.get(0), "queue:tap"); - assertSendTo(processors.get(1), "queue:b"); + assertEndpointUri(endpoints.get(0), "queue:tap"); + assertEndpointUri(endpoints.get(1), "queue:b"); } } - + protected RouteBuilder buildRouteWithInterceptor() { interceptor1 = new InterceptorProcessor() { }; @@ -288,7 +291,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a interceptor1 but was: " + processor + " with type: " + processor.getClass().getName(), processor==interceptor1); InterceptorProcessor p1 = (InterceptorProcessor) processor; @@ -311,7 +314,6 @@ }; // END SNIPPET: e7 - Map, Processor> routeMap = builder.getRouteMap(); System.out.println("Created map: " + routeMap); @@ -320,7 +322,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); System.out.println("processor: " + processor); /* TODO @@ -367,7 +369,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a RecipientList but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof RecipientList); RecipientList p1 = (RecipientList) processor; @@ -396,7 +398,7 @@ for (Map.Entry, Processor> route : routes) { Endpoint key = route.getKey(); assertEquals("From endpoint", "queue:a", key.getEndpointUri()); - Processor processor = route.getValue(); + Processor processor = getProcessorWithoutErrorHandler(route); assertTrue("Processor should be a Splitter but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof Splitter); Splitter p1 = (Splitter) processor; @@ -404,9 +406,29 @@ } protected void assertSendTo(Processor processor, String uri) { + processor = unwrapErrorHandler(processor); + assertTrue("Processor should be a SendProcessor but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof SendProcessor); SendProcessor sendProcessor = (SendProcessor) processor; assertEquals("Endpoint URI", uri, sendProcessor.getDestination().getEndpointUri()); + } + + /** + * By default routes should be wrapped in the {@link DeadLetterChannel} so lets unwrap that and return the actual processor + */ + protected Processor getProcessorWithoutErrorHandler(Map.Entry, Processor> route) { + Processor processor = route.getValue(); + return unwrapErrorHandler(processor); + } + + protected Processor unwrapErrorHandler(Processor processor) { + assertTrue("Processor should be a DeadLetterChannel but was: " + processor + " with type: " + processor.getClass().getName(), processor instanceof DeadLetterChannel); + DeadLetterChannel deadLetter = (DeadLetterChannel) processor; + return deadLetter.getOutput(); + } + + protected void assertEndpointUri(Endpoint endpoint, String uri) { + assertEquals("Endoint uri for: " + endpoint, uri, endpoint.getEndpointUri()); } }