camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r910860 [1/2] - in /camel/trunk: camel-core/src/main/java/org/apache/camel/impl/ camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/main/java/org/apache/camel/processor/aggre...
Date Wed, 17 Feb 2010 08:27:27 GMT
Author: davsclaus
Date: Wed Feb 17 08:27:25 2010
New Revision: 910860

URL: http://svn.apache.org/viewvc?rev=910860&view=rev
Log:
CAMEL-1686: Overhaul of Aggregator. Work in progress.

Removed:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAggregatorRejectedPutBackTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/MyReverseAggregationCollection.java
    camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedCollection.java
    camel/trunk/components/camel-rss/src/test/java/org/apache/camel/component/rss/RssCustomAggregatorTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringAggregatorWithCustomCollectionTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-collection.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/multicastAggregator.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/view/DotViewTest.java
    camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/SpringAggregateFromWireTapTest.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator-custom-strategy.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator.xml
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/scattergather/scatter-gather.xml
    camel/trunk/components/camel-web/src/main/java/org/apache/camel/web/util/AggregateDefinitionRenderer.java
    camel/trunk/components/camel-web/src/test/java/org/apache/camel/web/groovy/AggregateDSLTest.java
    camel/trunk/examples/camel-example-cafe/src/main/java/org/apache/camel/example/cafe/CafeRouteBuilder.java
    camel/trunk/examples/camel-example-cafe/src/main/resources/META-INF/spring/camel-context.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ProducerCache.java Wed Feb 17 08:27:25 2010
