camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/2] camel git commit: CAMEL-10171 memory leak when continuation expires
Date Wed, 17 Aug 2016 07:52:49 GMT
CAMEL-10171 memory leak when continuation expires

setobject done earlier and exception set on camelExchange.
isExpired method call thru Continuation interface is cancelled and
below issues became invalid as discussed in CXF-7011

https://issues.apache.org/jira/browse/CXF-7002
https://issues.apache.org/jira/browse/CXF-7011

instead

such block means the same;

https://issues.apache.org/jira/browse/CXF-7011?focusedCommentId=15422696&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15422696


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e822ae58
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e822ae58
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e822ae58

Branch: refs/heads/camel-2.17.x
Commit: e822ae58c43aec84cc4d1ff64df008ea22ddd910
Parents: 19f619e
Author: önder sezgin <ondersezgin@gmail.com>
Authored: Tue Aug 16 22:22:46 2016 +0300
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Wed Aug 17 09:52:05 2016 +0200

----------------------------------------------------------------------
 .../org/apache/camel/component/cxf/CxfConsumer.java   | 14 +++++++++++++-
 .../camel/component/cxf/jaxrs/CxfRsInvoker.java       | 14 +++++++++++++-
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
index 11ec2f2..43561ba 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/CxfConsumer.java
@@ -26,6 +26,7 @@ import org.w3c.dom.Element;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Processor;
 import org.apache.camel.component.cxf.common.message.CxfConstants;
 import org.apache.camel.impl.DefaultConsumer;
@@ -149,6 +150,8 @@ public class CxfConsumer extends DefaultConsumer {
 
                     // The continuation could be called before the suspend is called
                     continuation.suspend(cxfEndpoint.getContinuationTimeout());
+                    
+                    continuation.setObject(camelExchange);
 
                     // use the asynchronous API to process the exchange
                     getAsyncProcessor().process(camelExchange, new AsyncCallback() {
@@ -157,7 +160,6 @@ public class CxfConsumer extends DefaultConsumer {
                             synchronized (continuation) {
                                 LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId());
                                 // resume processing after both, sync and async callbacks
-                                continuation.setObject(camelExchange);
                                 continuation.resume();
                             }
                         }
@@ -171,6 +173,16 @@ public class CxfConsumer extends DefaultConsumer {
                         CxfConsumer.this.doneUoW(camelExchange);
                     }
 
+                } else if (!continuation.isResumed() && !continuation.isPending())
{
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject();
+                    try {
+                        if (!continuation.isPending()) {
+                            camelExchange.setException(new ExchangeTimedOutException(camelExchange,
cxfEndpoint.getContinuationTimeout()));
+                        }
+                        setResponseBack(cxfExchange, camelExchange);
+                    } finally {
+                        CxfConsumer.this.doneUoW(camelExchange);
+                    }
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/e822ae58/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
----------------------------------------------------------------------
diff --git a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
index fb999f0..01563d3 100644
--- a/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
+++ b/components/camel-cxf/src/main/java/org/apache/camel/component/cxf/jaxrs/CxfRsInvoker.java
@@ -26,6 +26,7 @@ import javax.ws.rs.core.UriInfo;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationProvider;
@@ -90,13 +91,13 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 // The continuation could be called before the suspend is called
                 continuation.suspend(endpoint.getContinuationTimeout());
                 cxfExchange.put(SUSPENED, Boolean.TRUE);
+                continuation.setObject(camelExchange);
                 cxfRsConsumer.getAsyncProcessor().process(camelExchange, new AsyncCallback()
{
                     public void done(boolean doneSync) {
                         // make sure the continuation resume will not be called before the
suspend method in other thread
                         synchronized (continuation) {
                             LOG.trace("Resuming continuation of exchangeId: {}", camelExchange.getExchangeId());
                             // resume processing after both, sync and async callbacks
-                            continuation.setObject(camelExchange);
                             continuation.resume();
                         }
                     }
@@ -111,6 +112,17 @@ public class CxfRsInvoker extends JAXRSInvoker {
                 } finally {
                     cxfRsConsumer.doneUoW(camelExchange);
                 }
+            } else {
+                if (!continuation.isPending()) {
+                    cxfExchange.put(SUSPENED, Boolean.FALSE);
+                    org.apache.camel.Exchange camelExchange = (org.apache.camel.Exchange)continuation.getObject();
+                    camelExchange.setException(new ExchangeTimedOutException(camelExchange,
endpoint.getContinuationTimeout()));
+                    try {
+                        return returnResponse(cxfExchange, camelExchange);
+                    } finally {
+                        cxfRsConsumer.doneUoW(camelExchange);
+                    }
+                }
             }
         }
         return null;


Mime
View raw message