camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r987166 - in /camel/branches/camel-1.x/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/processor/
Date Thu, 19 Aug 2010 14:44:53 GMT
Author: davsclaus
Date: Thu Aug 19 14:44:53 2010
New Revision: 987166

URL: http://svn.apache.org/viewvc?rev=987166&view=rev
Log:
CAMEL-3064: Aggregator now better handles thrown exception from custom aggregation strategy,
to avoid batch sender thread dying.

Added:
    camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorAggregationStrategyThrowExceptionTest.java
      - copied, changed from r965546, camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
Modified:
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=987166&r1=987165&r2=987166&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
(original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Thu Aug 19 14:44:53 2010
@@ -31,6 +31,8 @@ import org.apache.camel.impl.LoggingExce
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A base class for any kind of {@link Processor} which implements some kind of batch processing.
@@ -41,6 +43,7 @@ public class BatchProcessor extends Serv
 
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
+    protected static final transient Log LOG = LogFactory.getLog(BatchProcessor.class);
 
     private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
     private int batchSize = DEFAULT_BATCH_SIZE;
@@ -211,6 +214,8 @@ public class BatchProcessor extends Serv
             // Unlocking is important as the process of sending out the exchanges
             // would otherwise block new exchanges from being queued.
 
+            BatchProcessor.LOG.info("BatchSender thread +++ START +++");
+
             queueLock.lock();
             try {
                 do {
@@ -250,9 +255,15 @@ public class BatchProcessor extends Serv
 
                 } while (true);
 
+            } catch (Throwable t) {
+                BatchProcessor.LOG.error("BatchSender thread caught exception", t);
+                // a fail safe to handle all exceptions being thrown
+                getExceptionHandler().handleException(t);
             } finally {
                 queueLock.unlock();
             }
+
+            BatchProcessor.LOG.info("BatchSender thread +++ END +++");
         }
 
         /**

Modified: camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java?rev=987166&r1=987165&r2=987166&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
(original)
+++ camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
Thu Aug 19 14:44:53 2010
@@ -56,6 +56,10 @@ public class DefaultAggregationCollectio
     @Override
     public boolean add(Exchange exchange) {
         Object correlationKey = correlationExpression.evaluate(exchange);
+        if (correlationKey == null) {
+            // warn if correlation key could not be evaluated
+            LOG.warn("CorrelationKey was evaluated as null using expression: " + correlationExpression
+ " on Exchange: " + exchange);
+        }
         if (LOG.isDebugEnabled()) {
             LOG.debug("evaluated expression: " + correlationExpression + " as CorrelationKey:
" + correlationKey);
         }
@@ -68,7 +72,14 @@ public class DefaultAggregationCollectio
                 count = 1;
             }
             count++;
-            newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
+            try {
+                newExchange = aggregationStrategy.aggregate(oldExchange, newExchange);
+            } catch (Throwable t) {
+                LOG.error("Exception occurred in aggregation strategy: " + aggregationStrategy
+                    + " with oldExchange: " + oldExchange + " and newExchange: " + newExchange,
t);
+                // we cannot deal with this exception, as if we throw an exception the batch
sender
+                // thread will die, so at best log it and we lose that single message
+            }
             newExchange.setProperty(Exchange.AGGREGATED_COUNT, count);
         }
 

Copied: camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorAggregationStrategyThrowExceptionTest.java
(from r965546, camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java)
URL: http://svn.apache.org/viewvc/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorAggregationStrategyThrowExceptionTest.java?p2=camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorAggregationStrategyThrowExceptionTest.java&p1=camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java&r1=965546&r2=987166&rev=987166&view=diff
==============================================================================
--- camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorTest.java
(original)
+++ camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/AggregatorAggregationStrategyThrowExceptionTest.java
Thu Aug 19 14:44:53 2010
@@ -17,107 +17,53 @@
 package org.apache.camel.processor;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
 
 /**
  * @version $Revision$
  */
