camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1056744 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/MulticastProcessor.java main/java/org/apache/camel/util/concurrent/AtomicException.java test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
Date Sat, 08 Jan 2011 17:29:01 GMT
Author: davsclaus
Date: Sat Jan  8 17:29:00 2011
New Revision: 1056744

URL: http://svn.apache.org/viewvc?rev=1056744&view=rev
Log:
CAMEL-3497: Fixed splitter using too much memory when using parallel mode. Now it aggregates
on-the-fly, which means memory consumption is low and it now runs much faster overall.

Added:
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java
      - copied, changed from r1056651, camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1056744&r1=1056743&r2=1056744&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
Sat Jan  8 17:29:00 2011
@@ -26,6 +26,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -58,6 +59,7 @@ import org.apache.camel.util.KeyValueHol
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.ServiceHelper;
 import org.apache.camel.util.StopWatch;
+import org.apache.camel.util.concurrent.AtomicException;
 import org.apache.camel.util.concurrent.AtomicExchange;
 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
 import org.apache.commons.logging.Log;
@@ -143,6 +145,7 @@ public class MulticastProcessor extends 
     private final boolean streaming;
     private final boolean stopOnException;
     private final ExecutorService executorService;
+    private ExecutorService aggregationExecutorService;
     private final long timeout;
     private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new
ConcurrentHashMap<PreparedErrorHandler, Processor>();
 
@@ -229,10 +232,9 @@ public class MulticastProcessor extends 
     }
 
     protected void doProcessParallel(final Exchange original, final AtomicExchange result,
final Iterable<ProcessorExchangePair> pairs,
-                                     final boolean streaming, final AsyncCallback callback)
throws InterruptedException, ExecutionException {
-        final CompletionService<Exchange> completion;
-        final AtomicBoolean running = new AtomicBoolean(true);
+                                     final boolean streaming, final AsyncCallback callback)
throws Exception {
 
+        final CompletionService<Exchange> completion;
         if (streaming) {
             // execute tasks in parallel+streaming and aggregate in the order they are finished
(out of order sequence)
             completion = new ExecutorCompletionService<Exchange>(executorService);
@@ -241,9 +243,28 @@ public class MulticastProcessor extends 
             completion = new SubmitOrderedCompletionService<Exchange>(executorService);
         }
 
+        // when parallel then aggregate on the fly
+        final AtomicBoolean running = new AtomicBoolean(true);
         final AtomicInteger total = new AtomicInteger(0);
+        final AtomicBoolean allTasksSubmitted = new AtomicBoolean();
+        final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1);
+        final AtomicException executionException = new AtomicException();
 
         final Iterator<ProcessorExchangePair> it = pairs.iterator();
+
+        if (it.hasNext()) {
+            // issue task to execute in separate thread so it can aggregate on-the-fly
+            // while we submit new tasks, and those tasks complete concurrently
+            // this allows us to optimize work and reduce memory consumption
+            AggregateOnTheFlyTask task = new AggregateOnTheFlyTask(result, original, total,
completion, running,
+                    aggregationOnTheFlyDone, allTasksSubmitted, executionException);
+
+            // and start the task using the aggregation execution service
+            aggregationExecutorService.submit(task);
+        }
+
+        LOG.trace("Starting to submit parallel tasks");
+
         while (it.hasNext()) {
             final ProcessorExchangePair pair = it.next();
             final Exchange subExchange = pair.getExchange();
@@ -285,90 +306,181 @@ public class MulticastProcessor extends 
             total.incrementAndGet();
         }
 
-        // TODO: in streaming mode we need to aggregate on-the-fly
+        // signal all tasks has been submitted
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Signaling that all " + total.get() + " tasks has been submitted.");
+        }
+        allTasksSubmitted.set(true);
 
