camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hadr...@apache.org
Subject svn commit: r633098 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/BatchProcessor.java main/java/org/apache/camel/processor/aggregate/AggregationCollection.java test/java/org/apache/camel/processor/AggregatorTest.java
Date Mon, 03 Mar 2008 14:35:31 GMT
Author: hadrian
Date: Mon Mar  3 06:35:30 2008
New Revision: 633098

URL: http://svn.apache.org/viewvc?rev=633098&view=rev
Log:
CAMEL-352.  Patch applied with thanks!

Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=633098&r1=633097&r2=633098&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java	(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java	Mon Mar  3 06:35:30 2008
@@ -40,7 +40,7 @@
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 
-    private static final transient Log LOG = LogFactory.getLog(Resequencer.class);
+    private static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
     private Endpoint endpoint;
     private Processor processor;
     private Collection<Exchange> collection;
@@ -118,16 +118,20 @@
         long end = start + batchTimeout;
         for (int i = 0; !isBatchCompleted(i); i++) {
             long timeout = end - System.currentTimeMillis();
-
+            if (timeout < 0L) {                
+                LOG.debug("batch timeout expired at batch index:"  + i);
+                break;
+            }
             Exchange exchange = consumer.receive(timeout);
             if (exchange == null) {
+                LOG.debug("receive with timeout: " + timeout + " expired at batch index:" + i);
                 break;
             }
             collection.add(exchange);
         }
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Finsihed batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: "
+            LOG.debug("Finished batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: "
                       + collection);
         }
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=633098&r1=633097&r2=633098&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java	(original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java	Mon Mar  3 06:35:30 2008
@@ -24,6 +24,8 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Expression;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A {@link Collection} which aggregates exchanges together using a correlation
@@ -33,6 +35,7 @@
  * @version $Revision$
  */
 public class AggregationCollection extends AbstractCollection<Exchange> {
+    private static final transient Log LOG = LogFactory.getLog(AggregationCollection.class);
     private final Expression<Exchange> correlationExpression;
     private final AggregationStrategy aggregationStrategy;
     private Map<Object, Exchange> map = new LinkedHashMap<Object, Exchange>();
@@ -58,6 +61,7 @@
 
         // the strategy may just update the old exchange and return it
         if (newExchange != oldExchange) {
+            LOG.debug("put exchange:" + newExchange + " for key:"  + correlationKey);
             map.put(correlationKey, newExchange);
         }
         onAggregation(correlationKey, newExchange);

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java?rev=633098&r1=633097&r2=633098&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java	(original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java	Mon Mar  3 06:35:30 2008
@@ -60,7 +60,15 @@
         template.sendBodyAndHeader("direct:predicate", "test", "aggregated", 5);
         resultEndpoint.assertIsSatisfied();
     }
-    
+ 
+    public void testBatchTimeoutExpiry() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+        resultEndpoint.setSleepForEmptyTest(2 * BatchProcessor.DEFAULT_BATCH_TIMEOUT);
+        template.sendBodyAndHeader("direct:start", "message:1", "cheese", 123);
+        resultEndpoint.assertIsSatisfied();
+    }
+     
     protected RouteBuilder createRouteBuilder() {
         return new RouteBuilder() {
             public void configure() {



Mime
View raw message