Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3A6D010DA6 for ; Mon, 12 Aug 2013 13:26:25 +0000 (UTC) Received: (qmail 72470 invoked by uid 500); 12 Aug 2013 13:26:25 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 72354 invoked by uid 500); 12 Aug 2013 13:26:24 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 72334 invoked by uid 99); 12 Aug 2013 13:26:21 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Aug 2013 13:26:21 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 82C918BC7A9; Mon, 12 Aug 2013 13:26:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Mon, 12 Aug 2013 13:26:22 -0000 Message-Id: <5fbf433bb9044aac907bf4bd600e9a04@git.apache.org> In-Reply-To: <67841338be1a4148b1ed39256f5688d4@git.apache.org> References: <67841338be1a4148b1ed39256f5688d4@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: CAMEL-6627: Splitter should close iterator when exception occurred (eg when done) to ensure not locking any files etc on windows etc. CAMEL-6627: Splitter should close iterator when exception occurred (eg when done) to ensure not locking any files etc on windows etc. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/76d8bee1 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/76d8bee1 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/76d8bee1 Branch: refs/heads/camel-2.11.x Commit: 76d8bee132f1dcbecdf2c3cdcb0479f0fa725d18 Parents: 001b95b Author: Claus Ibsen Authored: Mon Aug 12 14:47:18 2013 +0200 Committer: Claus Ibsen Committed: Mon Aug 12 15:25:39 2013 +0200 ---------------------------------------------------------------------- .../camel/processor/MulticastProcessor.java | 28 ++-- .../org/apache/camel/processor/Splitter.java | 130 +++++++++++-------- 2 files changed, 94 insertions(+), 64 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/76d8bee1/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java index ecc682f..bd93719 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.camel.processor; +import java.io.Closeable; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -57,6 +58,7 @@ import org.apache.camel.util.AsyncProcessorHelper; import org.apache.camel.util.CastUtils; import org.apache.camel.util.EventHelper; import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.KeyValueHolder; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.ServiceHelper; @@ -198,7 +200,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor public boolean process(Exchange exchange, AsyncCallback callback) { final AtomicExchange result = new AtomicExchange(); - final Iterable pairs; + Iterable pairs = null; try { boolean sync = true; @@ -222,14 +224,14 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor exchange.setException(e); // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted // and do the done work - doDone(exchange, null, callback, true, false); + doDone(exchange, null, pairs, callback, true, false); return true; } // multicasting was processed successfully // and do the done work Exchange subExchange = result.get() != null ? result.get() : null; - doDone(exchange, subExchange, callback, true, true); + doDone(exchange, subExchange, pairs, callback, true, true); return true; } @@ -603,7 +605,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor result.set(subExchange); } // and do the done work - doDone(original, subExchange, callback, false, true); + doDone(original, subExchange, pairs, callback, false, true); return; } @@ -613,7 +615,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); // and do the done work - doDone(original, subExchange, callback, false, true); + doDone(original, subExchange, pairs, callback, false, true); return; } @@ -647,7 +649,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor result.set(subExchange); } // and do the done work - doDone(original, subExchange, callback, false, true); + doDone(original, subExchange, pairs, callback, false, true); return; } @@ -658,7 +660,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // wrap in exception to explain where it failed subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); // and do the done work - doDone(original, subExchange, callback, false, true); + doDone(original, subExchange, pairs, callback, false, true); return; } @@ -667,7 +669,7 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor // do the done work subExchange = result.get() != null ? result.get() : null; - doDone(original, subExchange, callback, false, true); + doDone(original, subExchange, pairs, callback, false, true); } }); } finally { @@ -733,11 +735,19 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor * * @param original the original exchange * @param subExchange the current sub exchange, can be null for the synchronous part + * @param pairs the pairs with the exchanges to process * @param callback the callback * @param doneSync the doneSync parameter to call on callback * @param exhaust whether or not error handling is exhausted */ - protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync, boolean exhaust) { + protected void doDone(Exchange original, Exchange subExchange, final Iterable pairs, + AsyncCallback callback, boolean doneSync, boolean exhaust) { + + // we are done so close the pairs iterator + if (pairs != null && pairs instanceof Closeable) { + IOHelper.close((Closeable) pairs, "pairs", LOG); + } + // cleanup any per exchange aggregation strategy removeAggregationStrategyFromExchange(original); if (original.getException() != null || subExchange != null && subExchange.getException() != null) { http://git-wip-us.apache.org/repos/asf/camel/blob/76d8bee1/camel-core/src/main/java/org/apache/camel/processor/Splitter.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java index 6afd235..8c9e69f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java +++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java @@ -122,69 +122,89 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac } private Iterable createProcessorExchangePairsIterable(final Exchange exchange, final Object value) { - final Iterator iterator = ObjectHelper.createIterator(value); - return new Iterable() { - // create a copy which we use as master to copy during splitting - // this avoids any side effect reflected upon the incoming exchange - private final Exchange copy = copyExchangeNoAttachments(exchange, true); - private final RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; - - public Iterator iterator() { - return new Iterator() { - private int index; - private boolean closed; - - public boolean hasNext() { - if (closed) { - return false; - } + return new SplitterIterable(exchange, value); + } - boolean answer = iterator.hasNext(); - if (!answer) { - // we are now closed - closed = true; - // nothing more so we need to close the expression value in case it needs to be - if (value instanceof Closeable) { - IOHelper.close((Closeable) value, value.getClass().getName(), LOG); - } else if (value instanceof Scanner) { - // special for Scanner as it does not implement Closeable - Scanner scanner = (Scanner) value; - scanner.close(); - - IOException ioException = scanner.ioException(); - if (ioException != null) { - throw new RuntimeCamelException("Scanner aborted because of an IOException!", ioException); - } - } - } - return answer; + private final class SplitterIterable implements Iterable, Closeable { + + // create a copy which we use as master to copy during splitting + // this avoids any side effect reflected upon the incoming exchange + final Object value; + final Iterator iterator; + private final Exchange copy; + private final RouteContext routeContext; + + private SplitterIterable(Exchange exchange, Object value) { + this.value = value; + this.iterator = ObjectHelper.createIterator(value); + this.copy = copyExchangeNoAttachments(exchange, true); + this.routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; + } + + @Override + public Iterator iterator() { + return new Iterator() { + private int index; + private boolean closed; + + public boolean hasNext() { + if (closed) { + return false; } - public ProcessorExchangePair next() { - Object part = iterator.next(); - // create a correlated copy as the new exchange to be routed in the splitter from the copy - // and do not share the unit of work - Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); - // if we share unit of work, we need to prepare the child exchange - if (isShareUnitOfWork()) { - prepareSharedUnitOfWork(newExchange, copy); + boolean answer = iterator.hasNext(); + if (!answer) { + // we are now closed + closed = true; + // nothing more so we need to close the expression value in case it needs to be + try { + close(); + } catch (IOException e) { + throw new RuntimeCamelException("Scanner aborted because of an IOException!", e); } - if (part instanceof Message) { - newExchange.setIn((Message) part); - } else { - Message in = newExchange.getIn(); - in.setBody(part); - } - return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext); } + return answer; + } - public void remove() { - throw new UnsupportedOperationException("Remove is not supported by this iterator"); + public ProcessorExchangePair next() { + Object part = iterator.next(); + // create a correlated copy as the new exchange to be routed in the splitter from the copy + // and do not share the unit of work + Exchange newExchange = ExchangeHelper.createCorrelatedCopy(copy, false); + // if we share unit of work, we need to prepare the child exchange + if (isShareUnitOfWork()) { + prepareSharedUnitOfWork(newExchange, copy); } - }; - } + if (part instanceof Message) { + newExchange.setIn((Message) part); + } else { + Message in = newExchange.getIn(); + in.setBody(part); + } + return createProcessorExchangePair(index++, getProcessors().iterator().next(), newExchange, routeContext); + } + + public void remove() { + throw new UnsupportedOperationException("Remove is not supported by this iterator"); + } + }; + } + + @Override + public void close() throws IOException { + if (value instanceof Closeable) { + IOHelper.close((Closeable) value, value.getClass().getName(), LOG); + } else if (value instanceof Scanner) { + // special for Scanner as it does not implement Closeable + Scanner scanner = (Scanner) value; + scanner.close(); - }; + IOException ioException = scanner.ioException(); + if (ioException != null) { + throw ioException; + } + } + } } private Iterable createProcessorExchangePairsList(Exchange exchange, Object value) {