-        // its to hard to do parallel async routing so we let the caller thread be synchronously
-        // and have it pickup the replies and do the aggregation
-        boolean timedOut = false;
-        boolean stoppedOnException = false;
-        final StopWatch watch = new StopWatch();
-        for (int i = 0; i < total.intValue(); i++) {
-            Future<Exchange> future;
-            if (timedOut) {
-                // we are timed out but try to grab if some tasks has been completed
-                // poll will return null if no tasks is present
-                future = completion.poll();
-            } else if (timeout > 0) {
-                long left = timeout - watch.taken();
-                if (left < 0) {
-                    left = 0;
-                }
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Polling completion task #" + i + " using timeout " + left
+ " millis.");
-                }
-                future = completion.poll(left, TimeUnit.MILLISECONDS);
-            } else {
-                // take will wait until the task is complete
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Polling completion task #" + i);
+        // wait for aggregation to be done
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Waiting for on-the-fly aggregation to complete aggregating " + total.get()
+ " responses.");
+        }
+        aggregationOnTheFlyDone.await();
+
+        // did we fail for whatever reason, if so throw that caused exception
+        if (executionException.get() != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Parallel processing failed due " + executionException.get().getMessage());
+            }
+            throw executionException.get();
+        }
+
+        // no everything is okay so we are done
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Done parallel processing " + total + " exchanges");
+        }
+    }
+
+    /**
+     * Task to aggregate on-the-fly for completed tasks when using parallel processing.
+     * <p/>
+     * This ensures lower memory consumption as we do not need to keep all completed tasks
in memory
+     * before we perform aggregation. Instead this separate thread will run and aggregate
when new
+     * completed tasks is done.
+     * <p/>
+     * The logic is fairly complex as this implementation has to keep track how far it got,
and also
+     * signal back to the <i>main</t> thread when its done, so the <i>main</t>
thread can continue
+     * processing when the entire splitting is done.
+     */
+    private final class AggregateOnTheFlyTask implements Runnable {
+
+        private final AtomicExchange result;
+        private final Exchange original;
+        private final AtomicInteger total;
+        private final CompletionService<Exchange> completion;
+        private final AtomicBoolean running;
+        private final CountDownLatch aggregationOnTheFlyDone;
+        private final AtomicBoolean allTasksSubmitted;
+        private final AtomicException executionException;
+
+        private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger
total,
+                                      CompletionService<Exchange> completion, AtomicBoolean
running,
+                                      CountDownLatch aggregationOnTheFlyDone, AtomicBoolean
allTasksSubmitted,
+                                      AtomicException executionException) {
+            this.result = result;
+            this.original = original;
+            this.total = total;
+            this.completion = completion;
+            this.running = running;
+            this.aggregationOnTheFlyDone = aggregationOnTheFlyDone;
+            this.allTasksSubmitted = allTasksSubmitted;
+            this.executionException = executionException;
+        }
+
+        public void run() {
+            LOG.trace("Aggregate on the fly task +++ started +++");
+
+            try {
+                aggregateOnTheFly();
+            } catch (Throwable e) {
+                if (e instanceof Exception) {
+                    executionException.set((Exception) e);
+                } else {
+                    executionException.set(ObjectHelper.wrapRuntimeCamelException(e));
                 }
-                future = completion.take();
             }
 
-            if (future == null && timedOut) {
-                // we are timed out and no more tasks complete so break out
-                break;
-            } else if (future == null) {
-                // timeout occurred
-                AggregationStrategy strategy = getAggregationStrategy(null);
-                if (strategy instanceof TimeoutAwareAggregationStrategy) {
-                    // notify the strategy we timed out
-                    Exchange oldExchange = result.get();
-                    if (oldExchange == null) {
-                        // if they all timed out the result may not have been set yet, so
use the original exchange
-                        oldExchange = original;
+            // must signal we are done so the latch can open and let the other thread continue
processing
+            LOG.trace("Signaling we are done aggregating on the fly");
+            aggregationOnTheFlyDone.countDown();
+
+            LOG.trace("Aggregate on the fly task +++ done +++");
+        }
+
+        private void aggregateOnTheFly() throws InterruptedException, ExecutionException
{
+            // its to hard to do parallel async routing so we let the caller thread be synchronously
+            // and have it pickup the replies and do the aggregation
+            boolean timedOut = false;
+            boolean stoppedOnException = false;
+            final StopWatch watch = new StopWatch();
+            int aggregated = 0;
+            boolean done = false;
+            // not a for loop as on the fly may still run
+            while (!done) {
+                // check if we have already aggregate everything
+                if (allTasksSubmitted.get() && aggregated >= total.get() || stoppedOnException)
{
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Done aggregating " + aggregated + " exchanges on the fly.");
+                    }
+                    break;
+                }
+
+                Future<Exchange> future;
+                if (timedOut) {
+                    // we are timed out but try to grab if some tasks has been completed
+                    // poll will return null if no tasks is present
+                    future = completion.poll();
+                } else if (timeout > 0) {
+                    long left = timeout - watch.taken();
+                    if (left < 0) {
+                        left = 0;
+                    }
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Polling completion task #" + aggregated + " using timeout
" + left + " millis.");
                     }
-                    ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, i,
total.intValue(), timeout);
+                    future = completion.poll(left, TimeUnit.MILLISECONDS);
                 } else {
-                    // log a WARN we timed out since it will not be aggregated and the Exchange
will be lost
-                    LOG.warn("Parallel processing timed out after " + timeout + " millis
for number " + i + ". This task will be cancelled and will not be aggregated.");
+                    // take will wait until the task is complete
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Polling completion task #" + aggregated);
+                    }
+                    future = completion.take();
                 }
-                timedOut = true;
-            } else {
-                // there is a result to aggregate
-                Exchange subExchange = future.get();
 
-                // Decide whether to continue with the multicast or not; similar logic to
the Pipeline
-                Integer number = getExchangeIndex(subExchange);
-                boolean continueProcessing = PipelineHelper.continueProcessing(subExchange,
"Parallel processing failed for number " + number, LOG);
-                if (stopOnException && !continueProcessing) {
-                    // we want to stop on exception and an exception or failure occurred
-                    // this is similar to what the pipeline does, so we should do the same
to not surprise end users
-                    // so we should set the failed exchange as the result and break out
-                    result.set(subExchange);
-                    stoppedOnException = true;
+                if (future == null && timedOut) {
+                    // we are timed out and no more tasks complete so break out
                     break;
+                } else if (future == null) {
+                    // timeout occurred
+                    AggregationStrategy strategy = getAggregationStrategy(null);
+                    if (strategy instanceof TimeoutAwareAggregationStrategy) {
+                        // notify the strategy we timed out
+                        Exchange oldExchange = result.get();
+                        if (oldExchange == null) {
+                            // if they all timed out the result may not have been set yet,
so use the original exchange
+                            oldExchange = original;
+                        }
+                        ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange,
aggregated, total.intValue(), timeout);
+                    } else {
+                        // log a WARN we timed out since it will not be aggregated and the
Exchange will be lost
+                        LOG.warn("Parallel processing timed out after " + timeout + " millis
for number " + aggregated + ". This task will be cancelled and will not be aggregated.");
+                    }
+                    timedOut = true;
+                } else {
+                    // there is a result to aggregate
+                    Exchange subExchange = future.get();
+
+                    // Decide whether to continue with the multicast or not; similar logic
to the Pipeline
+                    Integer number = getExchangeIndex(subExchange);
+                    boolean continueProcessing = PipelineHelper.continueProcessing(subExchange,
"Parallel processing failed for number " + number, LOG);
+                    if (stopOnException && !continueProcessing) {
+                        // we want to stop on exception and an exception or failure occurred
+                        // this is similar to what the pipeline does, so we should do the
same to not surprise end users
+                        // so we should set the failed exchange as the result and break out
+                        result.set(subExchange);
+                        stoppedOnException = true;
+                        break;
+                    }
+
+                    // we got a result so aggregate it
+                    AggregationStrategy strategy = getAggregationStrategy(subExchange);
+                    doAggregate(strategy, result, subExchange);
                 }
 
-                // we got a result so aggregate it
-                AggregationStrategy strategy = getAggregationStrategy(subExchange);
-                doAggregate(strategy, result, subExchange);
+                aggregated++;
             }
-        }
 
-        if (timedOut || stoppedOnException) {
-            if (timedOut && LOG.isDebugEnabled()) {
-                LOG.debug("Cancelling tasks due timeout after " + timeout + " millis.");
-            }
-            if (stoppedOnException && LOG.isDebugEnabled()) {
-                LOG.debug("Cancelling tasks due stopOnException.");
+            if (timedOut || stoppedOnException) {
+                if (timedOut && LOG.isDebugEnabled()) {
+                    LOG.debug("Cancelling tasks due timeout after " + timeout + " millis.");
+                }
+                if (stoppedOnException && LOG.isDebugEnabled()) {
+                    LOG.debug("Cancelling tasks due stopOnException.");
+                }
+                // cancel tasks as we timed out (its safe to cancel done tasks)
+                running.set(false);
             }
-            // cancel tasks as we timed out (its safe to cancel done tasks)
-            running.set(false);
-        }
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Done parallel processing " + total + " exchanges");
         }
     }
 
