camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/4] camel git commit: CAMEL-8033: Exchange Leak Caused By pollEnrich
Date Tue, 11 Nov 2014 10:54:26 GMT
CAMEL-8033: Exchange Leak Caused By pollEnrich

Conflicts:
	camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2260a1d4
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2260a1d4
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2260a1d4

Branch: refs/heads/camel-2.13.x
Commit: 2260a1d4e32a3cb47a6ee898bc631767e1b873c1
Parents: bfcc735
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Tue Nov 11 11:45:10 2014 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Tue Nov 11 11:52:01 2014 +0100

----------------------------------------------------------------------
 .../apache/camel/processor/PollEnricher.java    | 66 +++++++++++---------
 1 file changed, 36 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2260a1d4/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
index 34d445a..22bb098 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/PollEnricher.java
@@ -130,30 +130,36 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor {
         }
 
         Exchange resourceExchange;
-        if (timeout < 0) {
-            LOG.debug("Consumer receive: {}", consumer);
-            resourceExchange = consumer.receive();
-        } else if (timeout == 0) {
-            LOG.debug("Consumer receiveNoWait: {}", consumer);
-            resourceExchange = consumer.receiveNoWait();
-        } else {
-            LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
-            resourceExchange = consumer.receive(timeout);
-        }
+        try {
+            if (timeout < 0) {
+                LOG.debug("Consumer receive: {}", consumer);
+                resourceExchange = consumer.receive();
+            } else if (timeout == 0) {
+                LOG.debug("Consumer receiveNoWait: {}", consumer);
+                resourceExchange = consumer.receiveNoWait();
+            } else {
+                LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
+                resourceExchange = consumer.receive(timeout);
+            }
 
-        if (resourceExchange == null) {
-            LOG.debug("Consumer received no exchange");
-        } else {
-            LOG.debug("Consumer received: {}", resourceExchange);
+            if (resourceExchange == null) {
+                LOG.debug("Consumer received no exchange");
+            } else {
+                LOG.debug("Consumer received: {}", resourceExchange);
+            }
+        } catch (Exception e) {
+            exchange.setException(new CamelExchangeException("Error during poll", exchange, e));
+            callback.done(true);
+            return true;
         }
 
-        if (resourceExchange != null && resourceExchange.isFailed()) {
-            // copy resource exchange onto original exchange (preserving pattern)
-            copyResultsPreservePattern(exchange, resourceExchange);
-        } else {
-            prepareResult(exchange);
+        try {
+            if (resourceExchange != null && resourceExchange.isFailed()) {
+                // copy resource exchange onto original exchange (preserving pattern)
+                copyResultsPreservePattern(exchange, resourceExchange);
+            } else {
+                prepareResult(exchange);
 
-            try {
                 // prepare the exchanges for aggregation
                 ExchangeHelper.prepareAggregation(exchange, resourceExchange);
                 // must catch any exception from aggregation
@@ -166,18 +172,18 @@ public class PollEnricher extends ServiceSupport implements AsyncProcessor {
                         resourceExchange.handoverCompletions(exchange);
                     }
                 }
-            } catch (Throwable e) {
-                exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
-                callback.done(true);
-                return true;
             }
-        }
 
-        // set header with the uri of the endpoint enriched so we can use that for tracing etc
-        if (exchange.hasOut()) {
-            exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
-        } else {
-            exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+            // set header with the uri of the endpoint enriched so we can use that for tracing etc
+            if (exchange.hasOut()) {
+                exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+            } else {
+                exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
+            }
+        } catch (Throwable e) {
+            exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
+            callback.done(true);
+            return true;
         }
 
         callback.done(true);


Mime
View raw message