camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1310690 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/aggregator/
Date Sat, 07 Apr 2012 07:55:16 GMT
Author: davsclaus
Date: Sat Apr  7 07:55:15 2012
New Revision: 1310690

URL: http://svn.apache.org/viewvc?rev=1310690&view=rev
Log:
CAMEL-5148: Aggregate EIP now supports TimeoutAwareAggregationStrategy.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
      - copied, changed from r1310679, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1310690&r1=1310689&r2=1310690&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Sat Apr  7 07:55:15 2012
@@ -375,6 +375,15 @@ public class AggregateProcessor extends 
             closedCorrelationKeys.put(key, key);
         }
 
+        if (fromTimeout) {
+            // invoke timeout if its timeout aware aggregation strategy,
+            // to allow any custom processing before discarding the exchange
+            if (aggregationStrategy instanceof TimeoutAwareAggregationStrategy) {
+                long timeout = getCompletionTimeout() > 0 ? getCompletionTimeout() : -1;
+                ((TimeoutAwareAggregationStrategy) aggregationStrategy).timeout(exchange,
-1, -1, timeout);
+            }
+        }
+
         if (fromTimeout && isDiscardOnCompletionTimeout()) {
             // discard due timeout
             LOG.debug("Aggregation for correlation key {} discarding aggregated exchange:
()", key, exchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=1310690&r1=1310689&r2=1310690&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Sat Apr  7 07:55:15 2012
@@ -32,9 +32,9 @@ public interface TimeoutAwareAggregation
      *
      * @param oldExchange  the current aggregated exchange, or the original {@link Exchange}
if no aggregation
      *                     has been done before the timeout occurred
-     * @param index        the index
-     * @param total        the total
-     * @param timeout      the timeout value in millis
+     * @param index        the index, may be <tt>-1</tt> if not possible to determine
the index
+     * @param total        the total, may be <tt>-1</tt> if not possible to determine
the total
+     * @param timeout      the timeout value in millis, may be <tt>-1</tt> if
not possible to determine the timeout
      */
     void timeout(Exchange oldExchange, int index, int total, long timeout);
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
(from r1310679, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java&r1=1310679&r2=1310690&rev=1310690&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
Sat Apr  7 07:55:15 2012
@@ -17,18 +17,22 @@
 package org.apache.camel.processor.aggregator;
 
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
 
 /**
  * @version 
  */
-public class AggregateDiscardOnTimeoutTest extends ContextTestSupport {
+public class AggregateTimeoutTest extends ContextTestSupport {
 
-    public void testAggregateDiscardOnTimeout() throws Exception {
+    private static final AtomicInteger invoked = new AtomicInteger();
+
+    public void testAggregateTimeout() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
         mock.expectedMessageCount(0);
 
@@ -40,6 +44,9 @@ public class AggregateDiscardOnTimeoutTe
 
         mock.assertIsSatisfied();
 
+        // should invoke the timeout method
+        assertEquals(1, invoked.get());
+
         // now send 3 which does not timeout
         mock.reset();
         mock.expectedBodiesReceived("C+D+E");
@@ -50,6 +57,9 @@ public class AggregateDiscardOnTimeoutTe
 
         // should complete before timeout
         mock.await(1500, TimeUnit.MILLISECONDS);
+
+        // should not invoke the timeout method
+        assertEquals(1, invoked.get());
     }
 
     @Override
@@ -57,17 +67,38 @@ public class AggregateDiscardOnTimeoutTe
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // START SNIPPET: e1
                 from("direct:start")
-                    .aggregate(header("id"), new BodyInAggregatingStrategy())
+                    .aggregate(header("id"), new MyAggregationStrategy())
+                        .discardOnCompletionTimeout()
                         .completionSize(3)
-                        // use a 3 second timeout
+                        // use a 2 second timeout
                         .completionTimeout(2000)
-                        // and if timeout occurred then just discard the aggregated message
-                        .discardOnCompletionTimeout()
                         .to("mock:aggregated");
-                // END SNIPPET: e1
             }
         };
     }
+
+    private static class MyAggregationStrategy implements TimeoutAwareAggregationStrategy
{
+
+        public void timeout(Exchange oldExchange, int index, int total, long timeout) {
+            invoked.incrementAndGet();
+
+            assertEquals(2000, timeout);
+            assertEquals(-1, total);
+            assertEquals(-1, index);
+            assertNotNull(oldExchange);
+            assertEquals("AB", oldExchange.getIn().getBody());
+        }
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
+            String body = oldExchange.getIn().getBody(String.class);
+            oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
+            return oldExchange;
+        }
+    }
+
 }



Mime
View raw message