Author: hadrian
Date: Mon Mar 3 06:35:30 2008
New Revision: 633098
URL: http://svn.apache.org/viewvc?rev=633098&view=rev
Log:
CAMEL-352. Patch applied with thanks!
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=633098&r1=633097&r2=633098&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Mon Mar 3 06:35:30 2008
@@ -40,7 +40,7 @@
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
- private static final transient Log LOG = LogFactory.getLog(Resequencer.class);
+ private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
private Endpoint endpoint;
private Processor processor;
private Collection<Exchange> collection;
@@ -118,16 +118,20 @@
long end = start + batchTimeout;
for (int i = 0; !isBatchCompleted(i); i++) {
long timeout = end - System.currentTimeMillis();
-
+ if (timeout < 0L) {
+ LOG.debug("batch timeout expired at batch index:" + i);
+ break;
+ }
Exchange exchange = consumer.receive(timeout);
if (exchange == null) {
+ LOG.debug("receive with timeout: " + timeout + " expired at batch index:" + i);
break;
}
collection.add(exchange);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: "
+ LOG.debug("Finished batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: "
+ collection);
}
Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=633098&r1=633097&r2=633098&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Mon Mar 3 06:35:30 2008
@@ -24,6 +24,8 @@
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A {@link Collection} which aggregates exchanges together using a correlation
@@ -33,6 +35,7 @@
* @version $Revision$
*/
public class AggregationCollection extends AbstractCollection<Exchange> {
+ private static final transient Log LOG = LogFactory.getLog(AggregationCollection.class);
private final Expression<Exchange> correlationExpression;
private final AggregationStrategy aggregationStrategy;
private Map<Object, Exchange> map = new LinkedHashMap<Object, Exchange>();
@@ -58,6 +61,7 @@
// the strategy may just update the old exchange and return it
if (newExchange != oldExchange) {
+ LOG.debug("put exchange:" + newExchange + " for key:" + correlationKey);
map.put(correlationKey, newExchange);
}
onAggregation(correlationKey, newExchange);
Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=633098&r1=633097&r2=633098&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java Mon Mar 3 06:35:30 2008
@@ -60,7 +60,15 @@
template.sendBodyAndHeader("direct:predicate", "test", "aggregated", 5);
resultEndpoint.assertIsSatisfied();
}
-
+
+ public void testBatchTimeoutExpiry() throws Exception {
+ MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+ resultEndpoint.expectedMessageCount(1);
+ resultEndpoint.setSleepForEmptyTest(2 * BatchProcessor.DEFAULT_BATCH_TIMEOUT);
+ template.sendBodyAndHeader("direct:start", "message:1", "cheese", 123);
+ resultEndpoint.assertIsSatisfied();
+ }
+
protected RouteBuilder createRouteBuilder() {
return new RouteBuilder() {
public void configure() {
|