camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r634749 - in /activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model: AggregatorType.java ProcessorType.java
Date Fri, 07 Mar 2008 17:13:38 GMT
Author: jstrachan
Date: Fri Mar  7 09:13:38 2008
New Revision: 634749

URL: http://svn.apache.org/viewvc?rev=634749&view=rev
Log:
made it easier to use a custom aggregation collection as discussed in this thread: http://www.nabble.com/AggregationCollection-implementation-tp15891263s22882p15891263.html

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=634749&r1=634748&r2=634749&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Fri Mar  7 09:13:38 2008
@@ -35,6 +35,7 @@
 import org.apache.camel.impl.RouteContext;
 import org.apache.camel.model.language.ExpressionType;
 import org.apache.camel.processor.Aggregator;
+import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
@@ -46,13 +47,15 @@
 public class AggregatorType extends ExpressionNode {
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
+    @XmlTransient
+    private AggregationCollection aggregationCollection;
     @XmlAttribute(required = false)
     private Integer batchSize;
     @XmlAttribute(required = false)
     private Long batchTimeout;
     @XmlAttribute(required = false)
     private String strategyRef;
-    @XmlElement(name="completedPredicate", required = false)
+    @XmlElement(name = "completedPredicate", required = false)
     private CompletedPredicate completedPredicate;
 
     public AggregatorType() {
@@ -77,30 +80,35 @@
     }
 
     @SuppressWarnings("unchecked")
-	@Override
+    @Override
     public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
         Endpoint from = routeContext.getEndpoint();
         final Processor processor = routeContext.createProcessor(this);
 
-        AggregationStrategy strategy = getAggregationStrategy();
-        if (strategy == null && strategyRef != null) {
-            strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
-        }
-        if (strategy == null) {
-            strategy = new UseLatestAggregationStrategy();
-        }
-        Expression aggregateExpression = getExpression().createExpression(routeContext);
-
-        Predicate predicate = null;
-        if (completedPredicate != null) {
-            predicate = completedPredicate.createPredicate(routeContext);
-        }
         final Aggregator service;
-        if (predicate != null) {
-            service = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
+        if (aggregationCollection != null) {
+            service = new Aggregator(from, processor, aggregationCollection);
         }
         else {
-            service = new Aggregator(from, processor, aggregateExpression, strategy);
+            AggregationStrategy strategy = getAggregationStrategy();
+            if (strategy == null && strategyRef != null) {
+                strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
+            }
+            if (strategy == null) {
+                strategy = new UseLatestAggregationStrategy();
+            }
+            Expression aggregateExpression = getExpression().createExpression(routeContext);
+
+            Predicate predicate = null;
+            if (completedPredicate != null) {
+                predicate = completedPredicate.createPredicate(routeContext);
+            }
+            if (predicate != null) {
+                service = new Aggregator(from, processor, aggregateExpression, strategy, predicate);
+            }
+            else {
+                service = new Aggregator(from, processor, aggregateExpression, strategy);
+            }
         }
 
         if (batchSize != null) {
@@ -118,6 +126,14 @@
         };
 
         routes.add(route);
+    }
+
+    public AggregationCollection getAggregationCollection() {
+        return aggregationCollection;
+    }
+
+    public void setAggregationCollection(AggregationCollection aggregationCollection) {
+        this.aggregationCollection = aggregationCollection;
     }
 
     public AggregationStrategy getAggregationStrategy() {

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=634749&r1=634748&r2=634749&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Fri Mar  7 09:13:38 2008
@@ -50,6 +50,7 @@
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 import org.apache.camel.processor.idempotent.MessageIdRepository;
 import org.apache.camel.spi.DataFormat;
@@ -368,7 +369,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
@@ -385,7 +386,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
@@ -401,7 +402,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
@@ -419,7 +420,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/splitter.html">Splitter</a>
      * pattern where an expression is evaluated to iterate through each of the
      * parts of a message and then each part is then send to some endpoint.
@@ -436,7 +437,7 @@
     }
     
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
      * pattern where a list of expressions are evaluated to be able to compare
      * the message exchanges to reorder them. e.g. you may wish to sort by some
@@ -453,7 +454,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
      * pattern where an expression is evaluated to be able to compare the
      * message exchanges to reorder them. e.g. you may wish to sort by some
@@ -467,7 +468,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
      * pattern where a list of expressions are evaluated to be able to compare
      * the message exchanges to reorder them. e.g. you may wish to sort by some
@@ -483,7 +484,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
      * pattern where a list of expressions are evaluated to be able to compare
      * the message exchanges to reorder them. e.g. you may wish to sort by some
@@ -501,7 +502,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates an <a
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
      * pattern where a batch of messages are processed (up to a maximum amount
      * or until some timeout is reached) and messages for the same correlation
@@ -526,7 +527,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates an <a
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
      * pattern where a batch of messages are processed (up to a maximum amount
      * or until some timeout is reached) and messages for the same correlation
@@ -539,10 +540,7 @@
      * discarded). Another idea is to combine line item messages together into a
      * single invoice message.
      *
-     * @param correlationExpression the expression used to calculate the
-     *                              correlation key. For a JMS message this could be the
-     *                              expression <code>header("JMSDestination")</code> or
-     *                              <code>header("JMSCorrelationID")</code>
+     * @param aggregationStrategy the strategy used for the aggregation
      */
     public ExpressionClause<AggregatorType> aggregator(AggregationStrategy aggregationStrategy) {
         AggregatorType answer = new AggregatorType();
@@ -552,7 +550,21 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates an <a
+     * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
+     * pattern using a custom aggregation collection implementation.
+     *
+     * @param aggregationCollection the collection used to perform the aggregation
+     */
+    public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) {
+        AggregatorType answer = new AggregatorType();
+        answer.setAggregationCollection(aggregationCollection);
+        addOutput(answer);
+        return ExpressionClause.createAndSetExpression(answer);
+    }
+
+    /**
+     * Creates the <a
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
      * pattern where a batch of messages are processed (up to a maximum amount
      * or until some timeout is reached) and messages for the same correlation
@@ -577,7 +589,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
      * pattern where a batch of messages are processed (up to a maximum amount
      * or until some timeout is reached) and messages for the same correlation
@@ -602,7 +614,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
      * where an expression is used to calculate the time which the message will
      * be dispatched on
@@ -616,7 +628,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
      * where an expression is used to calculate the time which the message will
      * be dispatched on
@@ -635,7 +647,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
      * where an expression is used to calculate the time which the message will
      * be dispatched on
@@ -648,7 +660,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
      * where a fixed amount of milliseconds are used to delay processing of a
      * message exchange
@@ -661,7 +673,7 @@
     }
 
     /**
-     * A builder for the <a
+     * Creates the <a
      * href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
      * where an expression is used to calculate the time which the message will
      * be dispatched on



Mime
View raw message