@@ -150,10 +150,6 @@
         }
 
         try {
-            // now lets dispatch
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(">>>> " + endpoint + " " + exchange);
-            }
             // invoke the callback
             return callback.doInProducer(producer, exchange, pattern);
         } finally {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Wed Feb 17 08:27:25 2010
@@ -18,7 +18,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlAttribute;
@@ -32,11 +31,10 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.model.language.ExpressionDefinition;
-import org.apache.camel.processor.Aggregator;
-import org.apache.camel.processor.aggregate.AggregationCollection;
+import org.apache.camel.processor.UnitOfWorkProcessor;
+import org.apache.camel.processor.aggregate.AggregateProcessor;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 import org.apache.camel.spi.RouteContext;
 
 /**
@@ -47,32 +45,26 @@
 @XmlRootElement(name = "aggregate")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> {
-    @XmlElement(name = "correlationExpression", required = false)
+    @XmlElement(name = "correlationExpression", required = true)
     private ExpressionSubElementDefinition correlationExpression;
+    @XmlElement(name = "completionPredicate", required = false)
+    private ExpressionSubElementDefinition completionPredicate;
     @XmlTransient
     private ExpressionDefinition expression;
     @XmlElementRef
     private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
     @XmlTransient
     private AggregationStrategy aggregationStrategy;
-    @XmlTransient
-    private AggregationCollection aggregationCollection;
-    @XmlAttribute(required = false)
-    private Integer batchSize;
-    @XmlAttribute(required = false)
-    private Integer outBatchSize;
+    @XmlAttribute(required = true)
+    private String strategyRef;
     @XmlAttribute(required = false)
-    private Long batchTimeout;
+    private Integer completionSize;
     @XmlAttribute(required = false)
-    private String strategyRef;
+    private Long completionTimeout;
     @XmlAttribute(required = false)
-    private String collectionRef;    
+    private Boolean completionFromBatchConsumer;
     @XmlAttribute(required = false)
     private Boolean groupExchanges;
-    @XmlAttribute(required = false)
-    private Boolean batchSizeFromConsumer;
-    @XmlElement(name = "completionPredicate", required = false)
-    private ExpressionSubElementDefinition completionPredicate;
 
     public AggregateDefinition() {
     }
@@ -125,65 +117,32 @@
         return clause;
     }
     
-    protected Aggregator createAggregator(RouteContext routeContext) throws Exception {
-        final Processor processor = routeContext.createProcessor(this);
+    protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
+        Processor processor = routeContext.createProcessor(this);
+        // wrap the aggregated route in a unit of work processor
+        processor = new UnitOfWorkProcessor(routeContext, processor);
+
+        Expression correlation = getExpression().createExpression(routeContext);
+        AggregationStrategy strategy = createAggregationStrategy(routeContext);
+
+        AggregateProcessor answer = new AggregateProcessor(processor, correlation, strategy);
+
+        if (getCompletionPredicate() != null) {
+            Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
+            answer.setCompletionPredicate(predicate);
+        }
 
-        final Aggregator aggregator;
-        if (getAggregationCollection() == null) {
-            setAggregationCollection(createAggregationCollection(routeContext));
-        }
-        
-        if (aggregationCollection != null) {
-            // create the aggregator using the collection
-            // pre configure the collection if its expression and strategy is not set, then
-            // use the ones that is pre configured with this type
-            if (aggregationCollection.getCorrelationExpression() == null) {
-                aggregationCollection.setCorrelationExpression(getExpression());
-            }
-            if (aggregationCollection.getAggregationStrategy() == null) {
-                AggregationStrategy strategy = createAggregationStrategy(routeContext);
-                aggregationCollection.setAggregationStrategy(strategy);
-            }
-            aggregator = new Aggregator(processor, aggregationCollection);
-        } else {
-            // create the aggregator using a default collection
-            AggregationStrategy strategy = createAggregationStrategy(routeContext);
-
-            if (getExpression() == null) {
-                throw new IllegalArgumentException("You need to specify an expression or "
-                                                   + "aggregation collection for this aggregator: " + this);
-            }
-            
-            Expression aggregateExpression = getExpression().createExpression(routeContext);           
-
-            Predicate predicate = null;
-            if (getCompletionPredicate() != null) {
-                predicate = getCompletionPredicate().createPredicate(routeContext);
-            }
-            if (predicate != null) {
-                aggregator = new Aggregator(processor, aggregateExpression, strategy, predicate);
-            } else {
-                aggregator = new Aggregator(processor, aggregateExpression, strategy);
-            }
-        }
-        
-        if (batchSize != null) {
-            aggregator.setBatchSize(batchSize);
-        }
-        if (batchTimeout != null) {
-            aggregator.setBatchTimeout(batchTimeout);
-        }
-        if (outBatchSize != null) {
-            aggregator.setOutBatchSize(outBatchSize);
+        if (getCompletionSize() != null) {
+            answer.setCompletionSize(getCompletionSize());
         }
-        if (groupExchanges != null) {
-            aggregator.setGroupExchanges(groupExchanges);
+        if (getCompletionTimeout() != null) {
+            answer.setCompletionTimeout(getCompletionTimeout());
         }
-        if (batchSizeFromConsumer != null) {
-            aggregator.setBatchConsumer(batchSizeFromConsumer);
+        if (isCompletionFromBatchConsumer() != null) {
+            answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
         }
 
-        return aggregator;
+        return answer;
     }
 
     private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
@@ -191,35 +150,16 @@
         if (strategy == null && strategyRef != null) {
             strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
         }
-        // pick a default strategy
+        if (strategy == null && groupExchanges != null && groupExchanges) {
+            // if grouped exchange is enabled then use special strategy for that
+            strategy = new GroupedExchangeAggregationStrategy();
+        }
         if (strategy == null) {
-            if (groupExchanges != null && groupExchanges) {
-                // if grouped exchange is enabled then use special strategy for that
-                strategy = new GroupedExchangeAggregationStrategy();
-            } else {
-                // fallback to use latest
-                strategy = new UseLatestAggregationStrategy();
-            }
+            throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
         }
         return strategy;
     }
 
-    private AggregationCollection createAggregationCollection(RouteContext routeContext) {
-        AggregationCollection collection = getAggregationCollection();
-        if (collection == null && collectionRef != null) {
-            collection = routeContext.lookup(collectionRef, AggregationCollection.class);
-        }
-        return collection;
-    }    
-    
-    public AggregationCollection getAggregationCollection() {
-        return aggregationCollection;
-    }
-
-    public void setAggregationCollection(AggregationCollection aggregationCollection) {
-        this.aggregationCollection = aggregationCollection;
-    }
-
     public AggregationStrategy getAggregationStrategy() {
         return aggregationStrategy;
     }
@@ -228,55 +168,39 @@
         this.aggregationStrategy = aggregationStrategy;
     }
 
-    public Integer getBatchSize() {
-        return batchSize;
-    }
-
-    public void setBatchSize(Integer batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public Integer getOutBatchSize() {
-        return outBatchSize;
-    }
-
-    public void setOutBatchSize(Integer outBatchSize) {
-        this.outBatchSize = outBatchSize;
+    public String getAggregationStrategyRef() {
+        return strategyRef;
     }
 
-    public Long getBatchTimeout() {
-        return batchTimeout;
+    public void setAggregationStrategyRef(String aggregationStrategyRef) {
+        this.strategyRef = aggregationStrategyRef;
     }
 
-    public void setBatchTimeout(Long batchTimeout) {
-        this.batchTimeout = batchTimeout;
+    public Integer getCompletionSize() {
+        return completionSize;
     }
 
-    public String getStrategyRef() {
-        return strategyRef;
+    public void setCompletionSize(Integer completionSize) {
+        this.completionSize = completionSize;
     }
 
-    public void setStrategyRef(String strategyRef) {
-        this.strategyRef = strategyRef;
+    public Long getCompletionTimeout() {
+        return completionTimeout;
     }
 
-    public String getCollectionRef() {
-        return collectionRef;
+    public void setCompletionTimeout(Long completionTimeout) {
+        this.completionTimeout = completionTimeout;
     }
 
-    public void setCollectionRef(String collectionRef) {
-        this.collectionRef = collectionRef;
+    public ExpressionSubElementDefinition getCompletionPredicate() {
+        return completionPredicate;
     }
 
     public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
         this.completionPredicate = completionPredicate;
     }
 
-    public ExpressionSubElementDefinition getCompletionPredicate() {
-        return completionPredicate;
-    }
-
-    public Boolean getGroupExchanges() {
+    public Boolean isGroupExchanges() {
         return groupExchanges;
     }
 
@@ -284,12 +208,12 @@
         this.groupExchanges = groupExchanges;
     }
 
-    public Boolean getBatchSizeFromConsumer() {
-        return batchSizeFromConsumer;
+    public Boolean isCompletionFromBatchConsumer() {
+        return completionFromBatchConsumer;
     }
 
-    public void setBatchSizeFromConsumer(Boolean batchSizeFromConsumer) {
-        this.batchSizeFromConsumer = batchSizeFromConsumer;
+    public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
+        this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
     // Fluent API
@@ -298,56 +222,36 @@
     /**
      * Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
      * and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
-     * as total by setting the exchange property {@link org.apache.camel.Exchange#BATCH_SIZE}.
-     *
-     * @return builder
-     */
-    public AggregateDefinition batchSizeFromConsumer() {
-        setBatchSizeFromConsumer(true);
-        return this;
-    }
-
-    /**
-     * Sets the in batch size for number of exchanges received
+     * as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when it is complete.
      *
-     * @param batchSize  the batch size
      * @return builder
      */
-    public AggregateDefinition batchSize(int batchSize) {
-        setBatchSize(batchSize);
+    public AggregateDefinition completionFromBatchConsumer() {
+        setCompletionFromBatchConsumer(true);
         return this;
     }
 
     /**
-     * Sets the out batch size for number of exchanges sent
+     * Sets the completion size, which is the number of aggregated exchanges which would
+     * cause the aggregate to consider the group as complete and send out the aggregated exchange.
      *
-     * @param batchSize  the batch size
+     * @param completionSize  the completion size
      * @return builder
      */
-    public AggregateDefinition outBatchSize(int batchSize) {
-        setOutBatchSize(batchSize);
+    public AggregateDefinition completionSize(int completionSize) {
+        setCompletionSize(completionSize);
         return this;
     }
 
     /**
-     * Sets the batch timeout
+     * Sets the completion timeout, which would cause the aggregate to consider the group as complete
+     * and send out the aggregated exchange.
      *
-     * @param batchTimeout  the timeout in millis
+     * @param completionTimeout  the timeout in millis
      * @return the builder
      */
-    public AggregateDefinition batchTimeout(long batchTimeout) {
-        setBatchTimeout(batchTimeout);
-        return this;
-    }
-
-    /**
-     * Sets the aggregate collection to use
-     *
-     * @param aggregationCollection  the aggregate collection to use
-     * @return the builder
-     */
-    public AggregateDefinition aggregationCollection(AggregationCollection aggregationCollection) {
-        setAggregationCollection(aggregationCollection);
+    public AggregateDefinition completionTimeout(long completionTimeout) {
+        setCompletionTimeout(completionTimeout);
         return this;
     }
 
@@ -363,24 +267,13 @@
     }
 
     /**
-     * Sets the aggregate collection to use
-     *
-     * @param collectionRef  reference to the aggregate collection to lookup in the registry
-     * @return the builder
-     */
-    public AggregateDefinition collectionRef(String collectionRef) {
-        setCollectionRef(collectionRef);
-        return this;
-    }
-
-    /**
      * Sets the aggregate strategy to use
      *
-     * @param strategyRef  reference to the strategy to lookup in the registry
+     * @param aggregationStrategyRef  reference to the strategy to lookup in the registry
      * @return the builder
      */
-    public AggregateDefinition strategyRef(String strategyRef) {
-        setStrategyRef(strategyRef);
+    public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
+        setAggregationStrategyRef(aggregationStrategyRef);
         return this;
     }
 
@@ -412,6 +305,7 @@
      * Sets the predicate used to determine if the aggregation is completed
      *
      * @param predicate  the predicate
+     * @return the builder
      */
     public AggregateDefinition completionPredicate(Predicate predicate) {
         checkNoCompletedPredicate();
@@ -436,7 +330,7 @@
     // Section - Methods from ExpressionNode
     // Needed to copy methods from ExpressionNode here so that I could specify the
     // correlation expression as optional in JAXB
-    
+
     public ExpressionDefinition getExpression() {
         if (expression == null && correlationExpression != null) {
             expression = correlationExpression.getExpressionType();            

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Wed Feb 17 08:27:25 2010
@@ -44,14 +44,12 @@
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.builder.ExpressionClause;
 import org.apache.camel.builder.ProcessorBuilder;
-import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.model.language.ConstantExpression;
 import org.apache.camel.model.language.ExpressionDefinition;
 import org.apache.camel.model.language.LanguageExpression;
 import org.apache.camel.processor.DefaultChannel;
 import org.apache.camel.processor.InterceptEndpointProcessor;
 import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.interceptor.Delayer;
 import org.apache.camel.processor.interceptor.HandleFault;
@@ -1379,20 +1377,6 @@
      * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
      * Creates an aggregator allowing you to combine a number of messages together into a single message.
      *
-     * @param aggregationCollection the collection used to perform the aggregation
-     * @return the builder
-     */
-    public AggregateDefinition aggregate(AggregationCollection aggregationCollection) {
-        AggregateDefinition answer = new AggregateDefinition();
-        answer.setAggregationCollection(aggregationCollection);
-        addOutput(answer);
-        return answer;
-    }
-
-    /**
-     * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
-     * Creates an aggregator allowing you to combine a number of messages together into a single message.
-     *
      * @param correlationExpression the expression used to calculate the
      *                              correlation key. For a JMS message this could be the
      *                              expression <code>header("JMSDestination")</code> or

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Wed Feb 17 08:27:25 2010
@@ -46,11 +46,11 @@
  * A base class for any kind of {@link Processor} which implements some kind of batch processing.
  * 
  * @version $Revision$
+ * @deprecated will be removed in Camel 2.4
  */
+@Deprecated
 public class BatchProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
 
-    // TODO: Should aggregate on the fly as well
-
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Wed Feb 17 08:27:25 2010
@@ -35,6 +35,8 @@
  */
 public class Resequencer extends BatchProcessor implements Traceable {
 
+    // TODO: Rework to avoid using BatchProcessor
+
     public Resequencer(Processor processor, Expression expression) {
         this(processor, createSet(expression));
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Wed Feb 17 08:27:25 2010
@@ -31,6 +31,7 @@
 import org.apache.camel.Processor;
 import org.apache.camel.impl.LoggingExceptionHandler;
 import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.processor.Traceable;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.DefaultTimeoutMap;
 import org.apache.camel.util.ExchangeHelper;
@@ -43,11 +44,25 @@
 import org.apache.commons.logging.LogFactory;
 
 /**
- * <a href="http://camel.apache.org/aggregator.html">Aggregator</a> EIP pattern.
+ * An implementation of the <a
+ * href="http://camel.apache.org/aggregator2.html">Aggregator</a>
+ * pattern where a batch of messages are processed (up to a maximum amount or
+ * until some timeout is reached) and messages for the same correlation key are
+ * combined together using some kind of {@link AggregationStrategy}
+ * (by default the latest message is used) to compress many message exchanges
+ * into a smaller number of exchanges.
+ * <p/>
+ * A good example of this is stock market data; you may be receiving 30,000
+ * messages/second and you may want to throttle it right down so that multiple
+ * messages for the same stock are combined (or just the latest message is used
+ * and older prices are discarded). Another idea is to combine line item messages
+ * together into a single invoice message.
  *
  * @version $Revision$
  */
-public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor> {
+public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable {
+
+    // TODO: Add support for parallelProcessing, setting custom ExecutorService like multicast
 
     private static final Log LOG = LogFactory.getLog(AggregateProcessor.class);
 
@@ -63,14 +78,15 @@
     // options
     private boolean ignoreBadCorrelationKeys;
     private boolean closeCorrelationKeyOnCompletion;
-    private boolean useBatchSizeFromConsumer;
     private int concurrentConsumers = 1;
 
     // different ways to have completion triggered
     private boolean eagerCheckCompletion;
     private Predicate completionPredicate;
     private long completionTimeout;
-    private int completionAggregatedSize;
+    private int completionSize;
+    private boolean completionFromBatchConsumer;
+    private int batchConsumerCounter;
 
     public AggregateProcessor(Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) {
         ObjectHelper.notNull(processor, "processor");
@@ -86,6 +102,10 @@
         return "AggregateProcessor[to: " + processor + "]";
     }
 
+    public String getTraceLabel() {
+        return "aggregate[" + correlationExpression + "]";
+    }
+
     public List<Processor> next() {
         if (!hasNext()) {
             return null;
@@ -121,63 +141,68 @@
             }
         }
 
-        // if batch consumer is enabled then we need to adjust the batch size
-        // with the size from the batch consumer
-        if (isUseBatchSizeFromConsumer()) {
-            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
-            if (size > 0 && size != completionAggregatedSize) {
-                completionAggregatedSize = size;
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Using batch consumer completion, so setting completionAggregatedSize to: " + completionAggregatedSize);
-                }
-            }
+        doAggregation(key, exchange);
+    }
+
+    private synchronized Exchange doAggregation(Object key, Exchange exchange) {
+        // TODO: lock this based on keys so we can run in parallel groups
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("+++ start +++ onAggregation for key " + key);
         }
 
+        Exchange answer;
         Exchange oldExchange = aggregationRepository.get(key);
         Exchange newExchange = exchange;
 
         Integer size = 1;
         if (oldExchange != null) {
-            size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, Integer.class);
-            ObjectHelper.notNull(size, Exchange.AGGREGATED_SIZE + " on " + oldExchange);
+            size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class);
             size++;
         }
 
         // check if we are complete
         boolean complete = false;
         if (isEagerCheckCompletion()) {
-            complete = isCompleted(key, exchange, size);
+            // put the current aggregated size on the exchange so it's available during the completion check
+            newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+            complete = isCompleted(key, newExchange);
+            // remove it afterwards
+            newExchange.removeProperty(Exchange.AGGREGATED_SIZE);
         }
 
         // prepare the exchanges for aggregation and aggregate it
         ExchangeHelper.prepareAggregation(oldExchange, newExchange);
-        newExchange = onAggregation(oldExchange, newExchange);
-        newExchange.setProperty(Exchange.AGGREGATED_SIZE, size);
+        answer = onAggregation(oldExchange, exchange);
+        answer.setProperty(Exchange.AGGREGATED_SIZE, size);
 
         // maybe we should check completion after the aggregation
         if (!isEagerCheckCompletion()) {
-            // use the new aggregated exchange when testing
-            complete = isCompleted(key, newExchange, size);
+            // put the current aggregated size on the exchange so it's available during the completion check
+            answer.setProperty(Exchange.AGGREGATED_SIZE, size);
+            complete = isCompleted(key, answer);
         }
 
         // only need to update aggregation repository if we are not complete
-        if (!complete && !newExchange.equals(oldExchange)) {
+        if (!complete) {
             if (LOG.isTraceEnabled()) {
-                LOG.trace("Put exchange:" + newExchange + " with correlation key:"  + key);
+                LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key);
             }
-            aggregationRepository.add(key, newExchange);
+            aggregationRepository.add(key, answer);
         }
 
         if (complete) {
-            onCompletion(key, newExchange);
+            onCompletion(key, answer, false);
         }
-    }
 
-    protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
-        return aggregationStrategy.aggregate(oldExchange, newExchange);
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("+++ end +++ onAggregation for key " + key + " with size " + size);
+        }
+
+        return answer;
     }
 
-    protected boolean isCompleted(Object key, Exchange exchange, int size) {
+    protected boolean isCompleted(Object key, Exchange exchange) {
         if (getCompletionPredicate() != null) {
             boolean answer = getCompletionPredicate().matches(exchange);
             if (answer) {
@@ -185,27 +210,44 @@
             }
         }
 
-        if (getCompletionAggregatedSize() > 0) {
-            if (size >= getCompletionAggregatedSize()) {
+        if (getCompletionSize() > 0) {
+            int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class);
+            if (size >= getCompletionSize()) {
                 return true;
             }
         }
 
         if (getCompletionTimeout() > 0) {
             // timeout is used so use the timeout map to keep an eye on this
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Updating correlation key " + key + " to timeout after " + getCompletionTimeout() + " ms.");
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Updating correlation key " + key + " to timeout after "
+                        + getCompletionTimeout() + " ms. as exchange received: " + exchange);
             }
             timeoutMap.put(key, exchange, getCompletionTimeout());
         }
 
+        if (isCompletionFromBatchConsumer()) {
+            batchConsumerCounter++;
+            int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class);
+            if (size > 0 && batchConsumerCounter >= size) {
+                // batch consumer is complete
+                batchConsumerCounter = 0;
+                return true;
+            }
+        }
+
         return false;
     }
 
-    protected void onCompletion(Object key, final Exchange exchange) {
-        // remove from repository and timeout map as its completed
+    protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) {
+        return aggregationStrategy.aggregate(oldExchange, newExchange);
+    }
+
+    protected void onCompletion(Object key, final Exchange exchange, boolean fromTimeout) {
+        // remove from repository as it's completed
         aggregationRepository.remove(key);
-        if (timeoutMap != null) {
+        if (!fromTimeout && timeoutMap != null) {
+            // clean up the timeout map if it was an incoming exchange which triggered the completion (and not the timeout checker)
             timeoutMap.remove(key);
         }
 
@@ -270,12 +312,12 @@
         this.completionTimeout = completionTimeout;
     }
 
-    public int getCompletionAggregatedSize() {
-        return completionAggregatedSize;
+    public int getCompletionSize() {
+        return completionSize;
     }
 
-    public void setCompletionAggregatedSize(int completionAggregatedSize) {
-        this.completionAggregatedSize = completionAggregatedSize;
+    public void setCompletionSize(int completionSize) {
+        this.completionSize = completionSize;
     }
 
     public boolean isIgnoreBadCorrelationKeys() {
@@ -294,12 +336,12 @@
         this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
     }
 
-    public boolean isUseBatchSizeFromConsumer() {
-        return useBatchSizeFromConsumer;
+    public boolean isCompletionFromBatchConsumer() {
+        return completionFromBatchConsumer;
     }
 
-    public void setUseBatchSizeFromConsumer(boolean useBatchSizeFromConsumer) {
-        this.useBatchSizeFromConsumer = useBatchSizeFromConsumer;
+    public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) {
+        this.completionFromBatchConsumer = completionFromBatchConsumer;
     }
 
     public int getConcurrentConsumers() {
@@ -334,14 +376,14 @@
             if (log.isDebugEnabled()) {
                 log.debug("Completion timeout triggered for correlation key: " + entry.getKey());
             }
-            onCompletion(entry.getKey(), entry.getValue());
+            onCompletion(entry.getKey(), entry.getValue(), true);
             return true;
         }
     }
 
     @Override
     protected void doStart() throws Exception {
-        if (getCompletionTimeout() <= 0 && getCompletionAggregatedSize() <= 0 && getCompletionPredicate() == null) {
+        if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null) {
             throw new IllegalStateException("At least one of the completions options"
                     + " [completionTimeout, completionAggregatedSize, completionPredicate] must be set");
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Wed Feb 17 08:27:25 2010
@@ -31,6 +31,7 @@
  *
  * @version $Revision$
  */
+@Deprecated
 public interface AggregationCollection extends Collection<Exchange> {
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java Wed Feb 17 08:27:25 2010
@@ -36,6 +36,7 @@
  *
  * @version $Revision$
  */
+@Deprecated
 public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {
 
     private static final transient Log LOG = LogFactory.getLog(DefaultAggregationCollection.class);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Wed Feb 17 08:27:25 2010
@@ -30,6 +30,7 @@
  *
  * @version $Revision$
  */
+@Deprecated
 public class PredicateAggregationCollection extends DefaultAggregationCollection {
     private Predicate aggregationCompletedPredicate;
     private List<Exchange> collection = new ArrayList<Exchange>();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/DefaultTimeoutMap.java Wed Feb 17 08:27:25 2010
@@ -16,14 +16,14 @@
  */
 package org.apache.camel.util;
 
-import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Service;
 import org.apache.commons.logging.Log;
@@ -38,11 +38,11 @@
 
     protected final transient Log log = LogFactory.getLog(getClass());
 
-    private final Map<K, TimeoutMapEntry<K, V>> map = new HashMap<K, TimeoutMapEntry<K, V>>();
-    private final SortedSet<TimeoutMapEntry<K, V>> index = new TreeSet<TimeoutMapEntry<K, V>>();
+    private final ConcurrentMap<K, TimeoutMapEntry<K, V>> map = new ConcurrentHashMap<K, TimeoutMapEntry<K, V>>();
     private final ScheduledExecutorService executor;
     private final long purgePollTime;
     private final long initialDelay = 1000L;
+    private final Lock lock = new ReentrantLock();
 
     public DefaultTimeoutMap() {
         this(null, 1000L);
@@ -56,82 +56,91 @@
 
     public V get(K key) {
         TimeoutMapEntry<K, V> entry;
-        synchronized (map) {
+        lock.lock();
+        try {
             entry = map.get(key);
             if (entry == null) {
                 return null;
             }
-            index.remove(entry);
             updateExpireTime(entry);
-            index.add(entry);
+        } finally {
+            lock.unlock();
         }
         return entry.getValue();
     }
 
     public void put(K key, V value, long timeoutMillis) {
         TimeoutMapEntry<K, V> entry = new TimeoutMapEntry<K, V>(key, value, timeoutMillis);
-        synchronized (map) {
-            TimeoutMapEntry<K, V> oldValue = map.put(key, entry);
-            if (oldValue != null) {
-                index.remove(oldValue);
-            }
+        lock.lock();
+        try {
+            map.put(key, entry);
             updateExpireTime(entry);
-            index.add(entry);
+        } finally {
+            lock.unlock();
         }
     }
 
     public void remove(K id) {
-        synchronized (map) {
-            TimeoutMapEntry entry = map.remove(id);
-            if (entry != null) {
-                index.remove(entry);
-            }
+        lock.lock();
+        try {
+            map.remove(id);
+        } finally {
+            lock.unlock();
         }
     }
 
     public Object[] getKeys() {
-        Object[] keys = null;
-        synchronized (map) {
+        Object[] keys;
+        lock.lock();
+        try {
             Set<K> keySet = map.keySet();
             keys = new Object[keySet.size()];
             keySet.toArray(keys);
+        } finally {
+            lock.unlock();
         }
         return keys;
     }
     
     public int size() {
-        synchronized (map) {
-            return map.size();
-        }
+        return map.size();
     }
 
     /**
      * The timer task which purges old requests and schedules another poll
      */
     public void run() {
-        purge();
+        if (log.isTraceEnabled()) {
+            log.trace("Running purge task to see if any entries has been timed out");
+        }
+        try {
+            purge();
+        } catch (Throwable t) {
+            // must catch and log the exception, otherwise the executor will not schedule the next run
+            log.error("Exception occurred during purge task", t);
+        }
     }
 
     public void purge() {
+        if (log.isTraceEnabled()) {
+            log.debug("There are " + map.size() + " in the timeout map");
+        }
         long now = currentTime();
-        synchronized (map) {
-            for (Iterator<TimeoutMapEntry<K, V>> iter = index.iterator(); iter.hasNext();) {
-                TimeoutMapEntry<K, V> entry = iter.next();
-                if (entry == null) {
-                    break;
-                }
-                if (entry.getExpireTime() < now) {
-                    if (isValidForEviction(entry)) {
+
+        lock.lock();
+        try {
+            for (Map.Entry<K, TimeoutMapEntry<K, V>> entry : map.entrySet()) {
+                if (entry.getValue().getExpireTime() < now) {
+                    if (isValidForEviction(entry.getValue())) {
                         if (log.isDebugEnabled()) {
                             log.debug("Evicting inactive request for correlationID: " + entry);
                         }
-                        map.remove(entry.getKey());
-                        iter.remove();
+                        map.remove(entry.getKey(), entry.getValue());
                     }
-                } else {
-                    break;
                 }
             }
+        } finally {
+            lock.unlock();
         }
     }
 
@@ -182,6 +191,5 @@
             executor.shutdown();
         }
         map.clear();
-        index.clear();
     }
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentAggregateBatchConsumerTest.java Wed Feb 17 08:27:25 2010
@@ -39,7 +39,9 @@
                     .setHeader("id", simple("${file:onlyname.noext}"))
                     .threads(20)
                     .beanRef("business")
-                    .aggregate(header("country"), new MyBusinessTotal()).batchSizeFromConsumer().batchTimeout(60000).to("mock:result");
+                    .aggregate(header("country"), new MyBusinessTotal())
+                        .completionTimeout(2000L)
+                        .to("mock:result");
             }
         });
 
@@ -61,7 +63,9 @@
                 from("file://target/concurrent?delay=60000&initialDelay=2500")
                     .setHeader("id", simple("${file:onlyname.noext}"))
                     .beanRef("business")
-                    .aggregate(header("country"), new MyBusinessTotal()).batchSizeFromConsumer().batchTimeout(60000).to("mock:result");
+                    .aggregate(header("country"), new MyBusinessTotal())
+                        .completionTimeout(2000L)
+                        .to("mock:result");
             }
         });
 
@@ -73,7 +77,7 @@
         assertMockEndpointsSatisfied();
 
         long delta = System.currentTimeMillis() - start;
-        LOG.debug("Time taken sequentiel: " + delta);
+        LOG.debug("Time taken sequential: " + delta);
     }
 
 }
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentTest.java Wed Feb 17 08:27:25 2010
@@ -68,7 +68,10 @@
                     .setHeader("id", simple("${file:onlyname.noext}"))
                     .threads(20)
                     .beanRef("business")
-                    .aggregate(header("country"), new MyBusinessTotal()).batchSize(10).batchTimeout(60000).to("mock:result");
+                    .log("Country is ${in.header.country}")
+                    .aggregate(header("country"), new MyBusinessTotal())
+                        .completionTimeout(2000L)
+                        .to("mock:result");
             }
         });
 
@@ -90,7 +93,9 @@
                 from("file://target/concurrent?delay=60000&initialDelay=2500")
                     .setHeader("id", simple("${file:onlyname.noext}"))
                     .beanRef("business")
-                    .aggregate(header("country"), new MyBusinessTotal()).batchSize(10).batchTimeout(60000).to("mock:result");
+                    .aggregate(header("country"), new MyBusinessTotal())
+                        .completionTimeout(2000L)
+                        .to("mock:result");
             }
         });
 
@@ -142,7 +147,7 @@
             }
             Integer add = newExchange.getIn().getBody(Integer.class);
             int total = current.intValue() + add.intValue();
-            LOG.debug("Aggregated sum so far: " + total + " for country: " + country);
+            LOG.info("Aggregated sum so far: " + total + " for country: " + country);
             answer.getIn().setBody(total);
             return answer;
         }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Wed Feb 17 08:27:25 2010
@@ -16,9 +16,13 @@
  */
 package org.apache.camel.processor;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * @version $Revision$
@@ -44,19 +48,15 @@
         testSendALargeBatch("direct:predicate");
     }
     
-    public void testOutBatchPredicate() throws Exception {
-        testSendALargeBatch("direct:outBatchPredicate");
-    }
-
-    public void testOutBatchWithNoInBatching() throws Exception {
-        testSendALargeBatch("direct:outBatchNoInBatching");
-    }
-    
     public void testOneMessage() throws Exception {
         MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
 
         resultEndpoint.expectedMessageCount(1);
-        template.sendBodyAndHeader("direct:predicate", "test", "aggregated", 5);
+        Map headers = new HashMap();
+        headers.put("cheese", 123);
+        headers.put("aggregated", 5);
+
+        template.sendBodyAndHeaders("direct:predicate", "test", headers);
         resultEndpoint.assertIsSatisfied();
     }
 
@@ -84,25 +84,20 @@
 
                 // START SNIPPET: ex
                 // in this route we aggregate all from direct:state based on the header id cheese
-                from("direct:start").aggregate(header("cheese")).to("mock:result");
-
-                from("seda:header").setHeader("visited", constant(true)).aggregate(header("cheese")).to("mock:result");
+                from("direct:start")
+                    .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L)
+                        .to("mock:result");
+
+                from("seda:header").setHeader("visited", constant(true))
+                    .aggregate(header("cheese"), new UseLatestAggregationStrategy()).completionTimeout(1000L)
+                        .to("mock:result");
 
                 // in this sample we aggregate using our own strategy with a completion predicate
                 // stating that the aggregated header is equal to 5.
