Repository: camel
Updated Branches:
refs/heads/master 1950e7785 -> f3c4fc2a2
CAMEL-9252 add GroupedMessageAggregationStrategy
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37ef29c0
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37ef29c0
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37ef29c0
Branch: refs/heads/master
Commit: 37ef29c05b41743ab2bb04ac1f14404454332424
Parents: 1950e77
Author: khaing211 <khaing211@gmail.com>
Authored: Sat Oct 24 10:39:10 2015 -0700
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Dec 18 17:56:53 2015 +0100
----------------------------------------------------------------------
.../GroupedMessageAggregationStrategy.java | 36 ++++++++++++++
.../aggregator/AggregateGroupMessageTest.java | 51 ++++++++++++++++++++
2 files changed, 87 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/37ef29c0/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
new file mode 100644
index 0000000..b04dbf0
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/GroupedMessageAggregationStrategy.java
@@ -0,0 +1,36 @@
+package org.apache.camel.processor.aggregate;
+
+import java.util.List;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.impl.DefaultExchange;
+
+/**
+ * Aggregate all {@link Message} into a single combined Exchange holding all the
+ * aggregated messages in a {@link List} of {@link Message} as the message body.
+ *
+ * This aggregation strategy can be used in combination with {@link
+ * org.apache.camel.processor.Splitter} to batch messages.
+ *
+ * @version
+ */
+public class GroupedMessageAggregationStrategy extends AbstractListAggregationStrategy<Message> {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // On the first call there is no holder yet, so a new empty exchange
+        // takes that role. The outgoing exchange must not be one of the grouped
+        // exchanges, as that causes an endless circular reference.
+        final Exchange holder = oldExchange != null
+                ? oldExchange
+                : new DefaultExchange(newExchange);
+        return super.aggregate(holder, newExchange);
+    }
+
+    /** Returns the IN message of the given exchange. */
+    @Override
+    public Message getValue(Exchange exchange) {
+        return exchange.getIn();
+    }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/37ef29c0/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
new file mode 100644
index 0000000..4d7b41c
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupMessageTest.java
@@ -0,0 +1,51 @@
+package org.apache.camel.processor.aggregator;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy;
+
+public class AggregateGroupMessageTest extends ContextTestSupport {
+
+ @SuppressWarnings("unchecked")
+ public void testGrouped() throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+
+ result.expectedMessageCount(1);
+
+ template.sendBody("direct:start", "100");
+ template.sendBody("direct:start", "150");
+ template.sendBody("direct:start", "130");
+ template.sendBody("direct:start", "200");
+ template.sendBody("direct:start", "190");
+
+ assertMockEndpointsSatisfied();
+
+ Exchange out = result.getExchanges().get(0);
+ List<Message> grouped = out.getIn().getBody(List.class);
+
+ assertEquals(5, grouped.size());
+
+ assertEquals("100", grouped.get(0).getBody(String.class));
+ assertEquals("150", grouped.get(1).getBody(String.class));
+ assertEquals("130", grouped.get(2).getBody(String.class));
+ assertEquals("200", grouped.get(3).getBody(String.class));
+ assertEquals("190", grouped.get(4).getBody(String.class));
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from("direct:start")
+ .aggregate(constant(true), new GroupedMessageAggregationStrategy())
+ .completionTimeout(500L)
+ .to("mock:result");
+ }
+ };
+ }
+}
|