Return-Path: Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: (qmail 30347 invoked from network); 16 Oct 2008 16:30:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 16 Oct 2008 16:30:20 -0000 Received: (qmail 31424 invoked by uid 500); 16 Oct 2008 16:30:21 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 31377 invoked by uid 500); 16 Oct 2008 16:30:21 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 31368 invoked by uid 99); 16 Oct 2008 16:30:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Oct 2008 09:30:21 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Oct 2008 16:29:13 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C8630238896B; Thu, 16 Oct 2008 09:29:50 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r705274 - in /cxf/trunk: api/src/main/java/org/apache/cxf/endpoint/ api/src/main/java/org/apache/cxf/message/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/transports/http/src/main/java/org/apache/cxf/transport/http/ rt/transports/http/... Date: Thu, 16 Oct 2008 16:29:46 -0000 To: commits@cxf.apache.org From: dkulp@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081016162950.C8630238896B@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: dkulp Date: Thu Oct 16 09:29:41 2008 New Revision: 705274 URL: http://svn.apache.org/viewvc?rev=705274&view=rev Log: [CXF-1776] Provide Async invokes and callbacks on the low level Client objects. Get HTTP and JMS transports to support the async replies. Still need to update the JAX-WS frontend to use it and the JMS transport needs someone to look into it to make sure the changes will actually be acceptable. Added: cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java (with props) Modified: cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java cxf/trunk/api/src/main/java/org/apache/cxf/message/Exchange.java cxf/trunk/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java Modified: cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java (original) +++ cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/Client.java Thu Oct 16 09:29:41 2008 @@ -20,6 +20,7 @@ package org.apache.cxf.endpoint; import java.util.Map; +import java.util.concurrent.Executor; import javax.xml.namespace.QName; @@ -97,6 +98,69 @@ /** + * Invokes an operation asyncronously + * @param callback The callback that is called when the response is ready + * @param operationName The name of the operation to be invoked. The service namespace will be used + * when looking up the BindingOperationInfo. + * @param params The params that matches the parts of the input message of the operation. If the + * BindingOperationInfo supports unwrapping, it assumes the params are in the "unwrapped" form. If + * params are in the wrapped form, use invokeWrapped + * @return The return values that matche the parts of the output message of the operation + */ + void invoke(ClientCallback callback, + String operationName, + Object... params) throws Exception; + + /** + * Invokes an operation asyncronously + * @param callback The callback that is called when the response is ready + * @param operationName The name of the operation to be invoked + * @param params The params that matches the parts of the input message of the operation. If the + * BindingOperationInfo supports unwrapping, it assumes the params are in the "unwrapped" form. If + * params are in the wrapped form, use invokeWrapped + * @return The return values that matche the parts of the output message of the operation + */ + void invoke(ClientCallback callback, + QName operationName, + Object... params) throws Exception; + + + /** + * Invokes an operation asyncronously + * @param callback The callback that is called when the response is ready + * @param operationName The name of the operation to be invoked. The service namespace will be used + * when looking up the BindingOperationInfo. + * @param params The params that matches the parts of the input message of the operation + * @return The return values that matche the parts of the output message of the operation + */ + void invokeWrapped(ClientCallback callback, + String operationName, + Object... params) throws Exception; + + /** + * Invokes an operation asyncronously + * @param callback The callback that is called when the response is ready + * @param operationName The name of the operation to be invoked + * @param params The params that matches the parts of the input message of the operation + * @return The return values that matche the parts of the output message of the operation + */ + void invokeWrapped(ClientCallback callback, + QName operationName, + Object... params) throws Exception; + + /** + * Invokes an operation asyncronously + * @param callback The callback that is called when the response is ready + * @param oi The operation to be invoked + * @param params The params that matches the parts of the input message of the operation + * @return The return values that matche the parts of the output message of the operation + */ + void invoke(ClientCallback callback, + BindingOperationInfo oi, + Object... params) throws Exception; + + + /** * Gets the request context used for future invocations * @return context The context */ @@ -151,4 +215,11 @@ * */ void destroy(); + + /** + * Sets the executor which is used to process Asynchronous responses. The default + * is to use the threads provided by the transport. (example: the JMS listener threads) + * @param executor + */ + void setExecutor(Executor executor); } Added: cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java?rev=705274&view=auto ============================================================================== --- cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java (added) +++ cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java Thu Oct 16 09:29:41 2008 @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.endpoint; + +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.cxf.message.Message; + +/** + * + */ +public class ClientCallback implements Future { + + Map context; + Object[] result; + Throwable exception; + boolean done; + boolean cancelled; + boolean started; + + public ClientCallback() { + } + + /** + * Called when a message is first received prior to any actions + * being applied to the message. The InterceptorChain is setup so + * modifications to that can be done. + */ + public void start(Message msg) { + started = true; + } + + /** + * If the processing of the incoming message proceeds normally, this + * method is called with the response context values and the resulting objects. + * + * The default behavior just stores the objects and calls notifyAll to wake + * up threads waiting for the response. + * + * @param ctx + * @param res + */ + public void handleResponse(Map ctx, Object[] res) { + context = ctx; + result = res; + done = true; + synchronized (this) { + notifyAll(); + } + } + + /** + * If processing of the incoming message results in an exception, this + * method is called with the resulting exception. + * + * The default behavior just stores the objects and calls notifyAll to wake + * up threads waiting for the response. + * + * @param ctx + * @param ex + */ + public void handleException(Map ctx, Throwable ex) { + context = ctx; + exception = ex; + done = true; + synchronized (this) { + notifyAll(); + } + } + + + public boolean cancel(boolean mayInterruptIfRunning) { + if (!started) { + cancelled = true; + synchronized (this) { + notifyAll(); + } + return true; + } + return false; + } + + public Map getResponseContext() throws InterruptedException, ExecutionException { + synchronized (this) { + if (!done) { + wait(); + } + } + if (cancelled) { + throw new InterruptedException("Operation Cancelled"); + } + if (exception != null) { + throw new ExecutionException(exception); + } + return context; + } + + public Object[] get() throws InterruptedException, ExecutionException { + synchronized (this) { + if (!done) { + wait(); + } + } + if (cancelled) { + throw new InterruptedException("Operation Cancelled"); + } + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } + + public Object[] get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + synchronized (this) { + if (!done) { + unit.timedWait(this, timeout); + } + } + if (cancelled) { + throw new InterruptedException("Operation Cancelled"); + } + if (exception != null) { + throw new ExecutionException(exception); + } + return result; + } + + public boolean isCancelled() { + return cancelled; + } + + public boolean isDone() { + return done; + } +} Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java ------------------------------------------------------------------------------ svn:eol-style = native Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: cxf/trunk/api/src/main/java/org/apache/cxf/message/Exchange.java URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/message/Exchange.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/api/src/main/java/org/apache/cxf/message/Exchange.java (original) +++ cxf/trunk/api/src/main/java/org/apache/cxf/message/Exchange.java Thu Oct 16 09:29:41 2008 @@ -63,6 +63,14 @@ * @return true if the exchange is known to be a one-way exchange */ boolean isOneWay(); + + /** + * @return true if the frontend will be wait for the response. Transports + * can then optimize themselves to process the response immediately instead + * of using a background thread or similar. + */ + boolean isSynchronous(); + void setSynchronous(boolean b); /** * Modified: cxf/trunk/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java (original) +++ cxf/trunk/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java Thu Oct 16 09:29:41 2008 @@ -30,6 +30,7 @@ private Destination destination; private boolean oneWay; + private boolean synchronous = true; private Message inMessage; private Message outMessage; @@ -104,6 +105,14 @@ public void setOneWay(boolean b) { oneWay = b; } + + public boolean isSynchronous() { + return synchronous; + } + + public void setSynchronous(boolean b) { + synchronous = b; + } public Session getSession() { return session; Modified: cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original) +++ cxf/trunk/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Thu Oct 16 09:29:41 2008 @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; @@ -89,6 +90,8 @@ protected ThreadLocal > responseContext = new ThreadLocal>(); + + protected Executor executor; public ClientImpl(Bus b, Endpoint e) { @@ -312,6 +315,104 @@ } } } + + public void invoke(ClientCallback callback, + String operationName, + Object... params) throws Exception { + QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName); + invoke(callback, q, params); + } + + public void invoke(ClientCallback callback, + QName operationName, + Object... params) throws Exception { + BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName); + if (op == null) { + throw new UncheckedException( + new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName)); + } + + if (op.isUnwrappedCapable()) { + op = op.getUnwrappedOperation(); + } + + invoke(callback, op, params); + } + + + public void invokeWrapped(ClientCallback callback, + String operationName, + Object... params) + throws Exception { + QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName); + invokeWrapped(callback, q, params); + } + + public void invokeWrapped(ClientCallback callback, + QName operationName, + Object... params) + throws Exception { + BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName); + if (op == null) { + throw new UncheckedException( + new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName)); + } + invoke(callback, op, params); + } + + + public void invoke(ClientCallback callback, + BindingOperationInfo oi, + Object... params) throws Exception { + Bus origBus = BusFactory.getThreadDefaultBus(false); + BusFactory.setThreadDefaultBus(bus); + try { + Exchange exchange = new ExchangeImpl(); + exchange.setSynchronous(false); + Endpoint endpoint = getEndpoint(); + Map context = new HashMap(); + Map resp = getResponseContext(); + resp.clear(); + Map reqContext = new HashMap(getRequestContext()); + context.put(RESPONSE_CONTEXT, resp); + context.put(REQUEST_CONTEXT, reqContext); + + Message message = endpoint.getBinding().createMessage(); + message.put(Message.INVOCATION_CONTEXT, context); + + //setup the message context + setContext(reqContext, message); + setParameters(params, message); + + if (null != reqContext) { + exchange.putAll(reqContext); + } + exchange.setOneWay(oi.getOutput() == null); + exchange.setOutMessage(message); + exchange.put(ClientCallback.class, callback); + + setOutMessageProperties(message, oi); + setExchangeProperties(exchange, endpoint, oi); + + // setup chain + + PhaseInterceptorChain chain = setupInterceptorChain(endpoint); + message.setInterceptorChain(chain); + + modifyChain(chain, reqContext); + chain.setFaultObserver(outFaultObserver); + + // setup conduit selector + prepareConduitSelector(message); + + // execute chain + chain.doIntercept(message); + + } finally { + BusFactory.setThreadDefaultBus(origBus); + } + } + public Object[] invoke(BindingOperationInfo oi, Object[] params, Map context, @@ -322,6 +423,7 @@ if (exchange == null) { exchange = new ExchangeImpl(); } + exchange.setSynchronous(true); Endpoint endpoint = getEndpoint(); Map reqContext = null; @@ -363,62 +465,68 @@ // execute chain chain.doIntercept(message); + return processResult(message, exchange, oi, resContext); - // Check to see if there is a Fault from the outgoing chain - Exception ex = message.getContent(Exception.class); - boolean mepCompleteCalled = false; - if (ex != null) { - getConduitSelector().complete(exchange); - mepCompleteCalled = true; - if (message.getContent(Exception.class) != null) { - throw ex; - } - } - ex = message.getExchange().get(Exception.class); - if (ex != null) { - if (!mepCompleteCalled) { - getConduitSelector().complete(exchange); - } + } finally { + BusFactory.setThreadDefaultBus(origBus); + } + } + + private Object[] processResult(Message message, + Exchange exchange, + BindingOperationInfo oi, + Map resContext) throws Exception { + // Check to see if there is a Fault from the outgoing chain + Exception ex = message.getContent(Exception.class); + boolean mepCompleteCalled = false; + if (ex != null) { + getConduitSelector().complete(exchange); + mepCompleteCalled = true; + if (message.getContent(Exception.class) != null) { throw ex; } - - // Wait for a response if we need to - if (!oi.getOperationInfo().isOneWay()) { - synchronized (exchange) { - waitResponse(exchange); - } - } - getConduitSelector().complete(exchange); - - // Grab the response objects if there are any - List resList = null; - Message inMsg = exchange.getInMessage(); - if (inMsg != null) { - if (null != resContext) { - resContext.putAll(inMsg); - if (LOG.isLoggable(Level.FINE)) { - LOG.fine("set responseContext to be" + responseContext); - } - } - resList = inMsg.getContent(List.class); + } + ex = message.getExchange().get(Exception.class); + if (ex != null) { + if (!mepCompleteCalled) { + getConduitSelector().complete(exchange); } - - // check for an incoming fault - ex = getException(exchange); - - if (ex != null) { - throw ex; + throw ex; + } + + // Wait for a response if we need to + if (oi != null && !oi.getOperationInfo().isOneWay()) { + synchronized (exchange) { + waitResponse(exchange); } - - if (resList != null) { - return resList.toArray(); + } + getConduitSelector().complete(exchange); + + // Grab the response objects if there are any + List resList = null; + Message inMsg = exchange.getInMessage(); + if (inMsg != null) { + if (null != resContext) { + resContext.putAll(inMsg); + if (LOG.isLoggable(Level.FINE)) { + LOG.fine("set responseContext to be" + responseContext); + } } - return null; - } finally { - BusFactory.setThreadDefaultBus(origBus); + resList = inMsg.getContent(List.class); + } + + // check for an incoming fault + ex = getException(exchange); + + if (ex != null) { + throw ex; + } + + if (resList != null) { + return resList.toArray(); } + return null; } - protected Exception getException(Exchange exchange) { if (exchange.getInFaultMessage() != null) { return exchange.getInFaultMessage().getContent(Exception.class); @@ -461,7 +569,9 @@ } public void onMessage(Message message) { + ClientCallback callback = message.getExchange().get(ClientCallback.class); Endpoint endpoint = message.getExchange().get(Endpoint.class); + message.getExchange().setInMessage(message); if (endpoint == null) { // in this case correlation will occur outside the transport, // however there's a possibility that the endpoint may have been @@ -476,8 +586,6 @@ message.put(Message.INBOUND_MESSAGE, Boolean.TRUE); PhaseManager pm = bus.getExtension(PhaseManager.class); - - List i1 = bus.getInInterceptors(); if (LOG.isLoggable(Level.FINE)) { LOG.fine("Interceptors contributed by bus: " + i1); @@ -498,13 +606,20 @@ PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4); message.setInterceptorChain(chain); - chain.setFaultObserver(outFaultObserver); Bus origBus = BusFactory.getThreadDefaultBus(false); BusFactory.setThreadDefaultBus(bus); // execute chain try { + if (callback != null) { + if (callback.isCancelled()) { + getConduitSelector().complete(message.getExchange()); + return; + } + callback.start(message); + } + String startingAfterInterceptorID = (String) message.get( PhaseInterceptorChain.STARTING_AFTER_INTERCEPTOR_ID); String startingInterceptorID = (String) message.get( @@ -516,6 +631,22 @@ } else { chain.doIntercept(message); } + + if (callback != null) { + Map resCtx = CastUtils.cast((Map)message + .getExchange() + .getOutMessage() + .get(Message.INVOCATION_CONTEXT)); + resCtx = CastUtils.cast((Map)resCtx.get(RESPONSE_CONTEXT)); + + try { + Object obj[] = processResult(message, message.getExchange(), + null, resCtx); + callback.handleResponse(resCtx, obj); + } catch (Exception ex) { + callback.handleException(resCtx, ex); + } + } } finally { synchronized (message.getExchange()) { if (!isPartialResponse(message)) { @@ -566,7 +697,19 @@ exchange.put(OperationInfo.class, boi.getOperationInfo()); } - exchange.put(MessageObserver.class, this); + if (exchange.isSynchronous() || executor == null) { + exchange.put(MessageObserver.class, this); + } else { + exchange.put(MessageObserver.class, new MessageObserver() { + public void onMessage(final Message message) { + executor.execute(new Runnable() { + public void run() { + ClientImpl.this.onMessage(message); + } + }); + } + }); + } exchange.put(Retryable.class, this); exchange.put(Client.class, this); exchange.put(Bus.class, bus); @@ -691,4 +834,11 @@ super.putAll(shared); } } + + + public void setExecutor(Executor executor) { + this.executor = executor; + } + + } Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original) +++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Thu Oct 16 09:29:41 2008 @@ -66,6 +66,8 @@ import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.transport.http.policy.PolicyUtils; import org.apache.cxf.transports.http.configuration.HTTPClientPolicy; +import org.apache.cxf.workqueue.AutomaticWorkQueue; +import org.apache.cxf.workqueue.WorkQueueManager; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.ws.policy.Assertor; import org.apache.cxf.ws.policy.PolicyEngine; @@ -1934,6 +1936,30 @@ // Process retransmits until we fall out. handleRetransmits(); + if (outMessage == null + || outMessage.getExchange() == null + || outMessage.getExchange().isSynchronous()) { + handleResponseInternal(); + } else { + Runnable runnable = new Runnable() { + public void run() { + try { + handleResponseInternal(); + } catch (IOException e) { + LOG.log(Level.WARNING, e.getMessage(), e); + } + } + }; + WorkQueueManager mgr = outMessage.getExchange().get(Bus.class) + .getExtension(WorkQueueManager.class); + AutomaticWorkQueue queue = mgr.getNamedWorkQueue("http-conduit"); + if (queue == null) { + queue = mgr.getAutomaticWorkQueue(); + } + queue.execute(runnable); + } + } + protected void handleResponseInternal() throws IOException { int responseCode = connection.getResponseCode(); if (LOG.isLoggable(Level.FINE)) { Modified: cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java (original) +++ cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java Thu Oct 16 09:29:41 2008 @@ -329,6 +329,8 @@ message.setExchange(exchange); exchange.isOneWay(); EasyMock.expectLastCall().andReturn(true); + exchange.isSynchronous(); + EasyMock.expectLastCall().andReturn(true); } private HTTPConduit setUpConduit( Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Thu Oct 16 09:29:41 2008 @@ -56,14 +56,14 @@ private EndpointInfo endpointInfo; private JMSConfiguration jmsConfig; - private Map correlationMap; + private Map correlationMap; private DefaultMessageListenerContainer jmsListener; public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration jmsConfig) { super(target); this.jmsConfig = jmsConfig; this.endpointInfo = endpointInfo; - correlationMap = new ConcurrentHashMap(); + correlationMap = new ConcurrentHashMap(); } /** @@ -128,25 +128,26 @@ * fill to Message and notify this thread */ if (!exchange.isOneWay()) { - Message inMessage = new MessageImpl(); - synchronized (inMessage) { - correlationMap.put(correlationId, inMessage); + synchronized (exchange) { + correlationMap.put(correlationId, exchange); jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator); - try { - inMessage.wait(jmsTemplate.getReceiveTimeout()); - } catch (InterruptedException e) { + + if (exchange.isSynchronous()) { + try { + exchange.wait(jmsTemplate.getReceiveTimeout()); + } catch (InterruptedException e) { + correlationMap.remove(correlationId); + throw new RuntimeException(e); + } correlationMap.remove(correlationId); - throw new RuntimeException(e); + if (exchange.getInMessage() == null + && exchange.getInFaultMessage() == null) { + throw new RuntimeException("Timeout receiving message with correlationId " + + correlationId); + } + + } - correlationMap.remove(correlationId); - if (inMessage.getContent(InputStream.class) == null) { - throw new RuntimeException("Timeout receiving message with correlationId " - + correlationId); - } - } - exchange.setInMessage(inMessage); - if (incomingObserver != null) { - incomingObserver.onMessage(inMessage); } } else { jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator); @@ -165,20 +166,30 @@ } catch (JMSException e) { throw JmsUtils.convertJmsAccessException(e); } - Message inMessage = correlationMap.get(correlationId); - if (inMessage == null) { + + Exchange exchange = correlationMap.remove(correlationId); + if (exchange == null) { LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId); + return; } + Message inMessage = new MessageImpl(); + exchange.setInMessage(inMessage); LOG.log(Level.FINE, "client received reply: ", jmsMessage); JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); byte[] response = JMSUtils.retrievePayload(jmsMessage); LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]"); inMessage.setContent(InputStream.class, new ByteArrayInputStream(response)); - synchronized (inMessage) { - inMessage.notifyAll(); + if (exchange.isSynchronous()) { + synchronized (exchange) { + exchange.notifyAll(); + } + } + + //REVISIT: put on a workqueue? + if (incomingObserver != null) { + incomingObserver.onMessage(exchange.getInMessage()); } - } public void close() { Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Thu Oct 16 09:29:41 2008 @@ -91,6 +91,7 @@ Exchange exchange = new ExchangeImpl(); exchange.setOneWay(isOneWay); + exchange.setSynchronous(true); message.setExchange(exchange); exchange.setOutMessage(message); try { Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Thu Oct 16 09:29:41 2008 @@ -93,11 +93,12 @@ setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl", "HelloWorldServiceLoop", "HelloWorldPortLoop"); JMSConduit conduit = setupJMSConduit(true, false); - conduit.getJmsConfig().setReceiveTimeout(1000); + conduit.getJmsConfig().setReceiveTimeout(10000); try { - for (int c = 0; c < 100; c++) { + for (int c = 0; c < 10; c++) { LOG.info("Sending message " + c); + inMessage = null; Message message = new MessageImpl(); sendoutMessage(conduit, message, false); verifyReceivedMessage(message); @@ -133,7 +134,14 @@ } } - private void verifyReceivedMessage(Message message) { + private void verifyReceivedMessage(Message message) throws InterruptedException { + while (inMessage == null) { + //the send has completed, but the response might not be back yet. + //wait for it. + synchronized (this) { + wait(10); + } + } ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class); Assert.assertNotNull("The received message input stream should not be null", bis); byte bytes[] = new byte[bis.available()]; @@ -142,8 +150,8 @@ } catch (IOException ex) { ex.printStackTrace(); } - String reponse = IOUtils.newStringFromBytes(bytes); - assertEquals("The reponse date should be equals", reponse, "HelloWorld"); + String response = IOUtils.newStringFromBytes(bytes); + assertEquals("The response data should be equal", "HelloWorld", response); JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); Modified: cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java?rev=705274&r1=705273&r2=705274&view=diff ============================================================================== --- cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java (original) +++ cxf/trunk/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java Thu Oct 16 09:29:41 2008 @@ -24,6 +24,7 @@ import org.apache.cxf.endpoint.Client; +import org.apache.cxf.endpoint.ClientCallback; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.jaxws.endpoint.dynamic.JaxWsDynamicClientFactory; import org.apache.cxf.no_body_parts.types.Operation1; @@ -71,6 +72,12 @@ Object[] rparts = client.invoke("operation1", parameters, bucketOfBytes); Operation1Response r = (Operation1Response)rparts[0]; assertEquals(md5(bucketOfBytes), r.getStatus()); + + ClientCallback callback = new ClientCallback(); + client.invoke(callback, "operation1", parameters, bucketOfBytes); + rparts = callback.get(); + r = (Operation1Response)rparts[0]; + assertEquals(md5(bucketOfBytes), r.getStatus()); } }