-                from("direct:predicate").aggregate(header("cheese"), new MyAggregationStrategy()).
-                        completionPredicate(header("aggregated").isEqualTo(5)).to("mock:result");
-                
-                // this sample is similar to the one above but it also illustrates the use of outBatchSize 
-                // to send exchanges to mock:endpoint in batches of 10.  
-                from("direct:outBatchPredicate").aggregate(header("cheese"), new MyAggregationStrategy()).
-                        completionPredicate(header("aggregated").isEqualTo(5)).outBatchSize(10).to("mock:result");
+                from("direct:predicate")
+                    .aggregate(header("cheese"), new MyAggregationStrategy()).completionPredicate(header("aggregated").isEqualTo(5))
+                        .to("mock:result");
                 // END SNIPPET: ex
-
-                // turning off in batching (batchSize = 1) is a good way to test "out" batching.  Don't include
-                // in wiki snippet as it may not be a good example to follow.
-                from("direct:outBatchNoInBatching").aggregate(header("cheese"), new MyAggregationStrategy()).
-                        completionPredicate(header("aggregated").isEqualTo(5)).batchSize(1).outBatchSize(10).to("mock:result");
             }
         };
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java Wed Feb 17 08:27:25 2010
@@ -94,7 +94,9 @@
                     .to("seda:aggregate");
                 
                 // collect and re-assemble the validated OrderItems into an order again
