Mailing-List: contact cxf-commits-help@incubator.apache.org; run by ezmlm
Reply-To: cxf-dev@incubator.apache.org
Subject: svn commit: r601540 - in /incubator/cxf/branches/2.0.x-fixes: ./ api/src/main/java/org/apache/cxf/ rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/core/src/main/java/org/apache/cxf/interce...
Date: Wed, 05 Dec 2007 21:39:23 -0000
To: cxf-commits@incubator.apache.org
From: dkulp@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20071205214200.201AA1A983A@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: dkulp
Date: Wed Dec 5 13:38:31 2007
New Revision: 601540

URL: http://svn.apache.org/viewvc?rev=601540&view=rev
Log:
Merged revisions 601535 via svnmerge from
https://svn.apache.org/repos/asf/incubator/cxf/trunk

........
  r601535 | dkulp | 2007-12-05 16:27:05 -0500 (Wed, 05 Dec 2007) | 6 lines

  [CXF-1264] Make sure the ThreadDefaultBus is properly set when doing dispatches of messages so that handlers, interceptors, services, etc. that may create a new client or similar will get the correct bus.

  Also, ThreadLocals are thread safe, so we don't need to synchronize to access them.
........

Modified:
    incubator/cxf/branches/2.0.x-fixes/ (props changed)
    incubator/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/BusFactory.java
    incubator/cxf/branches/2.0.x-fixes/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java
    incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java
    incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
    incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java
    incubator/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
    incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
    incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
    incubator/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java

Propchange: incubator/cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/BusFactory.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/BusFactory.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/BusFactory.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/BusFactory.java Wed Dec 5 13:38:31 2007
@@ -85,7 +85,7 @@
      * Sets the default bus for the thread.
      * @param bus the default bus.
      */
