camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject camel git commit: CAMEL-8233: Option parallelAggregate is not in use when using parallel processing as well
Date Mon, 12 Jan 2015 15:52:25 GMT
Repository: camel
Updated Branches:
  refs/heads/master 9ea07f899 -> 8a28aa443


CAMEL-8233: Option parallelAggregate is not in use when using parallel processing as well


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8a28aa44
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8a28aa44
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8a28aa44

Branch: refs/heads/master
Commit: 8a28aa443457e393163cde9281fad96876fb2d6d
Parents: 9ea07f8
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Jan 12 16:51:29 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Jan 12 16:52:18 2015 +0100

----------------------------------------------------------------------
 .../camel/processor/MulticastProcessor.java       | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/8a28aa44/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
index 6c1a54c..ee41a20 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
@@ -482,8 +482,11 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
                     }
 
                     // we got a result so aggregate it
-                    AggregationStrategy strategy = getAggregationStrategy(subExchange);
-                    doAggregate(strategy, result, subExchange);
+                    if (parallelAggregate) {
+                        doAggregateInternal(getAggregationStrategy(subExchange), result,
subExchange);
+                    } else {
+                        doAggregate(getAggregationStrategy(subExchange), result, subExchange);
+                    }
                 }
 
                 aggregated++;
@@ -814,11 +817,13 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     /**
-     * Aggregate the {@link Exchange} with the current result
+     * Aggregate the {@link Exchange} with the current result.
+     * This method is synchronized and is called directly when parallelAggregate is disabled (by default).
      *
      * @param strategy the aggregation strategy to use
      * @param result   the current result
      * @param exchange the exchange to be added to the result
+     * @see #doAggregateInternal(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange)
      */
     protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange
result, Exchange exchange) {
         doAggregateInternal(strategy, result, exchange);
@@ -829,9 +834,10 @@ public class MulticastProcessor extends ServiceSupport implements AsyncProcessor
      * This method is unsynchronized and is called directly when parallelAggregate is enabled.
      * In all other cases, this method is called from the doAggregate which is a synchronized
method
      *
-     * @param strategy
-     * @param result
-     * @param exchange
+     * @param strategy the aggregation strategy to use
+     * @param result   the current result
+     * @param exchange the exchange to be added to the result
+     * @see #doAggregate(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange)
      */
     protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result,
Exchange exchange) {
         if (strategy != null) {


Mime
View raw message