-                from("seda:aggregate").aggregate(new MyOrderAggregationStrategy()).header("orderId").to("mock:result");
+                from("seda:aggregate")
+                    .aggregate(new MyOrderAggregationStrategy()).header("orderId").completionTimeout(1000L)
+                        .to("mock:result");
                 // END SNIPPET: e2
             }
         };

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionPredicateTest.java Wed Feb 17 08:27:25 2010
@@ -125,7 +125,7 @@
             public void configure() throws Exception {
                 from("direct:start")
                     .aggregate(header("id"), new BodyInAggregatingStrategy())
-                            .completionPredicate(body().contains("END")).batchTimeout(20000)
+                            .completionPredicate(body().contains("END")).completionTimeout(20000)
                         .to("mock:aggregated");
             }
         };

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateFromWireTapTest.java Wed Feb 17 08:27:25 2010
@@ -20,6 +20,7 @@
 import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.BodyInAggregatingStrategy;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 
 public class AggregateFromWireTapTest extends ContextTestSupport {
@@ -50,10 +51,10 @@
                 from("direct:tap")
                     // just use a constant correlation expression as we want to agg everything
                     // in the same group. set batch size to two which means to fire when we
-                    // have received 2 incoming messages, if not the timeout of 5 sec will kick in
-                    .aggregate(new MyAggregationStrategy()).constant(true).batchSize(2)
-                       .batchTimeout(5000L)
-                        .to("direct:aggregated")
+                    // have aggregated 2 messages, if not the timeout of 5 sec will kick in
+                    .aggregate(constant(true), new MyAggregationStrategy())
+                        .completionSize(2).completionTimeout(5000L)
+                            .to("direct:aggregated")
                     .end();
 
                 from("direct:aggregated")

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeBatchSizeTest.java Wed Feb 17 08:27:25 2010
@@ -74,9 +74,9 @@
                 // our route is aggregating from the direct queue and sending the response to the mock
                 from("direct:start")
                     // aggregated all use same expression
-                    .aggregate().constant(true).batchSize(2)
+                    .aggregate().constant(true).completionSize(2)
                     // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
+                    .completionTimeout(500L)
                     // group the exchanges so we get one single exchange containing all the others
                     .groupExchanges()
                     .to("mock:result");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeMultipleCorrelationTest.java Wed Feb 17 08:27:25 2010
@@ -41,6 +41,9 @@
         template.sendBodyAndHeader("direct:start", "150", "foo", "A");
         template.sendBodyAndHeader("direct:start", "200", "foo", "B");
         template.sendBodyAndHeader("direct:start", "180", "foo", "B");
+
+        // to force B to time out first as A is added last
+        Thread.sleep(100);
         template.sendBodyAndHeader("direct:start", "120", "foo", "A");
 
         assertMockEndpointsSatisfied();
@@ -50,18 +53,19 @@
 
         assertEquals(3, grouped.size());
 
-        assertEquals("100", grouped.get(0).getIn().getBody(String.class));
-        assertEquals("150", grouped.get(1).getIn().getBody(String.class));
-        assertEquals("120", grouped.get(2).getIn().getBody(String.class));
+        // B times out first
+        assertEquals("130", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("200", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("180", grouped.get(2).getIn().getBody(String.class));
 
         out = result.getExchanges().get(1);
         grouped = out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
 
         assertEquals(3, grouped.size());
 
-        assertEquals("130", grouped.get(0).getIn().getBody(String.class));
-        assertEquals("200", grouped.get(1).getIn().getBody(String.class));
-        assertEquals("180", grouped.get(2).getIn().getBody(String.class));
+        assertEquals("100", grouped.get(0).getIn().getBody(String.class));
+        assertEquals("150", grouped.get(1).getIn().getBody(String.class));
+        assertEquals("120", grouped.get(2).getIn().getBody(String.class));
         // END SNIPPET: e2
     }
 
@@ -74,10 +78,10 @@
                 from("direct:start")
                     // aggregate all using the foo header
                     .aggregate().header("foo")
-                    // wait for 1 seconds to aggregate
-                    .batchTimeout(1000L)
                     // group the exchanges so we get one single exchange containing all the others
                     .groupExchanges()
+                    // wait for 1 second to aggregate
+                    .completionTimeout(1000L)
                     .to("mock:result");
                 // END SNIPPET: e1
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateGroupedExchangeTest.java Wed Feb 17 08:27:25 2010
@@ -67,7 +67,7 @@
                     // aggregate all using same expression
                     .aggregate().constant(true)
                     // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
+                    .completionTimeout(500L)
                     // group the exchanges so we get one single exchange containing all the others
                     .groupExchanges()
                     .to("mock:result");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateLostGroupIssueTest.java Wed Feb 17 08:27:25 2010
@@ -68,7 +68,7 @@
                         oldExchange.getIn().setBody(oldBody + "," + newBody);
                         return oldExchange;
                     }
-                }).batchSize(10).batchTimeout(2000L)
+                }).completionSize(10).completionTimeout(2000L)
                         .to("log:aggregated")
                         .to("mock:result");
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateMultipleSourceTest.java Wed Feb 17 08:27:25 2010
@@ -56,7 +56,7 @@
                 from("seda:baz").to("direct:aggregate");
 
                 from("direct:aggregate")