-    public static synchronized void setThreadDefaultBus(Bus bus) {
+    public static void setThreadDefaultBus(Bus bus) {
         localBus.set(bus);
     }

@@ -93,9 +93,17 @@
      * Gets the default bus for the thread.
      * @return the default bus.
      */
-    public static synchronized Bus getThreadDefaultBus() {
-        if (localBus.get() == null) {
-            Bus b = getDefaultBus();
+    public static Bus getThreadDefaultBus() {
+        return getThreadDefaultBus(true);
+    }
+    /**
+     * Gets the default bus for the thread, creating if needed
+     * @param createIfNeeded Set to true to create a default bus if one doesn't exist
+     * @return the default bus.
+     */
+    public static Bus getThreadDefaultBus(boolean createIfNeeded) {
+        if (createIfNeeded && localBus.get() == null) {
+            Bus b = getDefaultBus(createIfNeeded);
             localBus.set(b);
         }
         return localBus.get();

Modified: incubator/cxf/branches/2.0.x-fixes/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/bindings/coloc/src/main/java/org/apache/cxf/binding/coloc/ColocMessageObserver.java Wed Dec 5 13:38:31 2007
@@ -28,6 +28,7 @@
 import javax.xml.namespace.QName;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
@@ -55,44 +56,50 @@
     }

     public void onMessage(Message m) {
-        if (LOG.isLoggable(Level.FINER)) {
-            LOG.finer("Processing Message at collocated endpoint. Request message: " + m);
-        }
-        Exchange ex = new ExchangeImpl();
-        setExchangeProperties(ex, m);
-
-        Message inMsg = endpoint.getBinding().createMessage();
-        MessageImpl.copyContent(m, inMsg);
-
-        //Copy Request Context to Server inBound Message
-        //TODO a Context Filter Strategy required.
-        inMsg.putAll(m);
-
-        inMsg.put(COLOCATED, Boolean.TRUE);
-        inMsg.put(Message.REQUESTOR_ROLE, Boolean.FALSE);
-        inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
-        OperationInfo oi = ex.get(OperationInfo.class);
-        if (oi != null) {
-            inMsg.put(MessageInfo.class, oi.getInput());
-        }
-        ex.setInMessage(inMsg);
-        inMsg.setExchange(ex);
-
-        if (LOG.isLoggable(Level.FINEST)) {
-            LOG.finest("Build inbound interceptor chain.");
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            if (LOG.isLoggable(Level.FINER)) {
+                LOG.finer("Processing Message at collocated endpoint. Request message: " + m);
+            }
+            Exchange ex = new ExchangeImpl();
+            setExchangeProperties(ex, m);
+
+            Message inMsg = endpoint.getBinding().createMessage();
+            MessageImpl.copyContent(m, inMsg);
+
+            //Copy Request Context to Server inBound Message
+            //TODO a Context Filter Strategy required.
+            inMsg.putAll(m);
+
+            inMsg.put(COLOCATED, Boolean.TRUE);
+            inMsg.put(Message.REQUESTOR_ROLE, Boolean.FALSE);
+            inMsg.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
+            OperationInfo oi = ex.get(OperationInfo.class);
+            if (oi != null) {
+                inMsg.put(MessageInfo.class, oi.getInput());
+            }
+            ex.setInMessage(inMsg);
+            inMsg.setExchange(ex);
+
+            if (LOG.isLoggable(Level.FINEST)) {
+                LOG.finest("Build inbound interceptor chain.");
+            }
+
+            //Add all interceptors between USER_LOGICAL and INVOKE.
+            SortedSet phases = new TreeSet(bus.getExtension(PhaseManager.class).getInPhases());
+            ColocUtil.setPhases(phases, Phase.USER_LOGICAL, Phase.INVOKE);
+            InterceptorChain chain = ColocUtil.getInInterceptorChain(ex, phases);
+            chain.add(addColocInterceptors());
+            inMsg.setInterceptorChain(chain);
+
+            chain.doIntercept(inMsg);
+
+            //Set Server OutBound Message onto InBound Exchange.
+            setOutBoundMessage(ex, m.getExchange());
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-
-        //Add all interceptors between USER_LOGICAL and INVOKE.
-        SortedSet phases = new TreeSet(bus.getExtension(PhaseManager.class).getInPhases());
-        ColocUtil.setPhases(phases, Phase.USER_LOGICAL, Phase.INVOKE);
-        InterceptorChain chain = ColocUtil.getInInterceptorChain(ex, phases);
-        chain.add(addColocInterceptors());
-        inMsg.setInterceptorChain(chain);
-
-        chain.doIntercept(inMsg);
-
-        //Set Server OutBound Message onto InBound Exchange.
-        setOutBoundMessage(ex, m.getExchange());
     }

     protected void setOutBoundMessage(Exchange from, Exchange to) {

Modified: incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Wed Dec 5 13:38:31 2007
@@ -226,101 +226,109 @@
                            Object[] params,
                            Map context,
                            Exchange exchange) throws Exception {
-        if (exchange == null) {
-            exchange = new ExchangeImpl();
-        }
-        Endpoint endpoint = getEndpoint();
-
-        Map requestContext = null;
-        Map responseContext = null;
-        if (LOG.isLoggable(Level.FINE)) {
-            LOG.fine("Invoke, operation info: " + oi + ", params: " + params);
-        }
-        Message message = endpoint.getBinding().createMessage();
-        if (null != context) {
-            requestContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
-            responseContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
-            message.put(Message.INVOCATION_CONTEXT, context);
-        }
-        //setup the message context
-        setContext(requestContext, message);
-        setParameters(params, message);
-
-        if (null != requestContext) {
-            exchange.putAll(requestContext);
-        }
-        exchange.setOneWay(oi.getOutput() == null);
-
-        exchange.setOutMessage(message);
-
-        setOutMessageProperties(message, oi);
-        setExchangeProperties(exchange, endpoint, oi);
-
-        // setup chain
-
-        PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
-        message.setInterceptorChain(chain);
-
-        modifyChain(chain, requestContext);
-        chain.setFaultObserver(outFaultObserver);
-
-        // setup conduit selector
-        prepareConduitSelector(message);
-
-        // execute chain
-        chain.doIntercept(message);
-
+
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
-        // Check to see if there is a Fault from the outgoing chain
-        Exception ex = message.getContent(Exception.class);
-        boolean mepCompleteCalled = false;
-        if (ex != null) {
-            getConduitSelector().complete(exchange);
-            mepCompleteCalled = true;
-            if (message.getContent(Exception.class) != null) {
-                throw ex;
+            if (exchange == null) {
+                exchange = new ExchangeImpl();
             }
-        }
-        ex = message.getExchange().get(Exception.class);
-        if (ex != null) {
-            if (!mepCompleteCalled) {
+            Endpoint endpoint = getEndpoint();
+
+            Map requestContext = null;
+            Map responseContext = null;
+            if (LOG.isLoggable(Level.FINE)) {
+                LOG.fine("Invoke, operation info: " + oi + ", params: " + params);
+            }
+            Message message = endpoint.getBinding().createMessage();
+            if (null != context) {
+                requestContext = CastUtils.cast((Map)context.get(REQUEST_CONTEXT));
+                responseContext = CastUtils.cast((Map)context.get(RESPONSE_CONTEXT));
+                message.put(Message.INVOCATION_CONTEXT, context);
+            }
+            //setup the message context
+            setContext(requestContext, message);
+            setParameters(params, message);
+
+            if (null != requestContext) {
+                exchange.putAll(requestContext);
+            }
+            exchange.setOneWay(oi.getOutput() == null);
+
+            exchange.setOutMessage(message);
+
+            setOutMessageProperties(message, oi);
+            setExchangeProperties(exchange, endpoint, oi);
+
+            // setup chain
+
+            PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
+            message.setInterceptorChain(chain);
+
+            modifyChain(chain, requestContext);
+            chain.setFaultObserver(outFaultObserver);
+
+            // setup conduit selector
+            prepareConduitSelector(message);
+
+            // execute chain
+            chain.doIntercept(message);
+
+
+            // Check to see if there is a Fault from the outgoing chain
+            Exception ex = message.getContent(Exception.class);
+            boolean mepCompleteCalled = false;
+            if (ex != null) {
                 getConduitSelector().complete(exchange);
+                mepCompleteCalled = true;
+                if (message.getContent(Exception.class) != null) {
+                    throw ex;
+                }
             }
-            throw ex;
-        }
-
-        // Wait for a response if we need to
-        if (!oi.getOperationInfo().isOneWay()) {
-            synchronized (exchange) {
-                waitResponse(exchange);
+            ex = message.getExchange().get(Exception.class);
+            if (ex != null) {
+                if (!mepCompleteCalled) {
+                    getConduitSelector().complete(exchange);
+                }
+                throw ex;
             }
-        }
-        getConduitSelector().complete(exchange);
-
-        // Grab the response objects if there are any
-        List resList = null;
-        Message inMsg = exchange.getInMessage();
-        if (inMsg != null) {
-            if (null != responseContext) {
-                responseContext.putAll(inMsg);
-                if (LOG.isLoggable(Level.FINE)) {
-                    LOG.fine("set responseContext to be" + responseContext);
+
+            // Wait for a response if we need to
+            if (!oi.getOperationInfo().isOneWay()) {
+                synchronized (exchange) {
+                    waitResponse(exchange);
                 }
             }
-            resList = inMsg.getContent(List.class);
-        }
-
-        // check for an incoming fault
-        ex = getException(exchange);
-
-        if (ex != null) {
-            throw ex;
-        }
-
-        if (resList != null) {
-            return resList.toArray();
+            getConduitSelector().complete(exchange);
+
+            // Grab the response objects if there are any
+            List resList = null;
+            Message inMsg = exchange.getInMessage();
+            if (inMsg != null) {
+                if (null != responseContext) {
+                    responseContext.putAll(inMsg);
+                    if (LOG.isLoggable(Level.FINE)) {
+                        LOG.fine("set responseContext to be" + responseContext);
+                    }
+                }
+                resList = inMsg.getContent(List.class);
+            }
+
+            // check for an incoming fault
+            ex = getException(exchange);
+
+            if (ex != null) {
+                throw ex;
+            }
+
+            if (resList != null) {
+                return resList.toArray();
+            }
+            return null;
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-        return null;
     }

     protected Exception getException(Exchange exchange) {
@@ -405,6 +413,8 @@
         chain.setFaultObserver(outFaultObserver);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
         // execute chain
         try {
             String startingAfterInterceptorID = (String) message.get(
@@ -426,6 +436,7 @@
                     message.getExchange().notifyAll();
                 }
             }
+            BusFactory.setThreadDefaultBus(origBus);
         }
     }

Modified: incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/interceptor/AbstractFaultChainInitiatorObserver.java Wed Dec 5 13:38:31 2007
@@ -24,6 +24,7 @@
 import java.util.logging.Logger;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
@@ -47,50 +48,58 @@
     public void onMessage(Message message) {
         assert null != message;
-        Exchange exchange = message.getExchange();
-
-        Message faultMessage = null;
-
-        // now that we have switched over to the fault chain,
-        // prevent any further operations on the in/out message
-
-        if (isOutboundObserver()) {
-            Exception ex = message.getContent(Exception.class);
-            if (!(ex instanceof Fault)) {
-                ex = new Fault(ex);
-            }
-            FaultMode mode = (FaultMode)message.get(FaultMode.class);
-
-            faultMessage = exchange.getOutMessage();
-            if (null == faultMessage) {
-                faultMessage = exchange.get(Endpoint.class).getBinding().createMessage();
-            }
-            faultMessage.setContent(Exception.class, ex);
-            if (null != mode) {
-                faultMessage.put(FaultMode.class, mode);
-            }
-            exchange.setOutMessage(null);
-            exchange.setOutFaultMessage(faultMessage);
-            if (message.get(BindingFaultInfo.class) != null) {
-                faultMessage.put(BindingFaultInfo.class, message.get(BindingFaultInfo.class));
-            }
-        } else {
-            faultMessage = message;
-            exchange.setInMessage(null);
-            exchange.setInFaultMessage(faultMessage);
-        }
-
-
-        // setup chain
-        PhaseInterceptorChain chain = new PhaseInterceptorChain(getPhases());
-        initializeInterceptors(faultMessage.getExchange(), chain);
-        faultMessage.setInterceptorChain(chain);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
         try {
-            chain.doIntercept(faultMessage);
-        } catch (Exception exc) {
-            LogUtils.log(LOG, Level.SEVERE, "Error occurred during error handling, give up!", exc);
-            throw new RuntimeException(exc.getCause());
+
+            Exchange exchange = message.getExchange();
+
+            Message faultMessage = null;
+
+            // now that we have switched over to the fault chain,
+            // prevent any further operations on the in/out message
+
+            if (isOutboundObserver()) {
+                Exception ex = message.getContent(Exception.class);
+                if (!(ex instanceof Fault)) {
+                    ex = new Fault(ex);
+                }
+                FaultMode mode = (FaultMode)message.get(FaultMode.class);
+
+                faultMessage = exchange.getOutMessage();
+                if (null == faultMessage) {
+                    faultMessage = exchange.get(Endpoint.class).getBinding().createMessage();
+                }
+                faultMessage.setContent(Exception.class, ex);
+                if (null != mode) {
+                    faultMessage.put(FaultMode.class, mode);
+                }
+                exchange.setOutMessage(null);
+                exchange.setOutFaultMessage(faultMessage);
+                if (message.get(BindingFaultInfo.class) != null) {
+                    faultMessage.put(BindingFaultInfo.class, message.get(BindingFaultInfo.class));
+                }
+            } else {
+                faultMessage = message;
+                exchange.setInMessage(null);
+                exchange.setInFaultMessage(faultMessage);
+            }
+
+
+            // setup chain
+            PhaseInterceptorChain chain = new PhaseInterceptorChain(getPhases());
+            initializeInterceptors(faultMessage.getExchange(), chain);
+
+            faultMessage.setInterceptorChain(chain);
+            try {
+                chain.doIntercept(faultMessage);
+            } catch (Exception exc) {
+                LogUtils.log(LOG, Level.SEVERE, "Error occurred during error handling, give up!", exc);
+                throw new RuntimeException(exc.getCause());
+            }
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
     }

Modified: incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/ChainInitiationObserver.java Wed Dec 5 13:38:31 2007
@@ -26,6 +26,7 @@
 import javax.xml.namespace.QName;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.Binding;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.message.Exchange;
@@ -50,27 +51,33 @@
     }

     public void onMessage(Message m) {
-        Message message = getBinding().createMessage(m);
-        Exchange exchange = message.getExchange();
-        if (exchange == null) {
-            exchange = new ExchangeImpl();
-            exchange.setInMessage(message);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            Message message = getBinding().createMessage(m);
+            Exchange exchange = message.getExchange();
+            if (exchange == null) {
+                exchange = new ExchangeImpl();
+                exchange.setInMessage(message);
+            }
+            setExchangeProperties(exchange, message);
+
+            // setup chain
+            PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
+                                                         bus.getInInterceptors(),
+                                                         endpoint.getInInterceptors(),
+                                                         getBinding().getInInterceptors(),
+                                                         endpoint.getService().getInInterceptors());
+
+
+            message.setInterceptorChain(chain);
+
+            chain.setFaultObserver(endpoint.getOutFaultObserver());
+
+            chain.doIntercept(message);
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-        setExchangeProperties(exchange, message);
-
-        // setup chain
-        PhaseInterceptorChain chain = chainCache.get(bus.getExtension(PhaseManager.class).getInPhases(),
-                                                     bus.getInInterceptors(),
-                                                     endpoint.getInInterceptors(),
-                                                     getBinding().getInInterceptors(),
-                                                     endpoint.getService().getInInterceptors());
-
-
-        message.setInterceptorChain(chain);
-
-        chain.setFaultObserver(endpoint.getOutFaultObserver());
-
-        chain.doIntercept(message);
     }

Modified: incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/transport/MultipleEndpointObserver.java Wed Dec 5 13:38:31 2007
@@ -24,6 +24,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.interceptor.Interceptor;
 import org.apache.cxf.message.Exchange;
@@ -54,32 +55,38 @@
     }

     public void onMessage(Message message) {
-        message = createMessage(message);
-        Exchange exchange = message.getExchange();
-        if (exchange == null) {
-            exchange = new ExchangeImpl();
-            exchange.setInMessage(message);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            message = createMessage(message);
+            Exchange exchange = message.getExchange();
+            if (exchange == null) {
+                exchange = new ExchangeImpl();
+                exchange.setInMessage(message);
+            }
+            setExchangeProperties(exchange, message);
+
+            // setup chain
+            PhaseInterceptorChain chain = createChain();
+
+            message.setInterceptorChain(chain);
+
+            chain.add(bus.getInInterceptors());
+            if (bindingInterceptors != null) {
+                chain.add(bindingInterceptors);
+            }
+            if (routingInterceptors != null) {
+                chain.add(routingInterceptors);
+            }
+
+            if (endpoints != null) {
+                exchange.put(ENDPOINTS, endpoints);
+            }
+
+            chain.doIntercept(message);
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
         }
-        setExchangeProperties(exchange, message);
-
-        // setup chain
-        PhaseInterceptorChain chain = createChain();
-
-        message.setInterceptorChain(chain);
-
-        chain.add(bus.getInInterceptors());
-        if (bindingInterceptors != null) {
-            chain.add(bindingInterceptors);
-        }
-        if (routingInterceptors != null) {
-            chain.add(routingInterceptors);
-        }
-
-        if (endpoints != null) {
-            exchange.put(ENDPOINTS, endpoints);
-        }
-
-        chain.doIntercept(message);
     }

     /**

Modified: incubator/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java Wed Dec 5 13:38:31 2007
@@ -54,6 +54,7 @@
 import javax.xml.ws.soap.SOAPFaultException;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.binding.soap.SoapBinding;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Client;
@@ -127,94 +128,100 @@
             LOG.info("Dispatch: invoke called");
         }
-        Endpoint endpoint = getEndpoint();
-        Message message = endpoint.getBinding().createMessage();
-
-        if (context != null) {
-            message.setContent(JAXBContext.class, context);
-        }
-
-
-        Map reqContext = new HashMap(this.getRequestContext());
-        Map respContext = this.getResponseContext();
-        // clear the response context's hold information
-        // Not call the clear Context is to avoid the error
-        // that getResponseContext() would be called by Client code first
-        respContext.clear();
-
-        ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
-        message.putAll(reqContext);
-        //need to do context mapping from jax-ws to cxf message
-
-        Exchange exchange = new ExchangeImpl();
-
-        exchange.setOutMessage(message);
-        setExchangeProperties(exchange, endpoint);
-
-        message.setContent(Object.class, obj);
-
-        if (obj instanceof SOAPMessage) {
-            message.setContent(SOAPMessage.class, obj);
-        } else if (obj instanceof Source) {
-            message.setContent(Source.class, obj);
-        } else if (obj instanceof DataSource) {
-            message.setContent(DataSource.class, obj);
-        }
-
-        message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
-
-        PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
-        message.setInterceptorChain(chain);
-
-        // setup conduit selector
-        prepareConduitSelector(message);
-
-        // execute chain
-        chain.doIntercept(message);
-
-
-        if (message.getContent(Exception.class) != null) {
-            getConduitSelector().complete(exchange);
-            if (getBinding() instanceof SOAPBinding) {
-                try {
-                    SOAPFault soapFault = SOAPFactory.newInstance().createFault();
-                    Fault fault = (Fault)message.getContent(Exception.class);
-                    soapFault.setFaultCode(fault.getFaultCode());
-                    soapFault.setFaultString(fault.getMessage());
-                    SOAPFaultException exception = new SOAPFaultException(soapFault);
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            Endpoint endpoint = getEndpoint();
+            Message message = endpoint.getBinding().createMessage();
+
+            if (context != null) {
+                message.setContent(JAXBContext.class, context);
+            }
+
+
+            Map reqContext = new HashMap(this.getRequestContext());
+            Map respContext = this.getResponseContext();
+            // clear the response context's hold information
+            // Not call the clear Context is to avoid the error
+            // that getResponseContext() would be called by Client code first
+            respContext.clear();
+
+            ContextPropertiesMapping.mapRequestfromJaxws2Cxf(reqContext);
+            message.putAll(reqContext);
+            //need to do context mapping from jax-ws to cxf message
+
+            Exchange exchange = new ExchangeImpl();
+
+            exchange.setOutMessage(message);
+            setExchangeProperties(exchange, endpoint);
+
+            message.setContent(Object.class, obj);
+
+            if (obj instanceof SOAPMessage) {
+                message.setContent(SOAPMessage.class, obj);
+            } else if (obj instanceof Source) {
+                message.setContent(Source.class, obj);
+            } else if (obj instanceof DataSource) {
+                message.setContent(DataSource.class, obj);
+            }
+
+            message.put(Message.REQUESTOR_ROLE, Boolean.TRUE);
+
+            PhaseInterceptorChain chain = getDispatchOutChain(endpoint);
+            message.setInterceptorChain(chain);
+
+            // setup conduit selector
+            prepareConduitSelector(message);
+
+            // execute chain
+            chain.doIntercept(message);
+
+
+            if (message.getContent(Exception.class) != null) {
+                getConduitSelector().complete(exchange);
+                if (getBinding() instanceof SOAPBinding) {
+                    try {
+                        SOAPFault soapFault = SOAPFactory.newInstance().createFault();
+                        Fault fault = (Fault)message.getContent(Exception.class);
+                        soapFault.setFaultCode(fault.getFaultCode());
+                        soapFault.setFaultString(fault.getMessage());
+                        SOAPFaultException exception = new SOAPFaultException(soapFault);
+                        throw exception;
+                    } catch (SOAPException e) {
+                        throw new WebServiceException(e);
+                    }
+                } else if (getBinding() instanceof HTTPBinding) {
+                    HTTPException exception = new HTTPException(HttpURLConnection.HTTP_INTERNAL_ERROR);
+                    exception.initCause(message.getContent(Exception.class));
                     throw exception;
-                } catch (SOAPException e) {
-                    throw new WebServiceException(e);
+                } else {
+                    throw new WebServiceException(message.getContent(Exception.class));
                 }
-            } else if (getBinding() instanceof HTTPBinding) {
-                HTTPException exception = new HTTPException(HttpURLConnection.HTTP_INTERNAL_ERROR);
-                exception.initCause(message.getContent(Exception.class));
-                throw exception;
+            }
+
+            // correlate response
+            if (getConduitSelector().selectConduit(message).getBackChannel() != null) {
+                // process partial response and wait for decoupled response
             } else {
-                throw new WebServiceException(message.getContent(Exception.class));
+                // process response: send was synchronous so when we get here we can assume that the
+                // Exchange's inbound message is set and had been passed through the inbound
+                // interceptor chain.
             }
-        }
-
-        // correlate response
-        if (getConduitSelector().selectConduit(message).getBackChannel() != null) {
-            // process partial response and wait for decoupled response
-        } else {
-            // process response: send was synchronous so when we get here we can assume that the
-            // Exchange's inbound message is set and had been passed through the inbound interceptor chain.
-        }
-
-        if (!isOneWay) {
-            synchronized (exchange) {
-                Message inMsg = waitResponse(exchange);
-                respContext.putAll(inMsg);
-                getConduitSelector().complete(exchange);
-                //need to do context mapping from cxf message to jax-ws
-                ContextPropertiesMapping.mapResponsefromCxf2Jaxws(respContext);
-                return cl.cast(inMsg.getContent(Object.class));
+
+            if (!isOneWay) {
+                synchronized (exchange) {
+                    Message inMsg = waitResponse(exchange);
+                    respContext.putAll(inMsg);
+                    getConduitSelector().complete(exchange);
+                    //need to do context mapping from cxf message to jax-ws
+                    ContextPropertiesMapping.mapResponsefromCxf2Jaxws(respContext);
+                    return cl.cast(inMsg.getContent(Object.class));
+                }
             }
-        }
-        return null;
-
+            return null;
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
+        }
     }

     private Message waitResponse(Exchange exchange) {
@@ -308,6 +315,8 @@
         chain.add(inInterceptors);

         // execute chain
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
         try {
             chain.doIntercept(message);
         } finally {
@@ -316,6 +325,7 @@
                 message.getExchange().setInMessage(message);
                 message.getExchange().notifyAll();
             }
+            BusFactory.setThreadDefaultBus(origBus);
         }
     }

Modified: incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/main/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestination.java Wed Dec 5 13:38:31 2007
@@ -31,6 +31,7 @@
 import javax.servlet.http.HttpServletResponse;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.helpers.HttpHeaderHelper;
@@ -237,7 +238,12 @@
         }

         // REVISIT: service on executor if associated with endpoint
-        serviceRequest(req, resp);
+        try {
+            BusFactory.setThreadDefaultBus(bus);
+            serviceRequest(req, resp);
+        } finally {
+            BusFactory.setThreadDefaultBus(null);
+        }
     }

     protected void serviceRequest(final HttpServletRequest req, final HttpServletResponse resp)

Modified: incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/transports/http-jetty/src/test/java/org/apache/cxf/transport/http_jetty/JettyHTTPDestinationTest.java Wed Dec 5 13:38:31 2007
@@ -33,6 +33,7 @@
 import javax.xml.namespace.QName;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.bus.CXFBusImpl;
 import org.apache.cxf.common.util.Base64Utility;
 import org.apache.cxf.common.util.StringUtils;
@@ -81,6 +82,7 @@
     private static final String DIGEST_CHALLENGE = "Digest realm=luna";
     private static final String CUSTOM_CHALLENGE = "Custom realm=sol";
     private Bus bus;
+    private Bus threadDefaultBus;
     private Conduit decoupledBackChannel;
     private EndpointInfo endpointInfo;
     private EndpointReferenceType address;
@@ -137,6 +139,7 @@
         is = null;
         os = null;
         destination = null;
+        BusFactory.setDefaultBus(null);
     }

     @Test
@@ -194,10 +197,17 @@
     @Test
     public void testDoService() throws Exception {
+        Bus defaultBus = new CXFBusImpl();
+        assertSame("Default thread bus has not been set",
+                   defaultBus, BusFactory.getThreadDefaultBus());
         destination = setUpDestination(false, false);
         setUpDoService(false);
+        assertSame("Default thread bus has been unexpectedly reset",
+                   defaultBus, BusFactory.getThreadDefaultBus());
         destination.doService(request, response);
         verifyDoService();
+        assertSame("Default thread bus has not been reset",
+                   defaultBus, BusFactory.getThreadDefaultBus());
     }

     @Test
@@ -491,6 +501,7 @@
         observer = new MessageObserver() {
             public void onMessage(Message m) {
                 inMessage = m;
+                threadDefaultBus = BusFactory.getThreadDefaultBus();
             }
         };
         dest.setMessageObserver(observer);
@@ -704,6 +715,8 @@
     }

     private void verifyDoService() throws Exception {
+        assertSame("Default thread bus has not been set for request",
+                   bus, threadDefaultBus);
         assertNotNull("unexpected null message", inMessage);
         assertSame("unexpected HTTP request",
                    inMessage.get(JettyHTTPDestination.HTTP_REQUEST),

Modified: incubator/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/incubator/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=601540&r1=601539&r2=601540&view=diff
==============================================================================
--- incubator/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ incubator/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Wed Dec 5 13:38:31 2007
@@ -41,6 +41,7 @@
 import javax.naming.NamingException;

 import org.apache.cxf.Bus;
+import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.Configurable;
 import org.apache.cxf.configuration.Configurer;
@@ -195,13 +196,17 @@
             inMessage.setDestination(this);
 
+            BusFactory.setThreadDefaultBus(bus);
+
             //handle the incoming message
             incomingObserver.onMessage(inMessage);
         } catch (JMSException jmsex) {
             //TODO: need to revisit for which exception should we throw.
             throw new IOException(jmsex.getMessage());
-        }
+        } finally {
+            BusFactory.setThreadDefaultBus(null);
+        }
     }
 
     public void connected(javax.jms.Destination target,