camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [1/3] camel git commit: CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external source.
Date Sat, 21 Mar 2015 09:04:03 GMT
Repository: camel
Updated Branches:
  refs/heads/master ae8ce7379 -> 4bb4be751


CAMEL-7434: Aggregate now allows using a controller to control completion of groups from external
source.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/53500cf7
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/53500cf7
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/53500cf7

Branch: refs/heads/master
Commit: 53500cf7aca91373fad5e3f77c5537f493351df7
Parents: ae8ce73
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sat Mar 21 08:31:43 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sat Mar 21 08:31:43 2015 +0100

----------------------------------------------------------------------
 .../apache/camel/model/AggregateDefinition.java |  41 +++++++-
 .../aggregate/AggregateController.java          |  56 ++++++++++
 .../processor/aggregate/AggregateProcessor.java |  57 +++++++++-
 .../aggregate/DefaultAggregateController.java   |  71 +++++++++++++
 .../aggregator/AggregateControllerTest.java     | 103 +++++++++++++++++++
 5 files changed, 324 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
index f5d3746..087bc74 100644
--- a/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
@@ -35,6 +35,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.processor.CamelInternalProcessor;
+import org.apache.camel.processor.aggregate.AggregateController;
 import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategyBeanAdapter;
@@ -113,6 +114,10 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
     private Boolean discardOnCompletionTimeout;
     @XmlAttribute
     private Boolean forceCompletionOnStop;
+    @XmlTransient
+    private AggregateController aggregateController;
+    @XmlAttribute
+    private String aggregateControllerRef;
     @XmlElementRef
     private List<ProcessorDefinition<?>> outputs = new ArrayList<ProcessorDefinition<?>>();
 
@@ -182,7 +187,7 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
             shutdownThreadPool = true;
         }
 
-        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(),
internal,
+        AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(),
getId(), internal,
                 correlation, strategy, threadPool, shutdownThreadPool);
 
         AggregationRepository repository = createAggregationRepository(routeContext);
@@ -190,6 +195,10 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
             answer.setAggregationRepository(repository);
         }
 
+        if (getAggregateController() == null && getAggregateControllerRef() != null)
{
+            setAggregateController(routeContext.mandatoryLookup(getAggregateControllerRef(),
AggregateController.class));
+        }
+
         // this EIP supports using a shared timeout checker thread pool or fallback to create
a new thread pool
         boolean shutdownTimeoutThreadPool = false;
         ScheduledExecutorService timeoutThreadPool = timeoutCheckerExecutorService;
@@ -264,6 +273,9 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         } else {
             answer.setOptimisticLockRetryPolicy(optimisticLockRetryPolicy);
         }
+        if (getAggregateController() != null) {
+            answer.setAggregateController(getAggregateController());
+        }
         return answer;
     }
 
@@ -613,6 +625,22 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         this.forceCompletionOnStop = forceCompletionOnStop;
     }
 
+    public AggregateController getAggregateController() {
+        return aggregateController;
+    }
+
+    public void setAggregateController(AggregateController aggregateController) {
+        this.aggregateController = aggregateController;
+    }
+
+    public String getAggregateControllerRef() {
+        return aggregateControllerRef;
+    }
+
+    public void setAggregateControllerRef(String aggregateControllerRef) {
+        this.aggregateControllerRef = aggregateControllerRef;
+    }
+
     // Fluent API
     //-------------------------------------------------------------------------
 
@@ -900,7 +928,16 @@ public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition
         setTimeoutCheckerExecutorServiceRef(executorServiceRef);
         return this;
     }
-    
+
+    /**
+     * To use a {@link org.apache.camel.processor.aggregate.AggregateController} to allow
external sources to control
+     * this aggregator.
+     */
+    public AggregateDefinition aggregateController(AggregateController aggregateController)
{
+        setAggregateController(aggregateController);
+        return this;
+    }
+
     // Section - Methods from ExpressionNode
     // Needed to copy methods from ExpressionNode here so that I could specify the
     // correlation expression as optional in JAXB