-                    .aggregate(header("type"), new MyAggregationStrategy()).batchSize(25).batchTimeout(5000)
+                    .aggregate(header("type"), new MyAggregationStrategy()).completionSize(25).completionTimeout(5000)
                         .to("mock:result")
                     .end();
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Wed Feb 17 08:27:25 2010
@@ -139,7 +139,7 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
-        ap.setCompletionAggregatedSize(3);
+        ap.setCompletionSize(3);
         ap.setEagerCheckCompletion(eager);
         ap.start();
 
@@ -365,35 +365,45 @@
         AggregationStrategy as = new BodyInAggregatingStrategy();
 
         AggregateProcessor ap = new AggregateProcessor(done, corr, as);
-        ap.setCompletionAggregatedSize(100);
-        ap.setUseBatchSizeFromConsumer(true);
+        ap.setCompletionSize(100);
+        ap.setCompletionFromBatchConsumer(true);
 
         ap.start();
 
         Exchange e1 = new DefaultExchange(context);
         e1.getIn().setBody("A");
         e1.getIn().setHeader("id", 123);
+        e1.setProperty(Exchange.BATCH_INDEX, 0);
         e1.setProperty(Exchange.BATCH_SIZE, 2);
+        e1.setProperty(Exchange.BATCH_COMPLETE, false);
 
         Exchange e2 = new DefaultExchange(context);
         e2.getIn().setBody("B");
         e2.getIn().setHeader("id", 123);
