Return-Path: Delivered-To: apmail-camel-commits-archive@www.apache.org Received: (qmail 21455 invoked from network); 14 Nov 2009 17:10:32 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 14 Nov 2009 17:10:32 -0000 Received: (qmail 3993 invoked by uid 500); 14 Nov 2009 17:10:31 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 3948 invoked by uid 500); 14 Nov 2009 17:10:31 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 3939 invoked by uid 99); 14 Nov 2009 17:10:31 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 14 Nov 2009 17:10:31 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 14 Nov 2009 17:10:28 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 684BC238888E; Sat, 14 Nov 2009 17:10:07 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r836215 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: model/InterceptDefinition.java processor/BatchProcessor.java processor/OnCompletionProcessor.java processor/RecipientList.java Date: Sat, 14 Nov 2009 17:10:07 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091114171007.684BC238888E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Sat Nov 14 17:10:06 2009 New Revision: 836215 URL: http://svn.apache.org/viewvc?rev=836215&view=rev Log: CAMEL-2170: Ensured using UnitOfWork for EIPs that emit new Exchanges on new routes which didnt have that beforehand Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java?rev=836215&r1=836214&r2=836215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/InterceptDefinition.java Sat Nov 14 17:10:06 2009 @@ -95,7 +95,7 @@ @Override public String toString() { - return "intercept[" + (interceptedTarget != null ? interceptedTarget : output) + "]"; + return "intercept[" + (interceptedTarget != null ? interceptedTarget : output) + "]"; } }); @@ -108,7 +108,7 @@ /** * Applies this interceptor only if the given predicate is true * - * @param predicate the predicate + * @param predicate the predicate * @return the builder */ public ChoiceDefinition when(Predicate predicate) { Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=836215&r1=836214&r2=836215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Sat Nov 14 17:10:06 2009 @@ -67,7 +67,9 @@ public BatchProcessor(Processor processor, Collection collection) { ObjectHelper.notNull(processor, "processor"); ObjectHelper.notNull(collection, "collection"); - this.processor = processor; + + // wrap processor in UnitOfWork so what we send out of the batch runs in a UoW + this.processor = new UnitOfWorkProcessor(processor); this.collection = collection; this.sender = new BatchSender(); } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java?rev=836215&r1=836214&r2=836215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/OnCompletionProcessor.java Sat Nov 14 17:10:06 2009 @@ -45,7 +45,8 @@ private Predicate onWhen; public OnCompletionProcessor(Processor processor, boolean onCompleteOnly, boolean onFailureOnly, Predicate onWhen) { - this.processor = processor; + // wrap processor in UnitOfWork so what we send out runs in a UoW + this.processor = new UnitOfWorkProcessor(processor); this.onCompleteOnly = onCompleteOnly; this.onFailureOnly = onFailureOnly; this.onWhen = onWhen; @@ -91,7 +92,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Processing onComplete: " + copy); } - processor.process(copy); + doProcess(processor, copy); return copy; } }); @@ -118,8 +119,7 @@ if (LOG.isDebugEnabled()) { LOG.debug("Processing onFailure: " + copy); } - - processor.process(copy); + doProcess(processor, copy); return copy; } }); @@ -139,6 +139,21 @@ } /** + * Processes the exchange by the processors + * + * @param processor the processor + * @param exchange the exchange + */ + protected static void doProcess(Processor processor, Exchange exchange) { + try { + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } + } + + + /** * Prepares the {@link Exchange} to send as onCompletion. * * @param exchange the current exchange @@ -175,6 +190,6 @@ } public String getTraceLabel() { - return "OnCompletion"; + return "onCompletion"; } } Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=836215&r1=836214&r2=836215&view=diff ============================================================================== --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java (original) +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java Sat Nov 14 17:10:06 2009 @@ -81,7 +81,9 @@ Object recipient = iter.next(); Endpoint endpoint = resolveEndpoint(exchange, recipient); Producer producer = getProducerCache(exchange).getProducer(endpoint); - processors.add(producer); + // wrap in unit of work + Processor target = new UnitOfWorkProcessor(producer); + processors.add(target); } MulticastProcessor mp = new MulticastProcessor(processors, new UseLatestAggregationStrategy()); mp.process(exchange);