Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B617C90C2 for ; Thu, 15 Mar 2012 08:09:21 +0000 (UTC) Received: (qmail 77152 invoked by uid 500); 15 Mar 2012 08:09:21 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 77068 invoked by uid 500); 15 Mar 2012 08:09:20 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 77058 invoked by uid 99); 15 Mar 2012 08:09:20 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Mar 2012 08:09:20 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Mar 2012 08:09:16 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 77076238890B; Thu, 15 Mar 2012 08:08:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1300835 - in /camel/branches/camel-2.9.x: ./ camel-core/src/main/java/org/apache/camel/processor/loadbalancer/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-solr/ Date: Thu, 15 Mar 2012 08:08:55 -0000 To: commits@camel.apache.org From: davsclaus@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120315080855.77076238890B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: davsclaus Date: Thu Mar 15 08:08:54 2012 New Revision: 1300835 URL: http://svn.apache.org/viewvc?rev=1300835&view=rev Log: Failover EIP - Should use defensive copy of exchange before failover to avoid side effects Added: camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java - copied unchanged from r1300831, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java camel/branches/camel-2.9.x/components/camel-solr/ (props changed) Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 15 08:08:54 2012 @@ -1 +1 @@ -/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155,1298447,1298795,1298821,1298993,1299383,1299399,1300720-1300722,1300805 +/camel/trunk:1243046,1243057,1243234,1244518,1244644,1244859,1244861,1244864,1244870,1244872,1245021,1291555,1291727,1291848,1291864,1292114,1292384,1292725,1292760,1292767,1293079,1293268,1293288,1293330,1293590,1293828,1293852,1293855,1294130,1294482,1294502,1294533,1294588,1294639,1294709,1294909,1294976,1295073,1295108,1295120,1296653,1296790,1298125,1298155,1298447,1298795,1298821,1298993,1299383,1299399,1300720-1300722,1300805,1300831 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1300835&r1=1300834&r2=1300835&view=diff ============================================================================== --- camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original) +++ camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Thu Mar 15 08:08:54 2012 @@ -26,6 +26,7 @@ import org.apache.camel.Processor; import org.apache.camel.Traceable; import org.apache.camel.util.AsyncProcessorConverterHelper; import org.apache.camel.util.AsyncProcessorHelper; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; /** @@ -86,6 +87,10 @@ public class FailOverLoadBalancer extend * @return true to failover */ protected boolean shouldFailOver(Exchange exchange) { + if (exchange == null) { + return false; + } + boolean answer = false; if (exchange.getException() != null) { @@ -108,12 +113,15 @@ public class FailOverLoadBalancer extend return answer; } - public boolean process(Exchange exchange, AsyncCallback callback) { + public boolean process(final Exchange exchange, final AsyncCallback callback) { final List processors = getProcessors(); final AtomicInteger index = new AtomicInteger(); final AtomicInteger attempts = new AtomicInteger(); boolean first = true; + // use a copy of the original exchange before failover to avoid populating side effects + // directly into the original exchange + Exchange copy = null; // get the next processor if (isRoundRobin()) { @@ -124,7 +132,7 @@ public class FailOverLoadBalancer extend } log.trace("Failover starting with endpoint index {}", index); - while (first || shouldFailOver(exchange)) { + while (first || shouldFailOver(copy)) { if (!first) { attempts.incrementAndGet(); // are we exhausted by attempts? @@ -153,12 +161,12 @@ public class FailOverLoadBalancer extend } } - // try again but prepare exchange before we failover - prepareExchangeForFailover(exchange); + // try again but copy original exchange before we failover + copy = prepareExchangeForFailover(exchange); Processor processor = processors.get(index.get()); // process the exchange - boolean sync = processExchange(processor, exchange, attempts, index, callback, processors); + boolean sync = processExchange(processor, exchange, copy, attempts, index, callback, processors); // continue as long its being processed synchronously if (!sync) { @@ -171,8 +179,11 @@ public class FailOverLoadBalancer extend log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId()); } + // and copy the current result to original so it will contain this result of this eip + if (copy != null) { + ExchangeHelper.copyResults(exchange, copy); + } log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - callback.done(true); return true; } @@ -181,35 +192,23 @@ public class FailOverLoadBalancer extend * Prepares the exchange for failover * * @param exchange the exchange + * @return a copy of the exchange to use for failover */ - protected void prepareExchangeForFailover(Exchange exchange) { - if (exchange.getException() != null) { - if (log.isDebugEnabled()) { - log.debug("Failover due {} for exchangeId: {}", exchange.getException().getMessage(), exchange.getExchangeId()); - } - - // clear exception so we can try failover - exchange.setException(null); - } - - exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null); - exchange.setProperty(Exchange.FAILURE_HANDLED, null); - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, null); - exchange.getIn().removeHeader(Exchange.REDELIVERED); - exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER); - exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER); + protected Exchange prepareExchangeForFailover(Exchange exchange) { + // use a copy of the exchange to avoid side effects on the original exchange + return ExchangeHelper.createCopy(exchange, true); } - private boolean processExchange(Processor processor, Exchange exchange, + private boolean processExchange(Processor processor, Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List processors) { if (processor == null) { - throw new IllegalStateException("No processors could be chosen to process " + exchange); + throw new IllegalStateException("No processors could be chosen to process " + copy); } - log.debug("Processing failover at attempt {} for {}", attempts, exchange); + log.debug("Processing failover at attempt {} for {}", attempts, copy); AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor); - return AsyncProcessorHelper.process(albp, exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors)); + return AsyncProcessorHelper.process(albp, copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors)); } /** @@ -219,13 +218,15 @@ public class FailOverLoadBalancer extend private final class FailOverAsyncCallback implements AsyncCallback { private final Exchange exchange; + private Exchange copy; private final AtomicInteger attempts; private final AtomicInteger index; private final AsyncCallback callback; private final List processors; - private FailOverAsyncCallback(Exchange exchange, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List processors) { + private FailOverAsyncCallback(Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List processors) { this.exchange = exchange; + this.copy = copy; this.attempts = attempts; this.index = index; this.callback = callback; @@ -238,7 +239,7 @@ public class FailOverLoadBalancer extend return; } - while (shouldFailOver(exchange)) { + while (shouldFailOver(copy)) { attempts.incrementAndGet(); // are we exhausted by attempts? if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) { @@ -263,11 +264,11 @@ public class FailOverLoadBalancer extend } // try again but prepare exchange before we failover - prepareExchangeForFailover(exchange); + copy = prepareExchangeForFailover(exchange); Processor processor = processors.get(index.get()); // try to failover using the next processor - doneSync = processExchange(processor, exchange, attempts, index, callback, processors); + doneSync = processExchange(processor, exchange, copy, attempts, index, callback, processors); if (!doneSync) { log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); // the remainder of the failover will be completed async @@ -276,8 +277,11 @@ public class FailOverLoadBalancer extend } } + // and copy the current result to original so it will contain this result of this eip + if (copy != null) { + ExchangeHelper.copyResults(exchange, copy); + } log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - // signal callback we are done callback.done(false); }; Propchange: camel/branches/camel-2.9.x/components/camel-solr/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Thu Mar 15 08:08:54 2012 @@ -1 +1 @@ -/camel/trunk/components/camel-solr:1227197-1298579,1298795,1298821,1298993,1299399,1300805 +/camel/trunk/components/camel-solr:1227197-1298579,1298795,1298821,1298993,1299399,1300805,1300831