+        e2.setProperty(Exchange.BATCH_INDEX, 1);
         e2.setProperty(Exchange.BATCH_SIZE, 2);
+        e2.setProperty(Exchange.BATCH_COMPLETE, true);
 
         Exchange e3 = new DefaultExchange(context);
         e3.getIn().setBody("C");
         e3.getIn().setHeader("id", 123);
+        e3.setProperty(Exchange.BATCH_INDEX, 0);
         e3.setProperty(Exchange.BATCH_SIZE, 3);
+        e3.setProperty(Exchange.BATCH_COMPLETE, false);
 
         Exchange e4 = new DefaultExchange(context);
         e4.getIn().setBody("D");
         e4.getIn().setHeader("id", 123);
+        e4.setProperty(Exchange.BATCH_INDEX, 1);
         e4.setProperty(Exchange.BATCH_SIZE, 3);
+        e4.setProperty(Exchange.BATCH_COMPLETE, false);
 
         Exchange e5 = new DefaultExchange(context);
         e5.getIn().setBody("E");
         e5.getIn().setHeader("id", 123);
+        e5.setProperty(Exchange.BATCH_INDEX, 2);
         e5.setProperty(Exchange.BATCH_SIZE, 3);
+        e5.setProperty(Exchange.BATCH_COMPLETE, true);
 
         ap.process(e1);
         ap.process(e2);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java Wed Feb 17 08:27:25 2010
