camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/5] camel git commit: CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.
Date Mon, 23 Mar 2015 10:54:16 GMT
CAMEL-7433: Allow aggregation strategy to determine pre complete when using aggregator.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/efaa7bf7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/efaa7bf7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/efaa7bf7

Branch: refs/heads/master
Commit: efaa7bf71a674ac7a98d43b9c187860b04eef9ad
Parents: 7973ac5
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Mon Mar 23 10:25:09 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Mon Mar 23 11:56:03 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  1 +
 .../processor/aggregate/AggregateProcessor.java | 89 ++++++++++++++------
 .../AggregatePreCompleteAwareStrategyTest.java  | 54 ++++++++++++
 ...gatePreCompleteAwareStrategyTimeoutTest.java | 54 ++++++++++++
 .../AggregatePredicateAwareStrategyTest.java    | 53 ------------
 5 files changed, 172 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index 942d69b..cfcb027 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -41,6 +41,7 @@ import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
 import org.apache.camel.processor.aggregate.OptimisticLockRetryPolicy;
+import org.apache.camel.processor.aggregate.PreCompletionAwareAggregationStrategy;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.RouteContext;

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index b365442..fbec104 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -92,6 +92,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     private final Processor processor;
     private String id;
     private AggregationStrategy aggregationStrategy;
+    private boolean preCompletion;
     private Expression correlationExpression;
     private AggregateController aggregateController;
     private final ExecutorService executorService;
@@ -376,6 +377,7 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         LOG.trace("onAggregation +++ start +++ with correlation key: {}", key);
 
         List<Exchange> list = new ArrayList<Exchange>();
+        String complete = null;
 
         Exchange answer;
         Exchange originalExchange = aggregationRepository.get(newExchange.getContext(), key);
@@ -396,31 +398,36 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         ExchangeHelper.prepareAggregation(oldExchange, newExchange);
 
         // check if we are pre complete