http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
new file mode 100644
index 0000000..dab9d6e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateController.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+/**
+ * A controller which allows controlling a {@link org.apache.camel.processor.aggregate.AggregateProcessor}
from an
+ * external source, such as Java API or JMX. This can be used to force completion of aggregated
groups, and more.
+ */
+public interface AggregateController {
+
+    /**
+     * Callback when the aggregate processor is started.
+     *
+     * @param id        the aggregator id
+     * @param processor the aggregate processor
+     */
+    void onStart(String id, AggregateProcessor processor);
+
+    /**
+     * Callback when the aggregate processor is stopped.
+     *
+     * @param id        the aggregator id
+     * @param processor the aggregate processor
+     */
+    void onStop(String id, AggregateProcessor processor);
+
+    /**
+     * To force completing a specific group by its key.
+     *
+     * @param key the key
+     * @return <tt>1</tt> if the group was forced completed, <tt>0</tt>
if the group does not exists
+     */
+    int forceCompletionOfGroup(String key);
+
+    /**
+     * To force complete of all groups
+     *
+     * @return number of groups that was forced completed
+     */
+    int forceCompletionOfAllGroups();
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
index 244501a..33c97b4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
@@ -47,6 +47,7 @@ import org.apache.camel.TimeoutMap;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.AggregationRepository;
 import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.spi.HasId;
 import org.apache.camel.spi.OptimisticLockingAggregationRepository;
 import org.apache.camel.spi.RecoverableAggregationRepository;
 import org.apache.camel.spi.ShutdownPrepared;
@@ -79,7 +80,7 @@ import org.slf4j.LoggerFactory;
  * and older prices are discarded). Another idea is to combine line item messages
  * together into a single invoice message.
  */
-public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>,
Traceable, ShutdownPrepared {
+public class AggregateProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>,
Traceable, ShutdownPrepared, HasId {
 
     public static final String AGGREGATE_TIMEOUT_CHECKER = "AggregateTimeoutChecker";
 
@@ -87,9 +88,11 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     private final Lock lock = new ReentrantLock();
     private final CamelContext camelContext;
+    private final String id;
     private final Processor processor;
     private AggregationStrategy aggregationStrategy;
     private Expression correlationExpression;
+    private AggregateController aggregateController;
     private final ExecutorService executorService;
     private final boolean shutdownExecutorService;
     private OptimisticLockRetryPolicy optimisticLockRetryPolicy = new OptimisticLockRetryPolicy();
@@ -131,15 +134,17 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
 
     private ProducerTemplate deadLetterProducerTemplate;
 
-    public AggregateProcessor(CamelContext camelContext, Processor processor,
+    public AggregateProcessor(CamelContext camelContext, String id, Processor processor,
                               Expression correlationExpression, AggregationStrategy aggregationStrategy,
                               ExecutorService executorService, boolean shutdownExecutorService)
{
         ObjectHelper.notNull(camelContext, "camelContext");
+        ObjectHelper.notNull(id, "id");
         ObjectHelper.notNull(processor, "processor");
         ObjectHelper.notNull(correlationExpression, "correlationExpression");
         ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy");
         ObjectHelper.notNull(executorService, "executorService");
         this.camelContext = camelContext;
+        this.id = id;
         this.processor = processor;
         this.correlationExpression = correlationExpression;
         this.aggregationStrategy = aggregationStrategy;
@@ -148,6 +153,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         this.exceptionHandler = new LoggingExceptionHandler(camelContext, getClass());
     }
 
+    public String getId() {
+        return id;
+    }
+
     @Override
     public String toString() {
         return "AggregateProcessor[to: " + processor + "]";
@@ -773,6 +782,14 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         this.correlationExpression = correlationExpression;
     }
 
+    public AggregateController getAggregateController() {
+        return aggregateController;
+    }
+
+    public void setAggregateController(AggregateController aggregateController) {
+        this.aggregateController = aggregateController;
+    }
+
     /**
      * On completion task which keeps the booking of the in progress up to date
      */
@@ -1112,6 +1129,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
             restoreTimeoutMapFromAggregationRepository();
             ServiceHelper.startService(timeoutMap);
         }
+
+        if (aggregateController != null) {
+            aggregateController.onStart(id, this);
+        }
     }
 
     @Override
@@ -1120,6 +1141,10 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         // as this is handled in the prepareShutdown method which is also invoked when stopping
a route
         // and is better suited for preparing to shutdown than this doStop method is
 
+        if (aggregateController != null) {
+            aggregateController.onStop(id, this);
+        }
+
         if (recoverService != null) {
             camelContext.getExecutorServiceManager().shutdown(recoverService);
         }
@@ -1184,6 +1209,34 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor
         super.doShutdown();
     }
 
