camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/2] camel git commit: CAMEL-9193: Aggregator in preCompletion mode should also timeout if the new group does not receive further messages
Date Tue, 06 Oct 2015 08:44:42 GMT
Repository: camel
Updated Branches:
  refs/heads/master ff51e64d1 -> c216b4f9f


CAMEL-9193: Aggregator in preCompletion mode should also timeout if the new group does not receive further messages


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2e578a07
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2e578a07
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2e578a07

Branch: refs/heads/master
Commit: 2e578a07aa3288ba8fa4adb737c4fb8c04c9cbb2
Parents: ff51e64
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Oct 6 10:16:55 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Oct 6 10:19:24 2015 +0200

----------------------------------------------------------------------
 .../processor/aggregate/AggregateProcessor.java |  5 ++++-
 ...gatePreCompleteAwareStrategyTimeoutTest.java | 22 ++++++++++++++++++++
 camel-core/src/test/resources/log4j.properties  |  4 ++--
 3 files changed, 28 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2e578a07/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 5c400f6..822e831 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -438,6 +438,8 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             originalExchange = null;
             // and reset the size to 1
             size = 1;
+            // make sure to track timeout as we just restart the correlation group when we are in pre completion mode
+            trackTimeout(key, newExchange);
         }
 
         // aggregate the exchanges
@@ -495,7 +497,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             batchConsumerCorrelationKeys.clear();
             // we have already submitted to completion, so answer should be null
             answer = null;
-        } else {
+        } else if (answer != null) {
             // we are complete for this exchange
             answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete);
             answer = onCompletion(key, originalExchange, answer, false);
@@ -650,6 +652,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
         if (!fromTimeout && timeoutMap != null) {
             // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker)
+            LOG.trace("Removing correlation key {} from timeout", key);
             timeoutMap.remove(key);
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/2e578a07/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
index abfda10..ffcb03f 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
@@ -40,6 +40,28 @@ public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSup
         assertMockEndpointsSatisfied();
     }
 
+    public void testAggregatePreCompleteTimeoutOnlyOneInLastGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testAggregatePreCompleteTimeoutOnlyOneInFirstGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("X");
+
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {

http://git-wip-us.apache.org/repos/asf/camel/blob/2e578a07/camel-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/camel-core/src/test/resources/log4j.properties b/camel-core/src/test/resources/log4j.properties
index e91928c..f8ecded 100644
--- a/camel-core/src/test/resources/log4j.properties
+++ b/camel-core/src/test/resources/log4j.properties
@@ -23,7 +23,7 @@ log4j.logger.org.apache.camel.customlogger=TRACE, file2
 
 #log4j.logger.org.apache.camel.impl.converter=WARN
 #log4j.logger.org.apache.camel.management=DEBUG
-log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
+#log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.impl.converter.DefaultTypeConverter=TRACE
 #log4j.logger.org.apache.camel.impl.converter=DEBUG
 
@@ -49,7 +49,7 @@ log4j.logger.org.apache.camel.impl.DefaultPackageScanClassResolver=WARN
 #log4j.logger.org.apache.camel.processor.loadbalancer=TRACE
 #log4j.logger.org.apache.camel.processor.Delayer=TRACE
 #log4j.logger.org.apache.camel.processor.Throttler=TRACE
-#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=DEBUG
+#log4j.logger.org.apache.camel.processor.aggregate.AggregateProcessor=TRACE
 #log4j.logger.org.apache.camel.impl=TRACE
 #log4j.logger.org.apache.camel.util.FileUtil=TRACE
 #log4j.logger.org.apache.camel.util.AsyncProcessorHelper=TRACE


Mime
View raw message