Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CC09110097 for ; Thu, 14 Nov 2013 17:20:24 +0000 (UTC) Received: (qmail 67909 invoked by uid 500); 14 Nov 2013 17:20:16 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 67753 invoked by uid 500); 14 Nov 2013 17:20:08 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 67736 invoked by uid 99); 14 Nov 2013 17:20:07 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Nov 2013 17:20:07 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 14 Nov 2013 17:20:03 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 917B52388994; Thu, 14 Nov 2013 17:19:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1541983 - in /cxf/trunk: core/src/main/java/org/apache/cxf/interceptor/ core/src/main/java/org/apache/cxf/message/ core/src/main/java/org/apache/cxf/phase/ rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/ rt/transports/http-jetty/src/... Date: Thu, 14 Nov 2013 17:19:41 -0000 To: commits@cxf.apache.org From: sergeyb@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20131114171941.917B52388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sergeyb Date: Thu Nov 14 17:19:40 2013 New Revision: 1541983 URL: http://svn.apache.org/r1541983 Log: [CXF-5373] More generic solution to a custom executor thread deadlock issue Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Modified: cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java (original) +++ cxf/trunk/core/src/main/java/org/apache/cxf/interceptor/ServiceInvokerInterceptor.java Thu Nov 14 17:19:40 2013 @@ -24,7 +24,6 @@ import java.util.concurrent.ExecutionExc import java.util.concurrent.Executor; import java.util.concurrent.FutureTask; -import org.apache.cxf.common.util.PropertyUtils; import org.apache.cxf.endpoint.Endpoint; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; @@ -32,6 +31,7 @@ import org.apache.cxf.message.MessageCon import org.apache.cxf.message.MessageImpl; import org.apache.cxf.phase.AbstractPhaseInterceptor; import org.apache.cxf.phase.Phase; +import org.apache.cxf.phase.PhaseInterceptorChain; import org.apache.cxf.service.Service; import org.apache.cxf.service.invoker.Invoker; @@ -89,45 +89,51 @@ public class ServiceInvokerInterceptor e Executor executor = getExecutor(endpoint); Executor executor2 = exchange.get(Executor.class); - if (executor2 == executor || executor == null) { + if (executor2 == executor || executor == null + || !(message.getInterceptorChain() instanceof PhaseInterceptorChain)) { // already executing on the appropriate executor invocation.run(); } else { exchange.put(Executor.class, executor); - FutureTask o = new FutureTask(invocation, null) { + // The current thread holds the lock on PhaseInterceptorChain. + // In order to avoid the executor threads deadlocking on any of + // synchronized PhaseInterceptorChain methods the current thread + // needs to release the chain lock and re-acquire it after the + // executor thread is done + + final PhaseInterceptorChain chain = (PhaseInterceptorChain)message.getInterceptorChain(); + final FutureTask o = new FutureTask(invocation, null) { @Override protected void done() { super.done(); - synchronized (this) { - this.notifyAll(); + chain.releaseChain(); + } + + @Override + public void run() { + synchronized (chain) { + super.run(); } } }; - synchronized (o) { + synchronized (chain) { executor.execute(o); - if (!o.isDone()) { - try { - o.wait(); - } catch (InterruptedException e) { - //IGNORE - } - } - try { - o.get(); - } catch (InterruptedException e) { - throw new Fault(e); - } catch (ExecutionException e) { - if (e.getCause() instanceof RuntimeException) { - throw (RuntimeException)e.getCause(); - } else { - throw new Fault(e.getCause()); - } - } finally { - if (PropertyUtils.isTrue(exchange.remove(Message.SUSPENDED_INVOCATION))) { - message.getInterceptorChain().suspend(); - } + // the task will already be done if the executor uses the current thread + // but the chain lock status still needs to be re-set + chain.releaseAndAcquireChain(); + } + try { + o.get(); + } catch (InterruptedException e) { + throw new Fault(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof RuntimeException) { + throw (RuntimeException)e.getCause(); + } else { + throw new Fault(e.getCause()); } } + } } Modified: cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java (original) +++ cxf/trunk/core/src/main/java/org/apache/cxf/message/Message.java Thu Nov 14 17:19:40 2013 @@ -139,8 +139,6 @@ public interface Message extends StringM String WSDL_INTERFACE = "javax.xml.ws.wsdl.interface"; String WSDL_OPERATION = "javax.xml.ws.wsdl.operation"; - String SUSPENDED_INVOCATION = "org.apache.cxf.suspended.invocation"; - /** * Some properties to allow adding interceptors to the chain * on a per-request basis. All are a Collection Modified: cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original) +++ cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Thu Nov 14 17:19:40 2013 @@ -93,7 +93,7 @@ public class PhaseInterceptorChain imple // to avoid duplicate fault processing on nested calling of // doIntercept(), which will throw same fault multi-times private boolean faultOccurred; - + private boolean chainReleased; private PhaseInterceptorChain(PhaseInterceptorChain src) { @@ -161,6 +161,22 @@ public class PhaseInterceptorChain imple return state; } + public synchronized void releaseAndAcquireChain() { + while (!chainReleased) { + try { + this.wait(); + } catch (InterruptedException ex) { + // ignore + } + } + chainReleased = false; + } + + public synchronized void releaseChain() { + this.chainReleased = true; + this.notifyAll(); + } + public PhaseInterceptorChain cloneChain() { return new PhaseInterceptorChain(this); } Modified: cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java (original) +++ cxf/trunk/rt/frontend/jaxrs/src/main/java/org/apache/cxf/jaxrs/JAXRSInvoker.java Thu Nov 14 17:19:40 2013 @@ -41,7 +41,6 @@ import org.apache.cxf.common.classloader import org.apache.cxf.common.i18n.BundleUtils; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.common.util.ClassHelper; -import org.apache.cxf.common.util.PropertyUtils; import org.apache.cxf.helpers.CastUtils; import org.apache.cxf.interceptor.Fault; import org.apache.cxf.interceptor.InterceptorChain.State; @@ -104,8 +103,7 @@ public class JAXRSInvoker extends Abstra } return handleFault(ex, exchange.getInMessage()); } finally { - boolean suspended = PropertyUtils.isTrue(exchange.get(Message.SUSPENDED_INVOCATION)) - || exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED; + boolean suspended = exchange.getInMessage().getInterceptorChain().getState() == State.SUSPENDED; if (exchange.isOneWay() || suspended) { ServerProviderFactory.getInstance(exchange.getInMessage()).clearThreadLocalProxies(); } Modified: cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java (original) +++ cxf/trunk/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/continuations/JettyContinuationWrapper.java Thu Nov 14 17:19:40 2013 @@ -25,7 +25,6 @@ import javax.servlet.http.HttpServletRes import org.apache.cxf.continuations.Continuation; import org.apache.cxf.continuations.ContinuationCallback; import org.apache.cxf.message.Message; -import org.apache.cxf.phase.PhaseInterceptorChain; import org.apache.cxf.transport.http.AbstractHTTPDestination; import org.eclipse.jetty.continuation.ContinuationListener; import org.eclipse.jetty.continuation.ContinuationSupport; @@ -101,13 +100,8 @@ public class JettyContinuationWrapper im } isNew = false; - if (PhaseInterceptorChain.getCurrentMessage() == null) { - // the current thread is different to the one which holds a lock on PhaseInterceptorChain - message.getExchange().put(Message.SUSPENDED_INVOCATION, true); - } else { - // Need to get the right message which is handled in the interceptor chain - message.getExchange().getInMessage().getInterceptorChain().suspend(); - } + message.getExchange().getInMessage().getInterceptorChain().suspend(); + continuation.setTimeout(pendingTimeout); if (!isPending) { continuation.suspend(); Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java (original) +++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/Servlet3ContinuationProvider.java Thu Nov 14 17:19:40 2013 @@ -31,7 +31,6 @@ import org.apache.cxf.continuations.Cont import org.apache.cxf.continuations.ContinuationCallback; import org.apache.cxf.continuations.ContinuationProvider; import org.apache.cxf.message.Message; -import org.apache.cxf.phase.PhaseInterceptorChain; /** * @@ -100,13 +99,7 @@ public class Servlet3ContinuationProvide isNew = false; context.setTimeout(timeout); - if (PhaseInterceptorChain.getCurrentMessage() == null) { - // the current thread is different to the one which holds a lock on PhaseInterceptorChain - inMessage.getExchange().put(Message.SUSPENDED_INVOCATION, true); - } else { - // Need to get the right message which is handled in the interceptor chain - inMessage.getExchange().getInMessage().getInterceptorChain().suspend(); - } + inMessage.getExchange().getInMessage().getInterceptorChain().suspend(); return true; } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java?rev=1541983&r1=1541982&r2=1541983&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/continuations/JMSContinuation.java Thu Nov 14 17:19:40 2013 @@ -29,7 +29,6 @@ import org.apache.cxf.common.classloader import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.continuations.Continuation; import org.apache.cxf.message.Message; -import org.apache.cxf.phase.PhaseInterceptorChain; import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.transport.jms.JMSConfiguration; import org.apache.cxf.workqueue.WorkQueue; @@ -139,13 +138,8 @@ public class JMSContinuation implements if (isPending) { return false; } - if (PhaseInterceptorChain.getCurrentMessage() == null) { - // the current thread is different to the one which holds a lock on PhaseInterceptorChain - inMessage.getExchange().put(Message.SUSPENDED_INVOCATION, true); - } else { - // Need to get the right message which is handled in the interceptor chain - inMessage.getExchange().getInMessage().getInterceptorChain().suspend(); - } + inMessage.getExchange().getInMessage().getInterceptorChain().suspend(); + updateContinuations(false); isNew = false;