@@ -754,6 +866,9 @@ public class MulticastProcessor extends 
         if (timeout > 0 && !isParallelProcessing()) {
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has
not been enabled");
         }
+        if (isParallelProcessing()) {
+            aggregationExecutorService = getCamelContext().getExecutorServiceStrategy().newCachedThreadPool(this,
"AggregationTask");
+        }
         ServiceHelper.startServices(processors);
     }
 

Copied: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java
(from r1056651, camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java?p2=camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java&p1=camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java&r1=1056651&r2=1056744&rev=1056744&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicExchange.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/AtomicException.java
Sat Jan  8 17:29:00 2011
@@ -18,12 +18,10 @@ package org.apache.camel.util.concurrent
 
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.camel.Exchange;
-
 /**
- * Convenience class for holding an {@link Exchange} in a thread-safe way
+ * Convenience class for holding an {@link Exception} in a thread-safe way
  */
 @SuppressWarnings("serial")
-public class AtomicExchange extends AtomicReference<Exchange> {
+public class AtomicException extends AtomicReference<Exception> {
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java?rev=1056744&r1=1056743&r2=1056744&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterParallelBigFileTest.java
Sat Jan  8 17:29:00 2011
@@ -84,7 +84,9 @@ public class SplitterParallelBigFileTest
 
                 from("file:target/split")
                     .split(body().tokenize("\n")).streaming().parallelProcessing()
-                        .to("log:split?groupSize=1000");
+                        .to("log:split?groupSize=1000")
+                    .end()
+                    .log("Done splitting ${file:name}");
             }
         };
     }



Mime
View raw message