camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r552848 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ main/java/org/apache/camel/util/ test/java/org/apache/camel/processor/
Date Tue, 03 Jul 2007 14:16:24 GMT
Author: jstrachan
Date: Tue Jul  3 07:16:23 2007
New Revision: 552848

URL: http://svn.apache.org/viewvc?view=rev&rev=552848
Log:
added an aggregator pattern implementation so multiple messages for the same correlation key can be combined into a single message message for CAMEL-7. Also a partial first cut of CAMEL-63 for implementing delayed message delivery

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/AggregatorBuilder.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java   (contents, props changed)
      - copied, changed from r552772, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java   (with props)
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/package.html
      - copied, changed from r552772, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java   (contents, props changed)
      - copied, changed from r552772, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/AggregatorBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/AggregatorBuilder.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/AggregatorBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/AggregatorBuilder.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,116 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.Route;
+import org.apache.camel.Service;
+import org.apache.camel.processor.Aggregator;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+
+import java.util.List;
+
+/**
+ * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
+ * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
+ * and messages for the same correlation key are combined together using some kind of
+ * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
+ * into a smaller number of exchanges.
+ * <p/>
+ * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
+ * throttle it right down so that multiple messages for the same stock are combined (or just the latest
+ * message is used and older prices are discarded). Another idea is to combine line item messages together
+ * into a single invoice message.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class AggregatorBuilder extends FromBuilder {
+    private final Expression correlationExpression;
+    private long batchTimeout = 1000L;
+    private int batchSize = 50000;
+    private AggregationStrategy aggregationStrategy = new UseLatestAggregationStrategy();
+
+    public AggregatorBuilder(FromBuilder builder, Expression correlationExpression) {
+        super(builder);
+        this.correlationExpression = correlationExpression;
+    }
+
+    @Override
+    public Route createRoute() throws Exception {
+        final Processor processor = super.createProcessor();
+        final Aggregator service = new Aggregator(getFrom(), processor, correlationExpression, aggregationStrategy);
+
+        return new Route<Exchange>(getFrom()) {
+            protected void addServices(List<Service> list) throws Exception {
+                list.add(service);
+            }
+
+            @Override
+            public String toString() {
+                return "AggregatorRoute[" + getEndpoint() + " -> " + processor + "]";
+            }
+        };
+    }
+
+    // Builder methods
+    //-------------------------------------------------------------------------
+    public AggregatorBuilder aggregationStrategy(AggregationStrategy aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
+    }
+
+    public AggregatorBuilder batchSize(int batchSize) {
+        setBatchSize(batchSize);
+        return this;
+    }
+
+    public AggregatorBuilder batchTimeout(int batchTimeout) {
+        setBatchTimeout(batchTimeout);
+        return this;
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
+    }
+
+    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public long getBatchTimeout() {
+        return batchTimeout;
+    }
+
+    public void setBatchTimeout(long batchTimeout) {
+        this.batchTimeout = batchTimeout;
+    }
+
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/AggregatorBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,46 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.processor.DelayerProcessor;
+
+/**
+ * 
+ * @version $Revision: 1.1 $
+ */
+public class DelayerBuilder extends FromBuilder {
+    private final Expression<Exchange> processAtExpression;
+    private long delay;
+    private long batchTimeout = 0L;
+    private int batchSize = 1;
+
+    public DelayerBuilder(FromBuilder builder, Expression<Exchange> processAtExpression, long delay) {
+        super(builder);
+        this.delay = delay;
+        this.processAtExpression = processAtExpression;
+    }
+
+    @Override
+    public Processor createProcessor() throws Exception {
+        final Processor processor = super.createProcessor();
+        return new DelayerProcessor(processor, processAtExpression, delay);
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DelayerBuilder.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java?view=diff&rev=552848&r1=552847&r2=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/FromBuilder.java Tue Jul  3 07:16:23 2007
@@ -28,6 +28,7 @@
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.idempotent.IdempotentConsumer;
 import org.apache.camel.processor.idempotent.MessageIdRepository;
 import org.apache.camel.spi.Policy;
@@ -241,6 +242,79 @@
             list.add(expression);
         }
         return resequencer(list);
+    }
+
+    /**
+     * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
+     * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
+     * and messages for the same correlation key are combined together using some kind of
+     * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
+     * into a smaller number of exchanges.
+     * <p/>
+     * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
+     * throttle it right down so that multiple messages for the same stock are combined (or just the latest
+     * message is used and older prices are discarded). Another idea is to combine line item messages together
+     * into a single invoice message.
+     *
+     * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
+     * be the expression <code>header("JMSDestination")</code> or  <code>header("JMSCorrelationID")</code>
+     */
+    @Fluent
+    public AggregatorBuilder aggregator(Expression correlationExpression) {
+        AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression);
+        setRouteBuilder(answer);
+        return answer;
+    }
+
+    /**
+     * A builder for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
+     * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
+     * and messages for the same correlation key are combined together using some kind of
+     * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
+     * into a smaller number of exchanges.
+     * <p/>
+     * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
+     * throttle it right down so that multiple messages for the same stock are combined (or just the latest
+     * message is used and older prices are discarded). Another idea is to combine line item messages together
+     * into a single invoice message.
+     *
+     * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
+     * be the expression <code>header("JMSDestination")</code> or  <code>header("JMSCorrelationID")</code>
+     */
+    @Fluent
+    public AggregatorBuilder aggregator(Expression correlationExpression, AggregationStrategy strategy) {
+        AggregatorBuilder answer = new AggregatorBuilder(this, correlationExpression);
+        answer.aggregationStrategy(strategy);
+        setRouteBuilder(answer);
+        return answer;
+    }
+
+    /**
+     * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
+     * where an expression is used to calculate the time which the message will be dispatched on
+     *
+     * @param processAtExpression an expression to calculate the time at which the messages should be processed
+     * @return the builder
+     */
+    @Fluent
+    public DelayerBuilder delayer(Expression<Exchange> processAtExpression) {
+        DelayerBuilder answer = new DelayerBuilder(this, processAtExpression, 0L);
+        setRouteBuilder(answer);
+        return answer;
+    }
+
+    /**
+     * A builder for the <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> pattern
+     * where a fixed amount of milliseconds are used to delay processing of a message exchange
+     *
+     * @param delay the default delay in milliseconds
+     * @return the builder
+     */
+    @Fluent
+    public DelayerBuilder delayer(long delay) {
+        DelayerBuilder answer = new DelayerBuilder(this, null, delay);
+        setRouteBuilder(answer);
+        return answer;
     }
 
     /**

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (from r552772, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?view=diff&rev=552848&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java&r1=552772&p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java&r2=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Tue Jul  3 07:16:23 2007
@@ -18,160 +18,38 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.LoggingExceptionHandler;
-import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.spi.ExceptionHandler;
-import org.apache.camel.util.ExpressionComparator;
-import org.apache.camel.util.ExpressionListComparator;
-import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
 
 /**
- * An implementation of the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
+ * An implementation of the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern
+ * where a batch of messages are processed (up to a maximum amount or until some timeout is reached)
+ * and messages for the same correlation key are combined together using some kind of
+ * {@link AggregationStrategy ) (by default the latest message is used) to compress many message exchanges
+ * into a smaller number of exchanges.
+ * <p/>
+ * A good example of this is stock market data; you may be receiving 30,000 messages/second and you may want to
+ * throttle it right down so that multiple messages for the same stock are combined (or just the latest
+ * message is used and older prices are discarded). Another idea is to combine line item messages together
+ * into a single invoice message.
  *
  * @version $Revision: 1.1 $
+ * @param correlationExpression the expression used to calculate the correlation key. For a JMS message this could
+ * be the expression <code>header("JMSDestination")</code> or  <code>header("JMSCorrelationID")</code>
  */