+    public int forceCompletionOfGroup(String key) {
+        // must acquire the shared aggregation lock to be able to trigger force completion
+        int total = 0;
+
+        if (!optimisticLocking) { lock.lock(); }
+        try {
+            Exchange exchange = aggregationRepository.get(camelContext, key);
+            if (exchange != null) {
+                total = 1;
+                LOG.trace("Force completion triggered for correlation key: {}", key);
+                // indicate it was completed by a force completion request
+                exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "forceCompletion");
+                Exchange answer = onCompletion(key, exchange, exchange, false);
+                if (answer != null) {
+                    onSubmitCompletion(key, answer);
+                }
+            }
+        } finally {
+            if (!optimisticLocking) { lock.unlock(); }
+        }
+        LOG.trace("Completed force completion of group {}", key);
+
+        if (total > 0) {
+            LOG.debug("Forcing completion of group {} with {} exchanges", key, total);
+        }
+        return total;
+    }
+
     public int forceCompletionOfAllGroups() {
 
         // only run if CamelContext has been fully started or is stopping

http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
new file mode 100644
index 0000000..7bb3448
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregateController.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.api.management.ManagedOperation;
+import org.apache.camel.api.management.ManagedResource;
+import org.apache.camel.support.ServiceSupport;
+
+/**
+ * A default {@link org.apache.camel.processor.aggregate.AggregateController} that offers
Java and JMX API.
+ */
+@ManagedResource(description = "Aggregation controller")
+public class DefaultAggregateController extends ServiceSupport implements AggregateController
{
+
+    private AggregateProcessor processor;
+    private String id;
+
+    public void onStart(String id, AggregateProcessor processor) {
+        this.id = id;
+        this.processor = processor;
+    }
+
+    public void onStop(String id, AggregateProcessor processor) {
+        this.id = id;
+        this.processor = null;
+    }
+
+    @ManagedOperation(description = "To force completion a group on the aggregator")
+    public int forceCompletionOfGroup(String key) {
+        if (processor != null) {
+            return processor.forceCompletionOfGroup(key);
+        } else {
+            return 0;
+        }
+    }
+
+    @ManagedOperation(description = "To force completion all groups on the aggregator")
+    public int forceCompletionOfAllGroups() {
+        if (processor != null) {
+            return processor.forceCompletionOfAllGroups();
+        } else {
+            return 0;
+        }
+    }
+
+    protected void doStart() throws Exception {
+        // noop
+    }
+
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+    public String toString() {
+        return "DefaultAggregateController[" + id + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/53500cf7/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
new file mode 100644
index 0000000..e00fdfd
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateControllerTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.AggregateController;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.DefaultAggregateController;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class AggregateControllerTest extends ContextTestSupport {
+
+    private AggregateController controller = new DefaultAggregateController();
+
+    @Test
+    public void testForceCompletionOfAll() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(2);
+        getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3",
"test2test4");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
"forceCompletion");
+
+        int groups = controller.forceCompletionOfAllGroups();
+        assertEquals(2, groups);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testForceCompletionOfGroup() throws Exception {
+        getMockEndpoint("mock:aggregated").expectedMessageCount(0);
+
+        template.sendBodyAndHeader("direct:start", "test1", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test2", "id", "2");
+        template.sendBodyAndHeader("direct:start", "test3", "id", "1");
+        template.sendBodyAndHeader("direct:start", "test4", "id", "2");
+
+        assertMockEndpointsSatisfied();
+
+        getMockEndpoint("mock:aggregated").expectedMessageCount(1);
+        getMockEndpoint("mock:aggregated").expectedBodiesReceivedInAnyOrder("test1test3");
+        getMockEndpoint("mock:aggregated").expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY,
"forceCompletion");
+
+        int groups = controller.forceCompletionOfGroup("1");
+        assertEquals(1, groups);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregate(header("id"), new MyAggregationStrategy()).aggregateController(controller)
+                        .completionSize(10)
+                    .to("mock:aggregated");
+            }
+        };
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
\ No newline at end of file


Mime
View raw message