camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/3] git commit: CAMEL-6775: Optimized aggergate eip to send out aggregated exchange outside lock.
Date Mon, 23 Sep 2013 10:24:15 GMT
Updated Branches:
  refs/heads/master ef33795ed -> ca90341e9


CAMEL-6775: Optimized aggergate eip to send out aggregated exchange outside lock.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/7bc8fa56
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/7bc8fa56
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/7bc8fa56

Branch: refs/heads/master
Commit: 7bc8fa560abbac137356fd84bda2e9c142b9749a
Parents: ef33795
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Sep 23 08:16:52 2013 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Sep 23 08:17:01 2013 +0200

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java | 52 +++++++++++++++-----
 1 file changed, 41 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/7bc8fa56/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 8e2a26f..86094bc 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -212,6 +212,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
         // when optimist locking is enabled we keep trying until we succeed
         if (optimisticLocking) {
+            Exchange aggregated = null;
             boolean exhaustedRetries = true;
             int attempt = 0;
             do {
@@ -220,7 +221,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 // the aggregated output runs in another unit of work
                 Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false);
                 try {
-                    doAggregation(key, copy);
+                    aggregated = doAggregation(key, copy);
                     exhaustedRetries = false;
                     break;
                 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException
e) {
@@ -233,6 +234,9 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             if (exhaustedRetries) {
                 throw new CamelExchangeException("Exhausted optimistic locking retry attempts,
tried " + attempt + " times", exchange,
                         new OptimisticLockingAggregationRepository.OptimisticLockingException());
+            } else if (aggregated != null) {
+                // we are completed so submit to completion
+                onSubmitCompletion(key, aggregated);
             }
         } else {
             // copy exchange, and do not share the unit of work
@@ -242,12 +246,18 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             // when memory based then its fast using synchronized, but if the aggregation
repository is IO
             // bound such as JPA etc then concurrent aggregation per correlation key could
             // improve performance as we can run aggregation repository get/add in parallel
+            Exchange aggregated = null;
             lock.lock();
             try {
-                doAggregation(key, copy);
+                aggregated = doAggregation(key, copy);
             } finally {
                 lock.unlock();
             }
+
+            // we are completed so do that work outside the lock
+            if (aggregated != null) {
+                onSubmitCompletion(key, aggregated);
+            }
         }
 
         // check for the special header to force completion of all groups (inclusive of the
message)
@@ -262,10 +272,13 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
      * <p/>
      * This method <b>must</b> be run synchronized as we cannot aggregate the
same correlation key
      * in parallel.
+     * <p/>
+     * The returned {@link Exchange} should be send downstream using the {@link #onSubmitCompletion(String,
org.apache.camel.Exchange)}
+     * method which sends out the aggregated and completed {@link Exchange}.
      *
      * @param key      the correlation key
      * @param newExchange the exchange
-     * @return the aggregated exchange
+     * @return the aggregated exchange which is complete, or <tt>null</tt> if
not yet complete
      * @throws org.apache.camel.CamelExchangeException is thrown if error aggregating
      */
     private Exchange doAggregation(String key, Exchange newExchange) throws CamelExchangeException
{
@@ -319,6 +332,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         // only need to update aggregation repository if we are not complete
         if (complete == null) {
             doAggregationRepositoryAdd(newExchange.getContext(), key, originalExchange, answer);
+            // we are not complete so the answer should be null
+            answer = null;
         } else {
             // if batch consumer completion is enabled then we need to complete the group
             if ("consumer".equals(complete)) {
@@ -334,13 +349,16 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                     if (batchAnswer != null) {
                         batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
                         onCompletion(batchKey, originalExchange, batchAnswer, false);
+                        onSubmitCompletion(key, batchAnswer);
                     }
                 }
                 batchConsumerCorrelationKeys.clear();
+                // we have already submitted to completion, so answer should be null
+                answer = null;
             } else {
                 // we are complete for this exchange
                 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
-                onCompletion(key, originalExchange, answer, false);
+                answer = onCompletion(key, originalExchange, answer, false);
             }
         }
 
@@ -448,7 +466,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         return aggregationStrategy.aggregate(oldExchange, newExchange);
     }
 
-    protected void onCompletion(final String key, final Exchange original, final Exchange
aggregated, boolean fromTimeout) {
+    protected Exchange onCompletion(final String key, final Exchange original, final Exchange
aggregated, boolean fromTimeout) {
         // store the correlation key as property before we remove so the repository has that
information
         if (original != null) {
             original.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key);
@@ -477,6 +495,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
         }
 
+        Exchange answer;
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
             // discard due timeout
             LOG.debug("Aggregation for correlation key {} discarding aggregated exchange:
{}", key, aggregated);
@@ -484,10 +503,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             aggregationRepository.confirm(aggregated.getContext(), aggregated.getExchangeId());
             // and remove redelivery state as well
             redeliveryState.remove(aggregated.getExchangeId());
+            // the completion was from timeout and we should just discard it
+            answer = null;
         } else {
             // the aggregated exchange should be published (sent out)
-            onSubmitCompletion(key, aggregated);
+            answer = aggregated;
         }
+
+        return answer;
     }
 
     private void onSubmitCompletion(final String key, final Exchange exchange) {
@@ -734,8 +757,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         this.correlationExpression = correlationExpression;
     }
 
-
-
     /**
      * On completion task which keeps the booking of the in progress up to date
      */
@@ -815,7 +836,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                 // indicate it was completed by timeout
                 answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout");
                 try {
-                    onCompletion(key, answer, answer, true);
+                    answer = onCompletion(key, answer, answer, true);
+                    if (answer != null) {
+                        onSubmitCompletion(key, answer);
+                    }
                 } catch (OptimisticLockingAggregationRepository.OptimisticLockingException
e) {
                     evictionStolen = true;
                 }
@@ -860,7 +884,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                             // indicate it was completed by interval
                             exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
                             try {
-                                onCompletion(key, exchange, exchange, false);
+                                Exchange answer = onCompletion(key, exchange, exchange, false);
+                                if (answer != null) {
+                                    onSubmitCompletion(key, answer);
+                                }
                             } catch (OptimisticLockingAggregationRepository.OptimisticLockingException
e) {
                                 stolenInterval = true;
                             }
@@ -1167,7 +1194,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
                         LOG.trace("Force completion triggered for correlation key: {}", key);
                         // indicate it was completed by a force completion request
                         exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
-                        onCompletion(key, exchange, exchange, false);
+                        Exchange answer = onCompletion(key, exchange, exchange, false);
+                        if (answer != null) {
+                            onSubmitCompletion(key, answer);
+                        }
                     }
                 }
             } finally {


Mime
View raw message