-public class Resequencer extends ServiceSupport implements Runnable {
-    private static final transient Log log = LogFactory.getLog(Resequencer.class);
-    private Endpoint endpoint;
-    private Processor processor;
-    private Set<Exchange> set;
-    private long batchTimeout = 1000L;
-    private int batchSize = 100;
-    private PollingConsumer consumer;
-    private ExceptionHandler exceptionHandler;
-
-    public Resequencer(Endpoint endpoint, Processor processor, Expression<Exchange> expression) {
-        this(endpoint, processor, createSet(expression));
+public class Aggregator extends BatchProcessor {
+    public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression, AggregationStrategy aggregationStrategy) {
+        this(endpoint, processor, new AggregationCollection(correlationExpression, aggregationStrategy));
     }
 
-    public Resequencer(Endpoint endpoint, Processor processor, List<Expression<Exchange>> expressions) {
-        this(endpoint, processor, createSet(expressions));
-    }
-
-    public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> set) {
-        this.endpoint = endpoint;
-        this.processor = processor;
-        this.set = set;
+    public Aggregator(Endpoint endpoint, Processor processor, AggregationCollection collection) {
+        super(endpoint, processor, collection);
     }
 
     @Override
     public String toString() {
-        return "Resequencer[to: " + processor + "]";
-    }
-
-    public void run() {
-        log.debug("Starting thread for " + this);
-        while (!isStopped() && !isStopping()) {
-            try {
-                processBatch();
-            }
-            catch (Exception e) {
-                getExceptionHandler().handleException(e);
-            }
-        }
-        set.clear();
-    }
-
-    // Properties
-    //-------------------------------------------------------------------------
-    public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
-        return exceptionHandler;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public long getBatchTimeout() {
-        return batchTimeout;
-    }
-
-    public void setBatchTimeout(long batchTimeout) {
-        this.batchTimeout = batchTimeout;
-    }
-
-    // Implementation methods
-    //-------------------------------------------------------------------------
-
-    /**
-     * A transactional method to process a batch of messages up to a timeout period
-     * or number of messages reached.
-     */
-    protected synchronized void processBatch() throws Exception {
-        long start = System.currentTimeMillis();
-        long end = start + batchTimeout;
-        for (int i = 0; i < batchSize; i++) {
-            long timeout = end - System.currentTimeMillis();
-
-            Exchange exchange = consumer.receive(timeout);
-            if (exchange == null) {
-                break;
-            }
-            set.add(exchange);
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: " + set);
-        }
-
-        // lets send the batch
-        Iterator<Exchange> iter = set.iterator();
-        while (iter.hasNext()) {
-            Exchange exchange = iter.next();
-            iter.remove();
-            processor.process(exchange);
-        }
-    }
-
-    protected void doStart() throws Exception {
-        consumer = endpoint.createPollingConsumer();
-
-        ServiceHelper.startServices(processor, consumer);
-
-        Thread thread = new Thread(this, this + " Polling Thread");
-        thread.start();
-    }
-
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumer, processor);
-        consumer = null;
-    }
-
-    protected static Set<Exchange> createSet(Expression<Exchange> expression) {
-        return createSet(new ExpressionComparator<Exchange>(expression));
-    }
-
-    protected static Set<Exchange> createSet(List<Expression<Exchange>> expressions) {
-        if (expressions.size() == 1) {
-            return createSet(expressions.get(0));
-        }
-        return createSet(new ExpressionListComparator<Exchange>(expressions));
-    }
-
-    protected static Set<Exchange> createSet(Comparator<? super Exchange> comparator) {
-        return new TreeSet<Exchange>(comparator);
+        return "Aggregator[to: " + getProcessor() + "]";
     }