-        boolean preComplete;
-        try {
-            // put the current aggregated size on the exchange so its avail during completion check
-            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
-            preComplete = onPreCompletionAggregation(oldExchange, newExchange);
-            // remove it afterwards
-            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
-        } catch (Throwable e) {
-            // must catch any exception from aggregation
-            throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
-        }
-
-        // check if we are complete
-        String complete = null;
-        if (!preComplete && isEagerCheckCompletion()) {
+        if (preCompletion) {
+            try {
+                // put the current aggregated size on the exchange so its avail during completion check
+                newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+                complete = isPreCompleted(key, oldExchange, newExchange);
+                // make sure to track timeouts if not complete
+                if (complete == null) {
+                    trackTimeout(key, newExchange);
+                }
+                // remove it afterwards
+                newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
+            } catch (Throwable e) {
+                // must catch any exception from aggregation
+                throw new CamelExchangeException("Error occurred during preComplete", newExchange, e);
+            }
+        } else if (isEagerCheckCompletion()) {
             // put the current aggregated size on the exchange so its avail during completion check
             newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
             complete = isCompleted(key, newExchange);
+            // make sure to track timeouts if not complete
+            if (complete == null) {
+                trackTimeout(key, newExchange);
+            }
             // remove it afterwards
             newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
         }
 
-        if (preComplete) {
+        if (preCompletion && complete != null) {
             // need to pre complete the current group before we aggregate
-            doAggregationComplete("strategy", list, key, originalExchange, oldExchange);
+            doAggregationComplete(complete, list, key, originalExchange, oldExchange);
             // as we complete the current group eager, we should indicate the new group is not complete
             complete = null;
             // and clear old/original exchange as we start on a new group
@@ -445,8 +452,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         answer.setProperty(Exchange.AGGREGATED_SIZE, size);
 
         // maybe we should check completion after the aggregation
-        if (!isEagerCheckCompletion()) {
+        if (!preCompletion && !isEagerCheckCompletion()) {
             complete = isCompleted(key, answer);
+            // make sure to track timeouts if not complete
+            if (complete == null) {
+                trackTimeout(key, newExchange);
+            }
         }
 
         if (complete == null) {
@@ -515,6 +526,22 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
     }
 
     /**
+     * Tests whether the given exchanges is pre-complete or not
+     *
+     * @param key      the correlation key
+     * @param oldExchange   the existing exchange
+     * @param newExchange the incoming exchange
+     * @return <tt>null</tt> if not pre-completed, otherwise a String with the type that triggered the pre-completion
+     */
+    protected String isPreCompleted(String key, Exchange oldExchange, Exchange newExchange) {
+        boolean complete = false;
+        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+            complete = ((PreCompletionAwareAggregationStrategy) aggregationStrategy).preComplete(oldExchange, newExchange);
+        }
+        return complete ? "strategy" : null;
+    }
+
+    /**
      * Tests whether the given exchange is complete or not
      *
      * @param key      the correlation key
@@ -564,6 +591,11 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
         }
 
+        // not complete
+        return null;
+    }
+
+    protected void trackTimeout(String key, Exchange exchange) {
         // timeout can be either evaluated based on an expression or from a fixed value
         // expression takes precedence
         boolean timeoutSet = false;
@@ -586,9 +618,6 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             }
             addExchangeToTimeoutMap(key, exchange, getCompletionTimeout());
         }
-
-        // not complete
-        return null;
     }
 
     protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
@@ -1182,11 +1211,19 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     @Override
     protected void doStart() throws Exception {
-        if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
-                && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
-                && getCompletionSizeExpression() == null) {
-            throw new IllegalStateException("At least one of the completions options"
-                    + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+        if (aggregationStrategy instanceof PreCompletionAwareAggregationStrategy) {
+            preCompletion = true;
+            LOG.info("PreCompletionAwareAggregationStrategy detected. Aggregator {} is in pre-completion mode.", getId());
+        }
+
+        if (!preCompletion) {
+            // if not in pre completion mode then check we configured the completion required
+            if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
+                    && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
+                    && getCompletionSizeExpression() == null) {
+                throw new IllegalStateException("At least one of the completions options"
+                        + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+            }
         }
 
         if (getCloseCorrelationKeyOnCompletion() != null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
new file mode 100644
index 0000000..f965c90
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version 
+ */
+public class AggregatePreCompleteAwareStrategyTest extends ContextTestSupport {
+
+    public void testAggregatePreComplete() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy())
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
new file mode 100644
index 0000000..abfda10
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePreCompleteAwareStrategyTimeoutTest.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
+
+/**
+ * @version 
+ */
+public class AggregatePreCompleteAwareStrategyTimeoutTest extends ContextTestSupport {
+
+    public void testAggregatePreCompleteTimeout() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E", "X+F");
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+        template.sendBodyAndHeader("direct:start", "X", "id", 123);
+        template.sendBodyAndHeader("direct:start", "F", "id", 123);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionTimeout(1000)
+                        .to("mock:aggregated");
+            }
+        };
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/efaa7bf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
deleted file mode 100644
index 74fe19b..0000000
--- a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatePredicateAwareStrategyTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.processor.BodyInPreCompleteAggregatingStrategy;
-
-/**
- * @version 
- */
-public class AggregatePredicateAwareStrategyTest extends ContextTestSupport {
-
-    public void testAggregatePreComplete() throws Exception {
-        getMockEndpoint("mock:aggregated").expectedBodiesReceived("A+B+C", "X+D+E");
-
-        template.sendBodyAndHeader("direct:start", "A", "id", 123);
-        template.sendBodyAndHeader("direct:start", "B", "id", 123);
-        template.sendBodyAndHeader("direct:start", "C", "id", 123);
-        template.sendBodyAndHeader("direct:start", "X", "id", 123);
-        template.sendBodyAndHeader("direct:start", "D", "id", 123);
-        template.sendBodyAndHeader("direct:start", "E", "id", 123);
-        template.sendBodyAndHeader("direct:start", "X", "id", 123);
-
-        assertMockEndpointsSatisfied();
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                from("direct:start")
-                    .aggregate(header("id"), new BodyInPreCompleteAggregatingStrategy()).completionSize(5)
-                        .to("mock:aggregated");
-            }
-        };
-    }
-}
\ No newline at end of file


Mime
View raw message