cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From serg...@apache.org
Subject svn commit: r1541983 - in /cxf/trunk: core/src/main/java/org/apache/cxf/interceptor/ core/src/main/java/org/apache/cxf/message/ core/src/main/java/org/apache/cxf/phase/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ rt/transports/http-jetty/src/...
Date Thu, 14 Nov 2013 17:19:41 GMT
Author: sergeyb
Date: Thu Nov 14 17:19:40 2013
New Revision: 1541983

URL: http://svn.apache.org/r1541983
Log:
[CXF-5373] More generic solution to a custom executor thread deadlock issue

Modified:
    cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
    cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java
    cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
    cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
    cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
(original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java
Thu Nov 14 17:19:40 2013
@@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionExc
 import java.util.concurrent.Executor;
 import java.util.concurrent.FutureTask;
 
-import org.apache.cxf.common.util.PropertyUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
@@ -32,6 +31,7 @@ import org.apache.cxf.message.MessageCon
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.phase.AbstractPhaseInterceptor;
 import org.apache.cxf.phase.Phase;
+import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.service.Service;
 import org.apache.cxf.service.invoker.Invoker;
 
@@ -89,45 +89,51 @@ public class ServiceInvokerInterceptor e
         
         Executor executor = getExecutor(endpoint);
         Executor executor2 = exchange.get(Executor.class);
-        if (executor2 == executor || executor == null) {
+        if (executor2 == executor || executor == null
+            || !(message.getInterceptorChain() instanceof PhaseInterceptorChain)) {
             // already executing on the appropriate executor
             invocation.run();
         } else {
             exchange.put(Executor.class, executor);
-            FutureTask<Object> o = new FutureTask<Object>(invocation, null) {
+            // The current thread holds the lock on PhaseInterceptorChain.
+            // In order to avoid the executor threads deadlocking on any of
+            // synchronized PhaseInterceptorChain methods the current thread
+            // needs to release the chain lock and re-acquire it after the
+            // executor thread is done
+            
+            final PhaseInterceptorChain chain = (PhaseInterceptorChain)message.getInterceptorChain();
+            final FutureTask<Object> o = new FutureTask<Object>(invocation, null)
{
                 @Override
                 protected void done() {
                     super.done();
-                    synchronized (this) {
-                        this.notifyAll();
+                    chain.releaseChain();
+                }
+                
+                @Override
+                public void run() {
+                    synchronized (chain) {
+                        super.run();
                     }
                 }
             };
-            synchronized (o) {
+            synchronized (chain) {
                 executor.execute(o);
-                if (!o.isDone()) {
-                    try {
-                        o.wait();
-                    } catch (InterruptedException e) {
-                        //IGNORE
-                    }
-                }
-                try {
-                    o.get();
-                } catch (InterruptedException e) {
-                    throw new Fault(e);
-                } catch (ExecutionException e) {
-                    if (e.getCause() instanceof RuntimeException) {
-                        throw (RuntimeException)e.getCause();
-                    } else {
-                        throw new Fault(e.getCause());
-                    }
-                } finally {
-                    if (PropertyUtils.isTrue(exchange.remove(Message.SUSPENDED_INVOCATION)))
{    
-                        message.getInterceptorChain().suspend();
-                    }
+                // the task will already be done if the executor uses the current thread
+                // but the chain lock status still needs to be re-set
+                chain.releaseAndAcquireChain();
+            }
+            try {
+                o.get();
+            } catch (InterruptedException e) {
+                throw new Fault(e);
+            } catch (ExecutionException e) {
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException)e.getCause();
+                } else {
+                    throw new Fault(e.getCause());
                 }
             }
+            
         }
     }
     

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java Thu Nov 14 17:19:40 2013
@@ -139,8 +139,6 @@ public interface Message extends StringM
     String WSDL_INTERFACE = "javax.xml.ws.wsdl.interface";
     String WSDL_OPERATION = "javax.xml.ws.wsdl.operation";
 
-    String SUSPENDED_INVOCATION = "org.apache.cxf.suspended.invocation";
-    
     /**
      * Some properties to allow adding interceptors to the chain
      * on a per-request basis.  All are a Collection<Interceptor> 

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Thu Nov 14
17:19:40 2013
@@ -93,7 +93,7 @@ public class PhaseInterceptorChain imple
     // to avoid duplicate fault processing on nested calling of
     // doIntercept(), which will throw same fault multi-times
     private boolean faultOccurred;
-    
+    private boolean chainReleased;
     
     
     private PhaseInterceptorChain(PhaseInterceptorChain src) {
@@ -161,6 +161,22 @@ public class PhaseInterceptorChain imple
         return state;
     }
     
+    public synchronized void releaseAndAcquireChain() {
+        while (!chainReleased) {
+            try {
+                this.wait();
+            } catch (InterruptedException ex) {
+                // ignore
+            }
+        }
+        chainReleased = false;
+    }
+    
+    public synchronized void releaseChain() {
+        this.chainReleased = true;
+        this.notifyAll();
+    }
+    
     public PhaseInterceptorChain cloneChain() {
         return new PhaseInterceptorChain(this);
     }

Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original)
+++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Thu Nov
14 17:19:40 2013
@@ -41,7 +41,6 @@ import org.apache.cxf.common.classloader
 import org.apache.cxf.common.i18n.BundleUtils;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.ClassHelper;
-import org.apache.cxf.common.util.PropertyUtils;
 import org.apache.cxf.helpers.CastUtils;
 import org.apache.cxf.interceptor.Fault;
 import org.apache.cxf.interceptor.InterceptorChain.State;
@@ -104,8 +103,7 @@ public class JAXRSInvoker extends Abstra
             }
             return handleFault(ex, exchange.getInMessage());
         } finally {
-            boolean suspended = PropertyUtils.isTrue(exchange.get(Message.SUSPENDED_INVOCATION))
-                || exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED;
+            boolean suspended = exchange.getInMessage().getInterceptorChain().getState()
== State.SUSPENDED;
             if (exchange.isOneWay() || suspended) {
                 ServerProviderFactory.getInstance(exchange.getInMessage()).clearThreadLocalProxies();
             }

Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
(original)
+++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java
Thu Nov 14 17:19:40 2013
@@ -25,7 +25,6 @@ import javax.servlet.http.HttpServletRes
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.continuations.ContinuationCallback;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.transport.http.AbstractHTTPDestination;
 import org.eclipse.jetty.continuation.ContinuationListener;
 import org.eclipse.jetty.continuation.ContinuationSupport;
@@ -101,13 +100,8 @@ public class JettyContinuationWrapper im
         }
         isNew = false;
         
-        if (PhaseInterceptorChain.getCurrentMessage() == null) {
-            // the current thread is different to the one which holds a lock on PhaseInterceptorChain

-            message.getExchange().put(Message.SUSPENDED_INVOCATION, true);
-        } else {
-            // Need to get the right message which is handled in the interceptor chain
-            message.getExchange().getInMessage().getInterceptorChain().suspend();
-        }
+        message.getExchange().getInMessage().getInterceptorChain().suspend();
+        
         continuation.setTimeout(pendingTimeout);
         if (!isPending) {
             continuation.suspend();

Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
(original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java
Thu Nov 14 17:19:40 2013
@@ -31,7 +31,6 @@ import org.apache.cxf.continuations.Cont
 import org.apache.cxf.continuations.ContinuationCallback;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.PhaseInterceptorChain;
 
 /**
  * 
@@ -100,13 +99,7 @@ public class Servlet3ContinuationProvide
             isNew = false;
             
             context.setTimeout(timeout);
-            if (PhaseInterceptorChain.getCurrentMessage() == null) {
-                // the current thread is different to the one which holds a lock on PhaseInterceptorChain

-                inMessage.getExchange().put(Message.SUSPENDED_INVOCATION, true);
-            } else {
-                // Need to get the right message which is handled in the interceptor chain
-                inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
-            }
+            inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
             
             return true;
         }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=1541983&r1=1541982&r2=1541983&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java
Thu Nov 14 17:19:40 2013
@@ -29,7 +29,6 @@ import org.apache.cxf.common.classloader
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.continuations.Continuation;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.phase.PhaseInterceptorChain;
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.jms.JMSConfiguration;
 import org.apache.cxf.workqueue.WorkQueue;
@@ -139,13 +138,8 @@ public class JMSContinuation implements 
         if (isPending) {
             return false;
         }
-        if (PhaseInterceptorChain.getCurrentMessage() == null) {
-            // the current thread is different to the one which holds a lock on PhaseInterceptorChain

-            inMessage.getExchange().put(Message.SUSPENDED_INVOCATION, true);
-        } else {
-            // Need to get the right message which is handled in the interceptor chain
-            inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
-        }
+        inMessage.getExchange().getInMessage().getInterceptorChain().suspend();
+        
         updateContinuations(false);
                 
         isNew = false;



Mime
View raw message