-}
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,166 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.PollingConsumer;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.LoggingExceptionHandler;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.spi.ExceptionHandler;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * A base class for any kind of {@link Processor} which implements some kind of batch processing.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class BatchProcessor extends ServiceSupport implements Runnable {
+    private static final transient Log log = LogFactory.getLog(Resequencer.class);
+    private Endpoint endpoint;
+    private Processor processor;
+    private Collection<Exchange> collection;
+    private long batchTimeout = 1000L;
+    private int batchSize = 100;
+    private PollingConsumer consumer;
+    private ExceptionHandler exceptionHandler;
+
+    public BatchProcessor(Endpoint endpoint, Processor processor, Collection<Exchange> collection) {
+        this.endpoint = endpoint;
+        this.processor = processor;
+        this.collection = collection;
+    }
+
+    @Override
+    public String toString() {
+        return "BatchProcessor[to: " + processor + "]";
+    }
+
+    public void run() {
+        log.debug("Starting thread for " + this);
+        while (!isStopped() && !isStopping()) {
+            try {
+                processBatch();
+            }
+            catch (Exception e) {
+                getExceptionHandler().handleException(e);
+            }
+        }
+        collection.clear();
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public ExceptionHandler getExceptionHandler() {
+        if (exceptionHandler == null) {
+            exceptionHandler = new LoggingExceptionHandler(getClass());
+        }
+        return exceptionHandler;
+    }
+
+    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
+        this.exceptionHandler = exceptionHandler;
+    }
+
+    public int getBatchSize() {
+        return batchSize;
+    }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public long getBatchTimeout() {
+        return batchTimeout;
+    }
+
+    public void setBatchTimeout(long batchTimeout) {
+        this.batchTimeout = batchTimeout;
+    }
+
+    public Endpoint getEndpoint() {
+        return endpoint;
+    }
+
+    public Processor getProcessor() {
+        return processor;
+    }
+
+    /**
+     * A transactional method to process a batch of messages up to a timeout period
+     * or number of messages reached.
+     */
+    protected synchronized void processBatch() throws Exception {
+        long start = System.currentTimeMillis();
+        long end = start + batchTimeout;
+        for (int i = 0; i < batchSize; i++) {
+            long timeout = end - System.currentTimeMillis();
+
+            Exchange exchange = consumer.receive(timeout);
+            if (exchange == null) {
+                break;
+            }
+            collection.add(exchange);
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: " + collection);
+        }
+
+        // lets send the batch
+        Iterator<Exchange> iter = collection.iterator();
+        while (iter.hasNext()) {
+            Exchange exchange = iter.next();
+            iter.remove();
+            processExchange(exchange);
+        }
+    }
+
+    /**
+     * Strategy Method to process an exchange in the batch. This method allows derived classes
+     * to perform custom processing before or after an individual exchange is processed
+     */
+    protected void processExchange(Exchange exchange) throws Exception {
+        processor.process(exchange);
+    }
+
+    protected void doStart() throws Exception {
+        consumer = endpoint.createPollingConsumer();
+
+        ServiceHelper.startServices(processor, consumer);
+
+        Thread thread = new Thread(this, this + " Polling Thread");
+        thread.start();
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopServices(consumer, processor);
+        consumer = null;
+        collection.clear();
+    }
+
+    protected Collection<Exchange> getCollection() {
+        return collection;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,137 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Processor;
+import org.apache.camel.impl.ServiceSupport;
+import org.apache.camel.util.ExpressionHelper;
+import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * A <a href="http://activemq.apache.org/camel/delayer.html">Delayer</a> which delays
+ * processing the exchange until the correct amount of time has elapsed
+ * using an expression to determine the delivery time.
+ *
+ * For example if you wish to delay JMS messages by 25 seconds from their publish time you could create
+ * an instance of this class with the expression <code>header("JMSTimestamp")</code> and a delay value of 25000L.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class DelayerProcessor extends ServiceSupport implements Processor {
+    private static final transient Log log = LogFactory.getLog(DelayerProcessor.class);
+    private Expression<Exchange> timeExpression;
+    private Processor processor;
+    private long delay = 0L;
+
+    public DelayerProcessor(Processor processor, Expression<Exchange> timeExpression, long delay) {
+        this.processor = processor;
+        this.timeExpression = timeExpression;
+        this.delay = delay;
+    }
+
+    public void process(Exchange exchange) throws Exception {
+        long time = 0;
+        if (timeExpression != null) {
+            Long longValue = ExpressionHelper.evaluateAsType(timeExpression, exchange, Long.class);
+            if (longValue != null) {
+                time = longValue.longValue();
+            }
+        }
+        if (time <= 0) {
+            time = defaultProcessTime(exchange);
+        }
+
+        time += delay;
+
+        waitUntil(time, exchange);
+        processor.process(exchange);
+    }
+
+    // Properties
+    //-------------------------------------------------------------------------
+    public long getDelay() {
+        return delay;
+    }
+
+    /**
+     * Sets the delay from the publish time; which is typically the time from the expression
+     * or the current system time if none is available
+     */
+    public void setDelay(long delay) {
+        this.delay = delay;
+    }
+
+    // Implementation methods
+    //-------------------------------------------------------------------------
+
+    protected void doStart() throws Exception {
+        ServiceHelper.startService(processor);
+    }
+
+    protected void doStop() throws Exception {
+        ServiceHelper.stopService(processor);
+    }
+
+    /**
+     * Wait until the given system time before continuing
+     *
+     * @param time     the system time to wait for
+     * @param exchange the exchange being processed
+     */
+    protected void waitUntil(long time, Exchange exchange) {
+        while (true) {
+            long delay = time - currentSystemTime();
+            if (delay < 0) {
+                return;
+            }
+            else {
+                try {
+                    Thread.sleep(delay);
+                }
+                catch (InterruptedException e) {
+                    handleSleepInteruptedException(e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Called when a sleep is interupted; allows derived classes to handle this case differently
+     */
+    protected void handleSleepInteruptedException(InterruptedException e) {
+        log.debug("Sleep interupted: " + e, e);
+    }
+
+    /**
+     * A Strategy Method to allow derived implementations to decide the current system time or some other
+     * default exchange property
+     *
+     * @param exchange
+     */
+    protected long defaultProcessTime(Exchange exchange) {
+        return currentSystemTime();
+    }
+
+    protected long currentSystemTime() {
+        return System.currentTimeMillis();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayerProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java?view=diff&rev=552848&r1=552847&r2=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Resequencer.java Tue Jul  3 07:16:23 2007
@@ -20,38 +20,22 @@
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
-import org.apache.camel.impl.LoggingExceptionHandler;
-import org.apache.camel.impl.ServiceSupport;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ExpressionComparator;
 import org.apache.camel.util.ExpressionListComparator;
-import org.apache.camel.util.ServiceHelper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
 /**
  * An implementation of the <a href="http://activemq.apache.org/camel/resequencer.html">Resequencer</a>
+ * which can reorder messages within a batch.
  *
  * @version $Revision: 1.1 $
  */
-public class Resequencer extends ServiceSupport implements Runnable {
-    private static final transient Log log = LogFactory.getLog(Resequencer.class);
-    private Endpoint endpoint;
-    private Processor processor;
-    private Set<Exchange> set;
-    private long batchTimeout = 1000L;
-    private int batchSize = 100;
-    private PollingConsumer consumer;
-    private ExceptionHandler exceptionHandler;
-
+public class Resequencer extends BatchProcessor {
     public Resequencer(Endpoint endpoint, Processor processor, Expression<Exchange> expression) {
         this(endpoint, processor, createSet(expression));
     }
@@ -60,105 +44,17 @@
         this(endpoint, processor, createSet(expressions));
     }
 
-    public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> set) {
-        this.endpoint = endpoint;
-        this.processor = processor;
-        this.set = set;
+    public Resequencer(Endpoint endpoint, Processor processor, Set<Exchange> collection) {
+        super(endpoint, processor, collection);
     }
 
     @Override
     public String toString() {
-        return "Resequencer[to: " + processor + "]";
-    }
-
-    public void run() {
-        log.debug("Starting thread for " + this);
-        while (!isStopped() && !isStopping()) {
-            try {
-                processBatch();
-            }
-            catch (Exception e) {
-                getExceptionHandler().handleException(e);
-            }
-        }
-        set.clear();
-    }
-
-    // Properties
-    //-------------------------------------------------------------------------
-    public ExceptionHandler getExceptionHandler() {
-        if (exceptionHandler == null) {
-            exceptionHandler = new LoggingExceptionHandler(getClass());
-        }
-        return exceptionHandler;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    public int getBatchSize() {
-        return batchSize;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public long getBatchTimeout() {
-        return batchTimeout;
-    }
-
-    public void setBatchTimeout(long batchTimeout) {
-        this.batchTimeout = batchTimeout;
+        return "Resequencer[to: " + getProcessor() + "]";
     }
 
     // Implementation methods
     //-------------------------------------------------------------------------
-
-    /**
-     * A transactional method to process a batch of messages up to a timeout period
-     * or number of messages reached.
-     */
-    protected synchronized void processBatch() throws Exception {
-        long start = System.currentTimeMillis();
-        long end = start + batchTimeout;
-        for (int i = 0; i < batchSize; i++) {
-            long timeout = end - System.currentTimeMillis();
-
-            Exchange exchange = consumer.receive(timeout);
-            if (exchange == null) {
-                break;
-            }
-            set.add(exchange);
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: " + set);
-        }
-
-        // lets send the batch
-        Iterator<Exchange> iter = set.iterator();
-        while (iter.hasNext()) {
-            Exchange exchange = iter.next();
-            iter.remove();
-            processor.process(exchange);
-        }
-    }
-
-    protected void doStart() throws Exception {
-        consumer = endpoint.createPollingConsumer();
-
-        ServiceHelper.startServices(processor, consumer);
-
-        Thread thread = new Thread(this, this + " Polling Thread");
-        thread.start();
-    }
-
-    protected void doStop() throws Exception {
-        ServiceHelper.stopServices(consumer, processor);
-        consumer = null;
-    }
 
     protected static Set<Exchange> createSet(Expression<Exchange> expression) {
         return createSet(new ExpressionComparator<Exchange>(expression));

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,68 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+import java.util.AbstractCollection;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A {@link Collection} which aggregates exchanges together using a correlation expression
+ * so that there is only a single message exchange sent for a single correlation key.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class AggregationCollection extends AbstractCollection<Exchange> {
+    private final Expression<Exchange> correlationExpression;
+    private final AggregationStrategy aggregationStrategy;
+    private Map<Object, Exchange> map = new LinkedHashMap<Object, Exchange>();
+
+    public AggregationCollection(Expression<Exchange> correlationExpression, AggregationStrategy aggregationStrategy) {
+        this.correlationExpression = correlationExpression;
+        this.aggregationStrategy = aggregationStrategy;
+    }
+
+    @Override
+    public boolean add(Exchange exchange) {
+        Object correlationKey = correlationExpression.evaluate(exchange);
+        Exchange oldExchange = map.get(correlationKey);
+        Exchange newExchange = exchange;
+        if (oldExchange != null) {
+            newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
+        }
+
+        // the strategy may just update the old exchange and return it
+        if (newExchange != oldExchange) {
+            map.put(correlationKey, newExchange);
+        }
+        return true;
+    }
+
+    public Iterator<Exchange> iterator() {
+        return map.values().iterator();
+    }
+
+    public int size() {
+        return map.size();
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,41 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+/**
+ * A strategy for aggregating two exchanges together into a single exchange.
+ * Possible implementations include performing some kind of combining or delta processing,
+ * such as adding line items together into an invoice or just using the newest exchange and
+ * removing old exchanges such as for state tracking or market data prices; where old values are of little use.
+ *
+ * @version $Revision: 1.1 $
+ */
+public interface AggregationStrategy {
+    
+    /**
+     * Aggregates an old and new exchange together to create a single combined
+     * exchange
+     *
+     * @param oldExchange the oldest exchange
+     * @param newExchange the newest exchange
+     * @return a combined composite of the two exchanges
+     */
+    Exchange aggregate(Exchange oldExchange, Exchange newExchange);
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java?view=auto&rev=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java Tue Jul  3 07:16:23 2007
@@ -0,0 +1,34 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate;
+
+import org.apache.camel.Exchange;
+
+/**
+ * An {@link AggregationStrategy} which just uses the latest exchange which is useful
+ * for status messages where old status messages have no real value; another example is things
+ * like market data prices, where old stock prices are not that relevant, only the current price is.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class UseLatestAggregationStrategy implements AggregationStrategy{
+
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        return newExchange;
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/UseLatestAggregationStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/package.html (from r552772, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/package.html?view=diff&rev=552848&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html&r1=552772&p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/package.html&r2=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/idempotent/package.html (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/package.html Tue Jul  3 07:16:23 2007
@@ -19,7 +19,7 @@
 </head>
 <body>
 
-An implementation of the <a href="http://activemq.apache.org/camel/idempotent-consumer.html">Idempotent Consumer</a> pattern.
+Helper classes for the <a href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a> pattern.
 
 </body>
 </html>

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java?view=diff&rev=552848&r1=552847&r2=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ServiceHelper.java Tue Jul  3 07:16:23 2007
@@ -32,6 +32,16 @@
     private static final transient Log log = LogFactory.getLog(ServiceHelper.class);
 
 
+    public static void startService(Object value) throws Exception {
+        if (value instanceof Service) {
+            Service service = (Service) value;
+            service.start();
+        }
+        else if (value instanceof Collection) {
+            startServices((Collection) value);
+        }
+    }
+
     /**
      * Starts all of the given services
      */
@@ -53,16 +63,6 @@
         }
     }
 
-    public static void startService(Object value) throws Exception {
-        if (value instanceof Service) {
-            Service service = (Service) value;
-            service.start();
-        }
-        else if (value instanceof Collection) {
-            startServices((Collection) value);
-        }
-    }
-
     /**
      * Stops all of the given services, throwing the first exception caught
      */
@@ -86,6 +86,18 @@
             throw firstException;
         }
     }
+
+    public static void stopService(Object value) throws Exception {
+        if (value instanceof Service) {
+            Service service = (Service) value;
+            service.stop();
+        }
+        else if (value instanceof Collection) {
+            stopServices((Collection) value);
+        }
+    }
+
+
     /**
      * Stops all of the given services, throwing the first exception caught
      */

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (from r552772, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?view=diff&rev=552848&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java&r1=552772&p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java&r2=552848
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FilterTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Tue Jul  3 07:16:23 2007
@@ -18,32 +18,25 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Endpoint;
-import org.apache.camel.Exchange;
-import org.apache.camel.Message;
-import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version $Revision: 1.1 $
  */
-public class FilterTest extends ContextTestSupport {
-    protected Endpoint<Exchange> startEndpoint;
-    protected MockEndpoint resultEndpoint;
+public class AggregatorTest extends ContextTestSupport {
+    protected int messageCount = 100;
 
     public void testSendMatchingMessage() throws Exception {
-        resultEndpoint.expectedMessageCount(1);
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
 
-        sendMessage("bar", "matched");
+        resultEndpoint.expectedBodiesReceived("message:" + messageCount);
 
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    public void testSendNotMatchingMessage() throws Exception {
-        resultEndpoint.expectedMessageCount(0);
-
-        sendMessage("notMatchedHeaderValue", "notMatched");
+        // lets send a large batch of messages
+        for (int i = 1; i <= messageCount; i++) {
+            String body = "message:" + i;
+            template.sendBody("direct:a", body, "cheese", 123);
+        }
 
         resultEndpoint.assertIsSatisfied();
     }
@@ -52,26 +45,13 @@
     protected void setUp() throws Exception {
         super.setUp();
 
-        startEndpoint = resolveMandatoryEndpoint("direct:a");
-        resultEndpoint = (MockEndpoint) resolveMandatoryEndpoint("mock:result");
     }
 
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {
-                from("direct:a").filter(header("foo").isEqualTo("bar")).to("mock:result");
+                from("direct:a").aggregator(header("cheese")).to("mock:result");
             }
         };
     }
-
-    protected void sendMessage(final Object headerValue, final Object body) throws Exception {
-        template.send(startEndpoint, new Processor() {
-            public void process(Exchange exchange) {
-                // now lets fire in a message
-                Message in = exchange.getIn();
-                in.setBody(body);
-                in.setHeader("foo", headerValue);
-            }
-        });
-    }
-}
+}
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message