Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 46823 invoked from network); 7 May 2009 10:43:09 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 7 May 2009 10:43:09 -0000 Received: (qmail 23242 invoked by uid 500); 7 May 2009 10:43:09 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 23199 invoked by uid 500); 7 May 2009 10:43:09 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 23190 invoked by uid 99); 7 May 2009 10:43:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 May 2009 10:43:09 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 May 2009 10:43:05 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 24405238896B; Thu, 7 May 2009 10:42:45 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r772598 [1/2] - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/component/seda/ main/java/org/apache/camel/impl/ main/java/org/apache/camel/impl/converter/ main/java/org/apache/camel/model/ main/java/o... 
Date: Thu, 07 May 2009 10:42:44 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090507104245.24405238896B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Thu May 7 10:42:40 2009 New Revision: 772598 URL: http://svn.apache.org/viewvc?rev=772598&view=rev Log: CAMEL-1572: Added first cut of async DSL. Fixed CS. Added: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java (with props) camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java - copied, changed from r772297, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (with props) camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteWithErrorTest.java (with props) camel/trunk/camel-core/src/test/resources/org/apache/camel/processor/students.xml (with props) Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java 
camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Added: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java Thu May 7 10:42:40 2009 @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel; + +/** + * Exception occurred during execution/processing of an {@link Exchange}. + *

+ * Is usually thrown to the caller when using the {@link org.apache.camel.ProducerTemplate} + * to send messages to Camel. + * + * @version $Revision$ + */ +public class CamelExecutionException extends RuntimeExchangeException { + + public CamelExecutionException(String message, Exchange exchange) { + super(message, exchange); + } + + public CamelExecutionException(String message, Exchange exchange, Throwable cause) { + super(message, exchange, cause); + } +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/CamelExecutionException.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Thu May 7 10:42:40 2009 @@ -30,6 +30,9 @@ */ public interface Exchange { + String ASYNC_WAIT = "CamelAsyncWait"; + String ASYNC_WAIT_TIMEOUT = "CamelAsyncWaitTimeout"; + String BEAN_METHOD_NAME = "CamelBeanMethodName"; String BEAN_HOLDER = "CamelBeanHolder"; String BEAN_MULTI_PARAMETER_ARRAY = "CamelBeanMultiParameterArray"; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- 
camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/InvalidPayloadException.java Thu May 7 10:42:40 2009 @@ -29,13 +29,13 @@ } public InvalidPayloadException(Exchange exchange, Class type, Message message) { - super("No body available of type: " + type.getName() + super("No body available of type: " + type.getCanonicalName() + NoSuchPropertyException.valueDescription(message.getBody()) + " on: " + message, exchange); this.type = type; } public InvalidPayloadException(Exchange exchange, Class type, Message message, Throwable cause) { - super("No body available of type: " + type.getName() + super("No body available of type: " + type.getCanonicalName() + NoSuchPropertyException.valueDescription(message.getBody()) + " on: " + message + ". Caused by: " + cause.getMessage(), exchange, cause); this.type = type; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/ProducerTemplate.java Thu May 7 10:42:40 2009 @@ -70,37 +70,53 @@ /** * Sends the body to the default endpoint + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param body the payload to send + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBody(Object body); /** * Sends the body to the default endpoint with a specified header and header * value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param body the payload to send * @param header the header name * @param headerValue the header value + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndHeader(Object body, String header, Object headerValue); /** * Sends the body to the default endpoint with a specified property and property * value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param body the payload to send * @param property the property name * @param propertyValue the property value + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndProperty(Object body, String property, Object propertyValue); /** * Sends the body to the default endpoint with the specified headers and * header values + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param body the payload to send * @param headers the headers + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndHeaders(Object body, Map headers); @@ -171,65 +187,95 @@ /** * Send the body to an endpoint + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the endpoint to send the exchange to * @param body the payload + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBody(Endpoint endpoint, Object body); /** * Send the body to an endpoint + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send the exchange to * @param body the payload + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBody(String endpointUri, Object body); /** * Send the body to an endpoint with the given {@link ExchangePattern} * returning any result output body + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the endpoint to send the exchange to * @param body the payload * @param pattern the message {@link ExchangePattern} such as * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBody(Endpoint endpoint, ExchangePattern pattern, Object body); /** * Send the body to an endpoint returning any result output body + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send the exchange to * @param pattern the message {@link ExchangePattern} such as * {@link ExchangePattern#InOnly} or {@link ExchangePattern#InOut} * @param body the payload * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBody(String endpointUri, ExchangePattern pattern, Object body); /** * Sends the body to an endpoint with a specified header and header value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload to send * @param header the header name * @param headerValue the header value + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndHeader(String endpointUri, Object body, String header, Object headerValue); /** * Sends the body to an endpoint with a specified header and header value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param body the payload to send * @param header the header name * @param headerValue the header value + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue); /** * Sends the body to an endpoint with a specified header and header value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param pattern the message {@link ExchangePattern} such as @@ -238,12 +284,16 @@ * @param header the header name * @param headerValue the header value * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBodyAndHeader(Endpoint endpoint, ExchangePattern pattern, Object body, String header, Object headerValue); /** * Sends the body to an endpoint with a specified header and header value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint URI to send to * @param pattern the message {@link ExchangePattern} such as @@ -252,32 +302,44 @@ * @param header the header name * @param headerValue the header value * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBodyAndHeader(String endpoint, ExchangePattern pattern, Object body, String header, Object headerValue); /** * Sends the body to an endpoint with a specified property and property value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload to send * @param property the property name * @param propertyValue the property value + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndProperty(String endpointUri, Object body, String property, Object propertyValue); /** * Sends the body to an endpoint with a specified property and property value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param body the payload to send * @param property the property name * @param propertyValue the property value + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndProperty(Endpoint endpoint, Object body, String property, Object propertyValue); /** * Sends the body to an endpoint with a specified property and property value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param pattern the message {@link ExchangePattern} such as @@ -286,12 +348,16 @@ * @param property the property name * @param propertyValue the property value * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBodyAndProperty(Endpoint endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue); /** * Sends the body to an endpoint with a specified property and property value + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint URI to send to * @param pattern the message {@link ExchangePattern} such as @@ -300,33 +366,42 @@ * @param property the property name * @param propertyValue the property value * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBodyAndProperty(String endpoint, ExchangePattern pattern, Object body, String property, Object propertyValue); /** - * Sends the body to an endpoint with the specified headers and header - * values + * Sends the body to an endpoint with the specified headers and header values + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload to send * @param headers headers + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndHeaders(String endpointUri, Object body, Map headers); /** - * Sends the body to an endpoint with the specified headers and header - * values + * Sends the body to an endpoint with the specified headers and header values + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the endpoint URI to send to * @param body the payload to send * @param headers headers + * @throws CamelExecutionException if the processing of the exchange failed */ void sendBodyAndHeaders(Endpoint endpoint, Object body, Map headers); /** - * Sends the body to an endpoint with the specified headers and header - * values + * Sends the body to an endpoint with the specified headers and header values + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param pattern the message {@link ExchangePattern} such as @@ -334,13 +409,16 @@ * @param body the payload to send * @param headers headers * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBodyAndHeaders(String endpointUri, ExchangePattern pattern, Object body, Map headers); /** - * Sends the body to an endpoint with the specified headers and header - * values + * Sends the body to an endpoint with the specified headers and header values + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the endpoint URI to send to * @param pattern the message {@link ExchangePattern} such as @@ -348,6 +426,7 @@ * @param body the payload to send * @param headers headers * @return the result if {@link ExchangePattern} is OUT capable, otherwise null + * @throws CamelExecutionException if the processing of the exchange failed */ Object sendBodyAndHeaders(Endpoint endpoint, ExchangePattern pattern, Object body, Map headers); @@ -379,79 +458,110 @@ /** * Sends the body to the default endpoint and returns the result content * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param body the payload to send * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBody(Object body); /** * Sends the body to the default endpoint and returns the result content * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param body the payload to send * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBody(Object body, Class type); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param body the payload * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBody(Endpoint endpoint, Object body); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param body the payload * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBody(Endpoint endpoint, Object body, Class type); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBody(String endpointUri, Object body); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBody(String endpointUri, Object body, Class type); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param body the payload * @param header the header name * @param headerValue the header value * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the Endpoint to send to * @param body the payload @@ -459,24 +569,32 @@ * @param headerValue the header value * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBodyAndHeader(Endpoint endpoint, Object body, String header, Object headerValue, Class type); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload * @param header the header name * @param headerValue the header value * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue); /** * Send the body to an endpoint returning any result output body. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload @@ -484,56 +602,69 @@ * @param headerValue the header value * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBodyAndHeader(String endpointUri, Object body, String header, Object headerValue, Class type); /** - * Sends the body to an endpoint with the specified headers and header - * values. + * Sends the body to an endpoint with the specified headers and header values. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload to send * @param headers headers * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBodyAndHeaders(String endpointUri, Object body, Map headers); /** - * Sends the body to an endpoint with the specified headers and header - * values. + * Sends the body to an endpoint with the specified headers and header values. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpointUri the endpoint URI to send to * @param body the payload to send * @param headers headers * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBodyAndHeaders(String endpointUri, Object body, Map headers, Class type); /** - * Sends the body to an endpoint with the specified headers and header - * values. + * Sends the body to an endpoint with the specified headers and header values. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the endpoint URI to send to * @param body the payload to send * @param headers headers * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ Object requestBodyAndHeaders(Endpoint endpoint, Object body, Map headers); /** - * Sends the body to an endpoint with the specified headers and header - * values. + * Sends the body to an endpoint with the specified headers and header values. * Uses an {@link ExchangePattern#InOut} message exchange pattern. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param endpoint the endpoint URI to send to * @param body the payload to send * @param headers headers * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T requestBodyAndHeaders(Endpoint endpoint, Object body, Map headers, Class type); @@ -650,15 +781,22 @@ /** * Gets the response body from the future handle, will wait until the response is ready. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param future the handle to get the response * @param type the expected response type * @return the result (see class javadoc) + * @throws CamelExecutionException if the processing of the exchange failed */ T asyncExtractBody(Future future, Class type); /** * Gets the response body from the future handle, will wait at most the given time for the response to be ready. + *

Notice: that if the processing of the exchange failed with an Exception + * it is thrown from this method as a {@link org.apache.camel.CamelExecutionException} with + * the caused exception wrapped. * * @param future the handle to get the response * @param timeout the maximum time to wait @@ -666,6 +804,7 @@ * @param type the expected response type * @return the result (see class javadoc) * @throws java.util.concurrent.TimeoutException if the wait timed out + * @throws CamelExecutionException if the processing of the exchange failed */ T asyncExtractBody(Future future, long timeout, TimeUnit unit, Class type) throws TimeoutException; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/RollbackExchangeException.java Thu May 7 10:42:40 2009 @@ -21,22 +21,22 @@ * * @version $Revision$ */ -public class RollbackExchangeException extends Exception { +public class RollbackExchangeException extends CamelExchangeException { public RollbackExchangeException(Exchange exchange) { - this("Intended rollback on exchange", exchange); + this("Intended rollback", exchange); } public RollbackExchangeException(Exchange exchange, Throwable cause) { - this("Intended rollback on exchange", exchange, cause); + this("Intended rollback", exchange, cause); } public RollbackExchangeException(String message, Exchange exchange) { - super(message + ": " + exchange); + super(message, exchange); } public RollbackExchangeException(String message, Exchange exchange, Throwable cause) { - super(message + ": " + exchange, cause); + super(message, exchange, cause); } } 
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaConsumer.java Thu May 7 10:42:40 2009 @@ -23,8 +23,8 @@ import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; -import org.apache.camel.util.concurrent.ExecutorServiceHelper; import org.apache.camel.impl.ServiceSupport; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultComponent.java Thu May 7 10:42:40 2009 @@ -19,8 +19,8 @@ import java.net.URI; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import org.apache.camel.CamelContext; import org.apache.camel.Component; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java Thu May 7 10:42:40 2009 @@ -16,10 +16,10 @@ */ package org.apache.camel.impl; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.ExecutorService; import org.apache.camel.Endpoint; import org.apache.camel.Processor; Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/DefaultTypeConverter.java Thu May 7 10:42:40 2009 @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.camel.CamelExecutionException; import org.apache.camel.Exchange; import org.apache.camel.NoFactoryAvailableException; import org.apache.camel.NoTypeConversionAvailableException; @@ -68,6 +69,7 @@ addFallbackTypeConverter(new EnumTypeConverter()); addFallbackTypeConverter(new ArrayTypeConverter()); addFallbackTypeConverter(new PropertyEditorTypeConverter()); + addFallbackTypeConverter(new FutureTypeConverter(this)); } public List getTypeConverterLoaders() { @@ -82,6 +84,9 @@ Object answer; try { 
answer = doConvertTo(type, exchange, value); + } catch (CamelExecutionException e) { + // rethrow the exception as it is not due to a failed conversion + throw e; } catch (Exception e) { // we cannot convert so return null if (LOG.isDebugEnabled()) { Added: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java Thu May 7 10:42:40 2009 @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.impl.converter; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.camel.Converter; +import org.apache.camel.Exchange; +import org.apache.camel.NoTypeConversionAvailableException; +import org.apache.camel.StreamCache; +import org.apache.camel.TypeConverter; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Future type converter. + * + * @version $Revision$ + */ +@Converter +public final class FutureTypeConverter implements TypeConverter { + + private static final Log LOG = LogFactory.getLog(FutureTypeConverter.class); + + private final TypeConverter converter; + + public FutureTypeConverter(TypeConverter converter) { + this.converter = converter; + } + + private T doConvertTo(Class type, Exchange exchange, Object value) throws Exception { + // do not convert to stream cache + if (StreamCache.class.isAssignableFrom(value.getClass())) { + return null; + } + + if (Future.class.isAssignableFrom(value.getClass())) { + + Future future = (Future) value; + + if (future.isCancelled()) { + return null; + } + + // do some trace logging as the get is blocking until the response is ready + if (LOG.isTraceEnabled()) { + LOG.trace("Getting future response"); + } + + Object body; + try { + body = future.get(); + } catch (ExecutionException e) { + exchange.setException(e); + throw e; + } + if (LOG.isTraceEnabled()) { + LOG.trace("Got future response"); + } + + if (body == null) { + return null; + } + + Class from = body.getClass(); + + // maybe from is already the type we want + if (from.isAssignableFrom(type)) { + return type.cast(from); + } else if (body instanceof Exchange) { + Exchange result = (Exchange) body; + body = ExchangeHelper.extractResultBody(result, result.getPattern()); + } + + // no then try to lookup a type converter + return 
converter.convertTo(type, exchange, body); + } + + return null; + } + + public T convertTo(Class type, Object value) { + return convertTo(type, null, value); + } + + public T convertTo(Class type, Exchange exchange, Object value) { + try { + return doConvertTo(type, exchange, value); + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + + public T mandatoryConvertTo(Class type, Object value) throws NoTypeConversionAvailableException { + return mandatoryConvertTo(type, null, value); + } + + public T mandatoryConvertTo(Class type, Exchange exchange, Object value) throws NoTypeConversionAvailableException { + T answer; + try { + answer = doConvertTo(type, exchange, value); + } catch (Exception e) { + throw new NoTypeConversionAvailableException(value, type, e); + } + + if (answer == null) { + throw new NoTypeConversionAvailableException(value, type); + } + + return answer; + } +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/converter/FutureTypeConverter.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java Thu May 7 10:42:40 2009 @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.model; + +import java.util.concurrent.ExecutorService; +import javax.xml.bind.annotation.XmlAccessType; +import javax.xml.bind.annotation.XmlAccessorType; +import javax.xml.bind.annotation.XmlAttribute; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.XmlTransient; + +import org.apache.camel.Processor; +import org.apache.camel.processor.AsyncProcessor; +import org.apache.camel.spi.RouteContext; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; + +/** + * Represents an XML <async/> element + * + * @version $Revision$ + */ +@XmlRootElement(name = "async") +@XmlAccessorType(XmlAccessType.FIELD) +public class AsyncDefinition extends OutputDefinition { + + @XmlTransient + private ExecutorService executorService; + @XmlAttribute(required = false) + private String executorServiceRef; + @XmlAttribute(required = false) + private Integer poolSize; + @XmlAttribute(required = false) + private Boolean waitForTaskToComplete = Boolean.TRUE; + + @Override + public Processor createProcessor(RouteContext routeContext) throws Exception { + if (executorServiceRef != null) { + executorService = routeContext.lookup(executorServiceRef, ExecutorService.class); + } + if (executorService == null && poolSize != null) { + 
executorService = ExecutorServiceHelper.newScheduledThreadPool(poolSize, "AsyncProcessor", true); + } + Processor childProcessor = routeContext.createProcessor(this); + return new AsyncProcessor(childProcessor, executorService, waitForTaskToComplete); + } + + @Override + public String getLabel() { + return "async"; + } + + @Override + public String getShortName() { + return "async"; + } + + @Override + public String toString() { + return "Async[" + getOutputs() + "]"; + } + + /** + * Setting the executor service for executing the async tasks. + * + * @return the builder + */ + public AsyncDefinition executorService(ExecutorService executorService) { + setExecutorService(executorService); + return this; + } + + /** + * Setting the core pool size for the underlying {@link java.util.concurrent.ExecutorService}. + * + * @return the builder + */ + public AsyncDefinition poolSize(int poolSize) { + setPoolSize(poolSize); + return this; + } + + /** + * Setting whether to wait for async tasks to complete before continuing original route. + *

+ * Is default true + * + * @param complete whether to wait or not + * @return the builder + */ + public AsyncDefinition waitForTaskToComplete(boolean complete) { + setWaitForTaskToComplete(complete); + return this; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public void setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + } + + public Integer getPoolSize() { + return poolSize; + } + + public void setPoolSize(Integer poolSize) { + this.poolSize = poolSize; + } + + public Boolean getWaitForTaskToComplete() { + return waitForTaskToComplete; + } + + public void setWaitForTaskToComplete(Boolean waitForTaskToComplete) { + this.waitForTaskToComplete = waitForTaskToComplete; + } +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AsyncDefinition.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Thu May 7 10:42:40 2009 @@ -670,6 +670,33 @@ } /** + * Breaks the route into asynchronous. The caller thread will end and the OUT message will + * contain a {@link java.util.concurrent.Future} handle so you can get the real response + * later using this handle. 
+ * + * @return the builder + */ + public AsyncDefinition async() { + AsyncDefinition answer = new AsyncDefinition(); + addOutput(answer); + return answer; + } + + /** + * Breaks the route into asynchronous. The caller thread will end and the OUT message will + * contain a {@link java.util.concurrent.Future} handle so you can get the real response + * later using this handle. + * + * @param poolSize the core pool size + * @return the builder + */ + public AsyncDefinition async(int poolSize) { + AsyncDefinition answer = async(); + answer.setPoolSize(poolSize); + return answer; + } + + /** * Ends the current block * * @return the builder Added: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java (added) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java Thu May 7 10:42:40 2009 @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.concurrent.ExecutorServiceHelper; + +/** + * Async processor that turns the processing going forward into async mode. + *

+ * The original caller thread will receive a Future<Exchange> in the OUT message body. + * It can then later use this handle to obtain the async response. + *

+ * Camel also provides type converters so you can just ask to get the desired object type and Camel + * will automatically wait for the async task to complete to return the response. + * + * @version $Revision$ + */ +public class AsyncProcessor extends DelegateProcessor implements Processor { + + private static final int DEFAULT_THREADPOOL_SIZE = 5; + private ExecutorService executorService; + private boolean waitTaskComplete; + + public AsyncProcessor(Processor output, ExecutorService executorService, boolean waitTaskComplete) { + super(output); + this.executorService = executorService; + this.waitTaskComplete = waitTaskComplete; + } + + public void process(final Exchange exchange) throws Exception { + final Processor output = getProcessor(); + if (output == null) { + // no output then return + return; + } + + // use a new copy of the exchange to route async + final Exchange copy = exchange.newCopy(); + + // let it execute async and return the Future + Callable task = new Callable() { + public Exchange call() throws Exception { + // must use a copy of the original exchange for processing async + output.process(copy); + return copy; + } + }; + + // submit the task + Future future = getExecutorService().submit(task); + + // TODO: Support exchange headers for wait and timeout values, see Exchange constants + + if (waitTaskComplete) { + // wait for task to complete + Exchange response = future.get(); + // if we are out capable then set the response on the original exchange + if (ExchangeHelper.isOutCapable(exchange)) { + ExchangeHelper.copyResults(exchange, response); + } + } else { + // no we do not expect a reply so lets continue, set a handle to the future task + // in case the end user needs it later + exchange.getOut().setBody(future); + } + } + + public ExecutorService getExecutorService() { + if (executorService == null) { + executorService = createExecutorService(); + } + return executorService; + } + + private ExecutorService createExecutorService() { + return 
ExecutorServiceHelper.newScheduledThreadPool(DEFAULT_THREADPOOL_SIZE, "AsyncProcessor", true); + } + + protected void doStop() throws Exception { + super.doStop(); + if (executorService != null) { + executorService.shutdown(); + } + } + +} Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/AsyncProcessor.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java Thu May 7 10:42:40 2009 @@ -20,6 +20,7 @@ import java.util.Map; import org.apache.camel.CamelContext; +import org.apache.camel.CamelExecutionException; import org.apache.camel.Endpoint; import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; @@ -31,7 +32,6 @@ import org.apache.camel.NoSuchPropertyException; import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.TypeConverter; -import static org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException; /** * Some helper methods for working with {@link Exchange} objects @@ -397,21 +397,22 @@ } /** - * Extracts the body from the given result. + * Extracts the body from the given exchange. *

* If the exchange pattern is provided it will try to honor it and retrive the body * from either IN or OUT according to the pattern. * - * @param exchange the result - * @param pattern exchange pattern if given, can be null - * @return the result, can be null. + * @param exchange the exchange + * @param pattern exchange pattern if given, can be null + * @return the result body, can be null. + * @throws CamelExecutionException if the processing of the exchange failed */ public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) { Object answer = null; if (exchange != null) { // rethrow if there was an exception if (exchange.getException() != null) { - throw wrapRuntimeCamelException(exchange.getException()); + throw new CamelExecutionException("Exception occured during execution ", exchange, exchange.getException()); } // result could have a fault message Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Thu May 7 10:42:40 2009 @@ -18,8 +18,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; /** * Helper for {@link java.util.concurrent.ExecutorService} to construct executors using a thread factory that @@ -38,7 +38,7 @@ * Creates a new thread name with the given prefix */ protected static String getThreadName(String name) { - return "Camel " + name + " 
thread:" + nextThreadCounter(); + return "Camel thread " + nextThreadCounter() + ": " + name; } protected static synchronized int nextThreadCounter() { Modified: camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index?rev=772598&r1=772597&r2=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index (original) +++ camel/trunk/camel-core/src/main/resources/org/apache/camel/model/jaxb.index Thu May 7 10:42:40 2009 @@ -15,6 +15,7 @@ ## limitations under the License. ## ------------------------------------------------------------------------ AggregateDefinition +AsyncDefinition BeanDefinition CatchDefinition ChoiceDefinition Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java (from r772297, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java) URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java&r1=772297&r2=772598&rev=772598&view=diff ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformProcessorTest.java (original) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/TransformXpathTest.java Thu May 7 10:42:40 2009 @@ -16,17 +16,44 @@ */ package org.apache.camel.processor; +import java.io.File; + +import org.w3c.dom.NodeList; + +import org.apache.camel.ContextTestSupport; import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + 
* Based on user forum trouble + */ +public class TransformXpathTest extends ContextTestSupport { + + public void testTransformWithXpath() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedMessageCount(1); + mock.message(0).body().isInstanceOf(NodeList.class); -public class TransformProcessorTest extends TransformViaDSLTest { + String xml = context.getTypeConverter().convertTo(String.class, new File("src/test/resources/org/apache/camel/processor/students.xml")); + + template.sendBody("direct:start", xml); + + assertMockEndpointsSatisfied(); + + NodeList list = mock.getReceivedExchanges().get(0).getIn().getBody(NodeList.class); + assertEquals(2, list.getLength()); + + assertEquals("Claus", context.getTypeConverter().convertTo(String.class, list.item(0).getTextContent().trim())); + assertEquals("Hadrian", context.getTypeConverter().convertTo(String.class, list.item(1).getTextContent().trim())); + } protected RouteBuilder createRouteBuilder() { return new RouteBuilder() { public void configure() { - // START SNIPPET: example - from("direct:start").transform(body().append(" World!")).to("mock:result"); - // END SNIPPET: example + from("direct:start") + .transform().xpath("//students/student") + .to("mock:result"); } }; } -} +} \ No newline at end of file Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java Thu May 7 10:42:40 2009 @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; + +/** + * Unit test to verify that error handling using async() also works as expected. 
+ * + * @version $Revision$ + */ +public class AsyncErrorHandlerTest extends ContextTestSupport { + + public void testAsyncErrorHandler() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + + MockEndpoint mock = getMockEndpoint("mock:dead"); + mock.expectedMessageCount(1); + mock.message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE); + mock.message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2); + + template.sendBody("direct:in", "Hello World"); + + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + public void configure() throws Exception { + errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false)); + + from("direct:in") + .async(2) + .to("mock:foo") + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + throw new Exception("Forced exception by unit test"); + } + }); + } + }; + } + +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncErrorHandlerTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java Thu May 7 
10:42:40 2009 @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.async; + +import java.util.concurrent.Future; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** Unit test for an async route that is instructed to not wait for the task to complete, so the caller gets a Future as response. + * @version $Revision$ + */ +public class AsyncNoWaitRouteTest extends ContextTestSupport { + + private String route = ""; + + @Override + protected void setUp() throws Exception { + super.setUp(); + route = ""; + } + + public void testAsyncNoWaitRouteExchange() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + // send an in out to the direct endpoint using the classic API + Exchange exchange = template.send("direct:start", new Processor() { + public void process(Exchange exchange) throws Exception { + exchange.setPattern(ExchangePattern.InOut); + exchange.getIn().setBody("Hello"); + } + }); + + // we should run before the async processor that sets B + route += "A"; + + // as it turns into an async route later we get a Future in the OUT
body + Object out = exchange.getOut().getBody(); + assertIsInstanceOf(Future.class, out); + + // cast to future + Future future = (Future) out; + + assertFalse("Should not be done", future.isDone()); + + assertMockEndpointsSatisfied(); + + assertEquals("AB", route); + + // get the response from the future + String response = context.getTypeConverter().convertTo(String.class, future); + assertEquals("Bye World", response); + } + + public void testAsyncNoWaitRoute() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + // send a request reply to the direct start endpoint + Object out = template.requestBody("direct:start", "Hello"); + + // we should run before the async processor that sets B + route += "A"; + + // as it turns into an async route later we get a Future as response + assertIsInstanceOf(Future.class, out); + + // cast to future + Future future = (Future) out; + + assertFalse("Should not be done", future.isDone()); + + assertMockEndpointsSatisfied(); + + assertEquals("AB", route); + + // get the response from the future + String response = context.getTypeConverter().convertTo(String.class, future); + assertEquals("Bye World", response); + } + + public void testAsyncRouteNoWaitWithTypeConverted() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + // send a request reply to the direct start endpoint, but will use + // future type converter that will wait for the response, even though the async + // is set to not wait.
As the type converter will wait for us + String response = template.requestBody("direct:start", "Hello", String.class); + + // we should wait for the async response as we ask for the result as a String body + route += "A"; + + assertMockEndpointsSatisfied(); + + assertEquals("Bye World", response); + assertEquals("BA", route); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // we start this route async + from("direct:start") + // we play a bit with the message + .transform(body().append(" World")) + // now turn the route into async from this point forward + // the caller will have a Future returned as response in OUT + // to be used to grab the async response when the caller feels like it + // we do not want to wait for tasks to be complete so we instruct Camel + // to not wait, and therefore Camel returns the Future handle we + // can use to get the result when we want + .async().waitForTaskToComplete(false) + // from this point forward this is the async route doing its work + // so we do a bit of delay to simulate heavy work that takes time + .to("mock:foo") + .delay(100) + // and we also work with the message so we can prepare a response + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + route += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + exchange.getOut().setBody("Bye World"); + } + // and we use mocks for unit testing + }).to("mock:result"); + } + }; + } +} Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncNoWaitRouteTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java Thu May 7 10:42:40 2009 @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.async; + +import java.util.concurrent.Future; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** Unit test for an async route that is instructed to not wait for the task to complete, and whose processor throws an exception. + * @version $Revision$ + */ +public class AsyncRouteNoWaitWithErrorTest extends ContextTestSupport { + + private String route = ""; + + @Override + protected void setUp() throws Exception { + super.setUp(); + route = ""; + } + + public void testAsyncRouteWithError() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedMessageCount(0); + + // send a request reply to the direct start endpoint + Object out = template.requestBody("direct:start", "Hello"); + + // we should run before the async processor that sets B + route += "A"; + + // as it turns into an async route later we get a Future as response + assertIsInstanceOf(Future.class, out); + + // cast to future + Future future = (Future) out; + + assertFalse("Should not be done", future.isDone()); + + assertMockEndpointsSatisfied(); + + // get the response from the future, which is expected to fail + try { + String response = context.getTypeConverter().convertTo(String.class, future); + fail("Should have thrown an exception"); + } catch (CamelExecutionException e) { + // expected an execution exception + assertEquals("Damn forced by unit test", e.getCause().getMessage()); + } + + assertEquals("AB", route); + } + + public void testAsyncRouteWithTypeConverted() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedMessageCount(0); + + // send a request reply to the direct start endpoint, but will use + // future type converter that will wait for the response + try { + String response = template.requestBody("direct:start", "Hello", String.class); + fail("Should have thrown an exception"); + }
catch (CamelExecutionException e) { + // expected an execution exception + assertEquals("Damn forced by unit test", e.getCause().getMessage()); + } + + // we should wait for the async response as we ask for the result as a String body + route += "A"; + + assertMockEndpointsSatisfied(); + + assertEquals("BA", route); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + // we start this route async + from("direct:start") + // we play a bit with the message + .transform(body().append(" World")) + // now turn the route into async from this point forward + // the caller will have a Future returned as response in OUT + // to be used to grab the async response when the caller feels like it + .async().waitForTaskToComplete(false) + // from this point forward this is the async route doing its work + // so we do a bit of delay to simulate heavy work that takes time + .to("mock:foo") + .delay(100) + // and we fail while working with the message, to force an error instead of a response + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + route += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + throw new IllegalArgumentException("Damn forced by unit test"); + } + // and we use mocks for unit testing + }).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteNoWaitWithErrorTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java?rev=772598&view=auto ============================================================================== --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java (added) +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java Thu May 7 10:42:40 2009 @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.async; + +import java.util.concurrent.Future; + +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.builder.RouteBuilder; + +/** Unit test for an async route where the caller waits for the async response and gets it back as a String. + * @version $Revision$ + */ +public class AsyncRouteTest extends ContextTestSupport { + + private String route = ""; + + @Override + protected void setUp() throws Exception { + super.setUp(); + route = ""; + } + + public void testAsyncRoute() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + // send a request reply to the direct start endpoint + // it will wait for the async response so we get the full response + Object out = template.requestBody("direct:start", "Hello"); + + // the request above waited for the async processor that sets B, so A comes after B + route += "A"; + + // as the request waited for the async response we get the String response and not a Future + assertIsInstanceOf(String.class, out); + + assertMockEndpointsSatisfied(); + + assertEquals("BA", route); + + // convert the response, which already is a String and not a future + String response = context.getTypeConverter().convertTo(String.class, out); + assertEquals("Bye World", response); + } + + public void testAsyncRouteWithTypeConverted() throws Exception { + getMockEndpoint("mock:foo").expectedBodiesReceived("Hello World"); + getMockEndpoint("mock:result").expectedBodiesReceived("Bye World"); + + // send a request reply to the direct start endpoint, but will use + // future type converter that will wait for the response + String response = template.requestBody("direct:start", "Hello", String.class); + + // we should wait for the async response as we ask for the result as a String body + route += "A"; + + assertMockEndpointsSatisfied(); + + assertEquals("Bye World", response); + assertEquals("BA", route); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new
RouteBuilder() { + @Override + public void configure() throws Exception { + // we start this route async + from("direct:start") + // we play a bit with the message + .transform(body().append(" World")) + // now turn the route into async from this point forward + // the caller is said to get a Future returned as response in OUT (NOTE(review): the tests in this class assert a String response - confirm) + // to be used to grab the async response when the caller feels like it + .async() + // from this point forward this is the async route doing its work + // so we do a bit of delay to simulate heavy work that takes time + .to("mock:foo") + .delay(100) + // and we also work with the message so we can prepare a response + .process(new Processor() { + public void process(Exchange exchange) throws Exception { + route += "B"; + assertEquals("Hello World", exchange.getIn().getBody()); + exchange.getOut().setBody("Bye World"); + } + // and we use mocks for unit testing + }).to("mock:result"); + } + }; + } +} \ No newline at end of file Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncRouteTest.java ------------------------------------------------------------------------------ svn:keywords = Rev Date