camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r1310710 - in /camel/trunk/camel-core/src/main/java/org/apache/camel/processor: Enricher.java MulticastProcessor.java PollEnricher.java aggregate/AggregateProcessor.java aggregate/TimeoutAwareAggregationStrategy.java
Date Sat, 07 Apr 2012 08:55:49 GMT
Author: davsclaus
Date: Sat Apr  7 08:55:48 2012
New Revision: 1310710

URL: http://svn.apache.org/viewvc?rev=1310710&view=rev
Log:
Improved catching errors from aggregation strategy.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Enricher.java Sat Apr  7 08:55:48 2012
@@ -18,6 +18,7 @@ package org.apache.camel.processor;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
 import org.apache.camel.Producer;
@@ -160,7 +161,15 @@ public class Enricher extends ServiceSup
 
             // prepare the exchanges for aggregation
             ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-            Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+            // must catch any exception from aggregation
+            Exchange aggregatedExchange;
+            try {
+                aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+            } catch (Throwable e) {
+                exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
+                callback.done(true);
+                return true;
+            }
             if (aggregatedExchange != null) {
                 // copy aggregation result onto original exchange (preserving pattern)
                 copyResultsPreservePattern(exchange, aggregatedExchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Apr  7 08:55:48 2012
@@ -649,6 +649,7 @@ public class MulticastProcessor extends 
                             return;
                         }
 
+                        // must catch any exceptions from aggregation
                         try {
                             doAggregate(getAggregationStrategy(subExchange), result, subExchange);
                         } catch (Throwable e) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java Sat Apr  7 08:55:48 2012
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
@@ -140,7 +141,13 @@ public class PollEnricher extends Servic
 
             // prepare the exchanges for aggregation
             ExchangeHelper.prepareAggregation(exchange, resourceExchange);
-            Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+            // must catch any exception from aggregation
+            Exchange aggregatedExchange;
+            try {
+                aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
+            } catch (Throwable e) {
+                throw new CamelExchangeException("Error occurred during aggregation", exchange, e);
+            }
             if (aggregatedExchange != null) {
                 // copy aggregation result onto original exchange (preserving pattern)
                 copyResultsPreservePattern(exchange, aggregatedExchange);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Sat Apr  7 08:55:48 2012
@@ -240,7 +240,12 @@ public class AggregateProcessor extends 
 
         // prepare the exchanges for aggregation and aggregate it
         ExchangeHelper.prepareAggregation(oldExchange, newExchange);
-        answer = onAggregation(oldExchange, exchange);
+        // must catch any exception from aggregation
+        try {
+            answer = onAggregation(oldExchange, exchange);
+        } catch (Throwable e) {
+            throw new CamelExchangeException("Error occurred during aggregation", exchange, e);
+        }
         if (answer == null) {
             throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", exchange);
         }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=1310710&r1=1310709&r2=1310710&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java Sat Apr  7 08:55:48 2012
@@ -28,7 +28,9 @@ public interface TimeoutAwareAggregation
     // TODO: In Camel 3.0 we should move this to org.apache.camel package
 
     /**
-     * A timeout occurred
+     * A timeout occurred.
+     * <p/>
+     * <b>Important: </b> This method must <b>not</b> throw any exceptions.
      *
      * @param oldExchange  the current aggregated exchange, or the original {@link Exchange} if no aggregation
      *                     has been done before the timeout occurred



Mime
View raw message