@@ -19,6 +19,7 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * Unit test to verify that aggregate by timeout only also works.
@@ -29,7 +30,7 @@
 
     public void testAggregateTimeoutOnly() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
-        // by default the use latest aggregatation strategy is used so we get message 9
+        // by default the use latest aggregation strategy is used so we get message 9
         result.expectedBodiesReceived("Message 9");
         // should take 3 seconds to complete this one
         result.setMinimumResultWaitTime(2500);
@@ -49,7 +50,7 @@
                 // START SNIPPET: e1
                 from("direct:start")
                     // aggregate every 3th second and disable the batch size so we set it to 0
-                    .aggregate(header("id")).batchTimeout(3000).batchSize(0)
+                    .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).completionSize(0)
                     .to("mock:result");
                 // END SNIPPET: e1
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorAndOnExceptionTest.java Wed Feb 17 08:27:25 2010
@@ -22,7 +22,6 @@
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
-import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
 
 /**
  * Unit test inspired by user forum.
@@ -49,14 +48,12 @@
                 onException(CamelException.class).maximumRedeliveries(2);
 
                 from("seda:start")
-                    .aggregate(new PredicateAggregationCollection(header("id"),
+                    .aggregate(header("id"),
                         new AggregationStrategy() {
                             public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                                 return newExchange;
                             }
-                        },
-                        property(Exchange.AGGREGATED_SIZE).isEqualTo(2)))
-                    .batchTimeout(500L)
+                        }).completionSize(2).completionTimeout(500L)
                     .to("mock:result");
             }
         };

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBeanThrowExceptionTest.java Wed Feb 17 08:27:25 2010
@@ -18,6 +18,7 @@
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * Unit test with timer, splitter, aggregator and tracer.
@@ -35,7 +36,8 @@
             public void configure() throws Exception {
 
                 from("direct:start").
-                    aggregate(header("id")).
+                    aggregate(header("id"), new UseLatestAggregationStrategy()).
+                        completionTimeout(2000L).
                         bean(AggregatorBeanThrowExceptionTest.class, "fooDoesNotExistMethod").
                         to("log:foo");
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java Wed Feb 17 08:27:25 2010
@@ -90,7 +90,7 @@
                             LOG.debug("Index: " + newIndex + ". Total so far: " + total);
                             return answer;
                         }
-                    }).batchTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
+                    }).completionTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
                     .to("direct:foo");
 
                 from("direct:foo").setBody().header("total").to("mock:result");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionHandleTest.java Wed Feb 17 08:27:25 2010
@@ -21,6 +21,7 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * Based on CAMEL-1546
@@ -63,7 +64,7 @@
                 onException(IllegalArgumentException.class).handled(true).to("mock:handled");
 
                 from("direct:start")
-                    .aggregate(header("id"))
+                    .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(1000L)
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             String body = exchange.getIn().getBody(String.class);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java Wed Feb 17 08:27:25 2010
@@ -22,6 +22,7 @@
 import org.apache.camel.builder.ExpressionBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * Based on CAMEL-1546
@@ -65,7 +66,7 @@
 
                 from("direct:start")
                     .aggregate(header("id"))
-                    .batchTimeout(500)
+                    .completionTimeout(500)
                     .aggregationStrategy(new AggregationStrategy() {
                     
                         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
@@ -87,8 +88,8 @@
                             }
                             return ExpressionBuilder.headerExpression("id").evaluate(exchange, type);
                         }
-                    })
-                    .batchTimeout(500)
+                    }, new UseLatestAggregationStrategy())
+                    .completionTimeout(500)
                     .to("mock:result");
             }
         };

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionTest.java Wed Feb 17 08:27:25 2010
@@ -21,6 +21,7 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 public class AggregatorExceptionTest extends ContextTestSupport {
 
@@ -45,8 +46,8 @@
                 errorHandler(deadLetterChannel("mock:error"));
 
                 from("direct:start")
-                    .aggregate(header("id"))
-                    .batchSize(5)
+                    .aggregate(header("id"), new UseLatestAggregationStrategy())
+                    .completionSize(5)
                     .process(new Processor() {
                         public void process(Exchange exchange) throws Exception {
                             throw new java.lang.NoSuchMethodError(exceptionString);   

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorTimerAndTracerTest.java Wed Feb 17 08:27:25 2010
@@ -19,6 +19,7 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * Unit test with timer, splitter, aggregator and tracer.
@@ -39,7 +40,7 @@
                 getContext().setTracing(true);
                 
                 from("seda:splitted").
-                    aggregate(header("id")).
+                    aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(1000L).
                     to("mock:foo").
                     to("mock:result");
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java Wed Feb 17 08:27:25 2010
@@ -158,7 +158,7 @@
 
                 from("direct:joinSurnames")
                         .aggregate(header(SURNAME_HEADER),
-                                surnameAggregator).setHeader(TYPE_HEADER,
+                                surnameAggregator).completionTimeout(2000L).setHeader(TYPE_HEADER,
                         constant(BROTHERS_TYPE)).to("direct:joinBrothers");
 
                 // Join all brothers lists and remove surname and type headers
@@ -166,7 +166,7 @@
                         from("direct:joinBrothers").aggregate(header(TYPE_HEADER),
                                 brothersAggregator);
 
-                agg.setBatchTimeout(2000L);
+                agg.setCompletionTimeout(2000L);
                 agg.removeHeader(SURNAME_HEADER)
                         .removeHeader(TYPE_HEADER)
                         .to("mock:result");

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java Wed Feb 17 08:27:25 2010
@@ -32,8 +32,7 @@
         MockEndpoint result = getMockEndpoint("mock:result");
 
         // we expect to find the two winners with the highest bid
-        result.expectedMessageCount(2);
-        result.expectedBodiesReceived("200", "150");
+        result.expectedBodiesReceivedInAnyOrder("200", "150");
 
         // then we sent all the message at once
         template.sendBodyAndHeader("direct:start", "100", "id", "1");
@@ -55,8 +54,8 @@
                 from("direct:start")
                     // aggregated by header id and use our own strategy how to aggregate
                     .aggregate(new MyAggregationStrategy()).header("id")
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
+                    // wait for 1 second to aggregate
+                    .completionTimeout(1000L)
                     .to("mock:result");
                 // END SNIPPET: e1
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java Wed Feb 17 08:27:25 2010
@@ -17,9 +17,9 @@
 package org.apache.camel.processor.aggregator;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
  * Unit test for DefaultAggregatorCollection.
@@ -32,11 +32,7 @@
 
         // we expect 4 messages grouped by the latest message only
         result.expectedMessageCount(4);
-        result.expectedBodiesReceived("Message 1d", "Message 2b", "Message 3c", "Message 4");
-        result.message(0).property(Exchange.AGGREGATED_SIZE).isEqualTo(4);
-        result.message(1).property(Exchange.AGGREGATED_SIZE).isEqualTo(2);
-        result.message(2).property(Exchange.AGGREGATED_SIZE).isEqualTo(3);
-        result.message(3).property(Exchange.AGGREGATED_SIZE).isEqualTo(1);
+        result.expectedBodiesReceivedInAnyOrder("Message 1d", "Message 2b", "Message 3c", "Message 4");
 
         // then we sent all the message at once
         template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
@@ -64,9 +60,9 @@
                     // aggregated by header id
                     // as we have not configured more on the aggregator it will default to aggregate the
                     // latest exchange only
-                    .aggregate().header("id")
+                    .aggregate().header("id").aggregationStrategy(new UseLatestAggregationStrategy())
                     // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
+                    .completionTimeout(500L)
                     .to("mock:result");
                 // END SNIPPET: e1
             }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java?rev=910860&r1=910859&r2=910860&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java Wed Feb 17 08:27:25 2010
@@ -18,11 +18,8 @@
 package org.apache.camel.processor.aggregator;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationCollection;
-import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
@@ -60,20 +57,11 @@
         return new RouteBuilder() {
             public void configure() throws Exception {
                 // START SNIPPET: e1
-                // create the aggregation collection we will use.
-                // - we will correlate the received message based on the id header
-                // - as we will just keep the latest message we use the latest strategy
-                // - and finally we stop aggregate if we receive 2 or more messages
-                AggregationCollection ag = new PredicateAggregationCollection(header("id"),
-                    new UseLatestAggregationStrategy(),
-                    property(Exchange.AGGREGATED_SIZE).isEqualTo(3));
-
                 // our route is aggregating from the direct queue and sending the response to the mock
                 from("direct:start")
                     // we use the collection based aggregator we already have configured
-                    .aggregate(ag)
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
+                    .aggregate(header("id"), new UseLatestAggregationStrategy())
+                    .completionSize(3)
                     .to("mock:result");
                 // END SNIPPET: e1
             }



Mime
View raw message