camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1457415 - in /camel/branches/camel-2.10.x: ./ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/test/java/org/apache/camel/processor/aggregator/
Date Sun, 17 Mar 2013 10:18:28 GMT
Author: davsclaus
Date: Sun Mar 17 10:18:28 2013
New Revision: 1457415

URL: http://svn.apache.org/r1457415
Log:
CAMEL-6172: Aggregate EIP - Completion from batch consumer should trigger first

Added:
    camel/branches/camel-2.10.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java
      - copied unchanged from r1457414, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java
Modified:
    camel/branches/camel-2.10.x/   (props changed)
    camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
  Merged /camel/trunk:r1457414

Propchange: camel/branches/camel-2.10.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1457415&r1=1457414&r2=1457415&view=diff
==============================================================================
--- camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/branches/camel-2.10.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Mar 17 10:18:28 2013
@@ -301,6 +301,18 @@ public class AggregateProcessor extends 
      * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
      */
     protected String isCompleted(String key, Exchange exchange) {
+        // batch consumer completion must always run first
+        if (isCompletionFromBatchConsumer()) {
+            batchConsumerCorrelationKeys.add(key);
+            batchConsumerCounter.incrementAndGet();
+            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
+            if (size > 0 && batchConsumerCounter.intValue() >= size) {
+                // batch consumer is complete then reset the counter
+                batchConsumerCounter.set(0);
+                return "consumer";
+            }
+        }
+
         if (getCompletionPredicate() != null) {
             boolean answer = getCompletionPredicate().matches(exchange);
             if (answer) {
@@ -350,17 +362,6 @@ public class AggregateProcessor extends 
             addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
         }
 
-        if (isCompletionFromBatchConsumer()) {
-            batchConsumerCorrelationKeys.add(key);
-            batchConsumerCounter.incrementAndGet();
-            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
-            if (size > 0 && batchConsumerCounter.intValue() >= size) {
-                // batch consumer is complete then reset the counter
-                batchConsumerCounter.set(0);
-                return "consumer";
-            }
-        }
-
         // not complete
         return null;
     }



Mime
View raw message