-public class AggregatorTest extends ContextTestSupport {
-    protected int messageCount = 100;
+public class AggregatorAggregationStrategyThrowExceptionTest extends ContextTestSupport {
 
-    public void testSendingLotsOfMessagesGetAggregatedToTheLatestMessage() throws Exception
{
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
+    public void testThrowException() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        resultEndpoint.expectedBodiesReceived("message:" + messageCount);
+        template.sendBody("direct:start", "A");
+        template.sendBody("direct:start", "B");
+        template.sendBody("direct:start", "C");
+        template.sendBody("direct:start", "D");
+        template.sendBody("direct:start", "E");
 
-        // lets send a large batch of messages
-        for (int i = 1; i <= messageCount; i++) {
-            String body = "message:" + i;
-            template.sendBodyAndHeader("direct:start", body, "cheese", 123);
-        }
-
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    public void testPredicate() throws Exception {
-        testSendALargeBatch("direct:predicate");
-    }
-    
-    public void testOutBatchPredicate() throws Exception {
-        testSendALargeBatch("direct:outBatchPredicate");
-    }
-
-    public void testOutBatchWithNoInBatching() throws Exception {
-        testSendALargeBatch("direct:outBatchNoInBatching");
-    }
-    
-    public void testOneMessage() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-
-        resultEndpoint.expectedMessageCount(1);
-        template.sendBodyAndHeader("direct:predicate", "test", "aggregated", 5);
-        resultEndpoint.assertIsSatisfied();
-    }
+        assertMockEndpointsSatisfied();
 
-    public void testBatchTimeoutExpiry() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-        resultEndpoint.setSleepForEmptyTest(2 * BatchProcessor.DEFAULT_BATCH_TIMEOUT);
-        template.sendBodyAndHeader("direct:start", "message:1", "cheese", 123);
-        resultEndpoint.assertIsSatisfied();
+        log.info("Test complete");
     }
 
-
-    public void testAggregatorNotAtStart() throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-        resultEndpoint.expectedMessageCount(1);
-        resultEndpoint.message(0).header("visited").isNotNull();
-        resultEndpoint.setSleepForEmptyTest(2 * BatchProcessor.DEFAULT_BATCH_TIMEOUT);
-        template.sendBodyAndHeader("seda:header", "message:1", "cheese", 123);
-        resultEndpoint.assertIsSatisfied();
-    }
-
-    protected RouteBuilder createRouteBuilder() {
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
-            public void configure() {
-
-                // START SNIPPET: ex
-                // in this route we aggregate all from direct:state based on the header id
cheese
-                from("direct:start").aggregator(header("cheese")).to("mock:result");
-
-                from("seda:header").setHeader("visited", constant(true)).aggregator(header("cheese")).to("mock:result");
-
-                // in this sample we aggreagte using our own startegy with a completion predicate
-                // stating that the aggregated header is equal to 5.
-                from("direct:predicate").aggregator(header("cheese"), new MyAggregationStrategy()).
-                    completedPredicate(header("aggregated").isEqualTo(5)).to("mock:result");
-                
-                // this sample is similar to the one above but it also illustrates the use
of outBatchSize 
-                // to send exchanges to mock:endpoint in batches of 10.  
-                from("direct:outBatchPredicate").aggregator(header("cheese"), new MyAggregationStrategy()).
-                    completedPredicate(header("aggregated").isEqualTo(5)).outBatchSize(10).to("mock:result");
-                // END SNIPPET: ex
-
-                // turning off in batching (batchSize = 1) is a good way to test "out" batching.
 Don't include
-                // in wiki snippet as it may not be a good example to follow.
-                from("direct:outBatchNoInBatching").aggregator(header("cheese"), new MyAggregationStrategy()).
-                completedPredicate(header("aggregated").isEqualTo(5)).batchSize(1).outBatchSize(10).to("mock:result");
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .aggregator(constant(true), new MyFlankyAggregationStrategy())
+                        .batchSize(5).batchTimeout(10000)
+                        .to("mock:result");
             }
         };
     }
-    
-    private void testSendALargeBatch(String endpointUri) throws Exception {
-        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
-
-        resultEndpoint.expectedMessageCount(messageCount / 5);
-        // lets send a large batch of messages
-        for (int i = 1; i <= messageCount; i++) {
-            String body = "message:" + i;
-            template.sendBodyAndHeader(endpointUri, body, "cheese", 123);
-        }
 
-        resultEndpoint.assertIsSatisfied();
-        
+    private final class MyFlankyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (newExchange != null) {
+                String old = oldExchange.getIn().getBody(String.class);
+                String body = old + newExchange.getIn().getBody(String.class);
+                oldExchange.getIn().setBody(body);
+            }
+
+            // simulate some error
+            throw new IllegalArgumentException("Damn");
+        }
     }
 }



Mime
View raw message