camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1457414 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java
Date Sun, 17 Mar 2013 10:17:46 GMT
Author: davsclaus
Date: Sun Mar 17 10:17:46 2013
New Revision: 1457414

URL: http://svn.apache.org/r1457414
Log:
CAMEL-6172: Aggregate EIP - Completion from batch consumer should trigger first

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java
      - copied, changed from r1457400, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1457414&r1=1457413&r2=1457414&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sun Mar 17 10:17:46 2013
@@ -354,6 +354,18 @@ public class AggregateProcessor extends 
      * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion
      */
     protected String isCompleted(String key, Exchange exchange) {
+        // batch consumer completion must always run first
+        if (isCompletionFromBatchConsumer()) {
+            batchConsumerCorrelationKeys.add(key);
+            batchConsumerCounter.incrementAndGet();
+            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
+            if (size > 0 && batchConsumerCounter.intValue() >= size) {
+                // batch consumer is complete then reset the counter
+                batchConsumerCounter.set(0);
+                return "consumer";
+            }
+        }
+
         if (getCompletionPredicate() != null) {
             boolean answer = getCompletionPredicate().matches(exchange);
             if (answer) {
@@ -403,17 +415,6 @@ public class AggregateProcessor extends 
             addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
         }
 
-        if (isCompletionFromBatchConsumer()) {
-            batchConsumerCorrelationKeys.add(key);
-            batchConsumerCounter.incrementAndGet();
-            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
-            if (size > 0 && batchConsumerCounter.intValue() >= size) {
-                // batch consumer is complete then reset the counter
-                batchConsumerCounter.set(0);
-                return "consumer";
-            }
-        }
-
         // not complete
         return null;
     }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java (from r1457400, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTest.java&r1=1457400&r2=1457414&rev=1457414&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionSizeAndBatchConsumerTest.java Sun Mar 17 10:17:46 2013
@@ -22,18 +22,25 @@ import org.apache.camel.builder.RouteBui
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.BodyInAggregatingStrategy;
 
-public class AggregateExpressionTest extends ContextTestSupport {
+public class AggregateCompletionSizeAndBatchConsumerTest extends ContextTestSupport {
 
     public void testAggregateExpressionSize() throws Exception {
         MockEndpoint result =  getMockEndpoint("mock:result");
-        result.expectedBodiesReceived("A+A", "B+B", "Z");
+        // A+A+A gets completed by size, the others by consumer
+        result.expectedBodiesReceived("A+A+A", "A", "B+B", "Z");
+        result.message(0).property(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("size");
+        result.message(1).property(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("consumer");
+        result.message(2).property(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("consumer");
+        result.message(3).property(Exchange.AGGREGATED_COMPLETED_BY).isEqualTo("consumer");
 
         template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "A");
         template.sendBody("direct:start", "B");
         template.sendBody("direct:start", "A");
         template.sendBody("direct:start", "B");
+        template.sendBody("direct:start", "A");
         // send the last one with the batch size property
-        template.sendBodyAndProperty("direct:start", "Z", Exchange.BATCH_SIZE, 5);
+        template.sendBodyAndProperty("direct:start", "Z", Exchange.BATCH_SIZE, 7);
 
         assertMockEndpointsSatisfied();
     }
@@ -43,11 +50,9 @@ public class AggregateExpressionTest ext
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // START SNIPPET: e1
-                from("direct:start")                    
-                    .aggregate(body(), new BodyInAggregatingStrategy()).completionFromBatchConsumer()
-                    .to("mock:result");
-                // END SNIPPET: e1
+                from("direct:start")
+                    .aggregate(body(), new BodyInAggregatingStrategy()).completionSize(3).completionFromBatchConsumer()
+                    .to("log:result", "mock:result");
             }
         };
     }



Mime
View raw message