cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r707153 - in /cxf/branches/2.0.x-fixes: ./ api/src/main/java/org/apache/cxf/endpoint/ api/src/main/java/org/apache/cxf/message/ api/src/main/java/org/apache/cxf/phase/ common/common/src/main/java/org/apache/cxf/helpers/ rt/core/src/main/jav...
Date Wed, 22 Oct 2008 18:19:53 GMT
Author: dkulp
Date: Wed Oct 22 11:19:52 2008
New Revision: 707153

URL: http://svn.apache.org/viewvc?rev=707153&view=rev
Log:
Merged revisions 705280,705449 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/branches/2.1.x-fixes

................
  r705280 | dkulp | 2008-10-16 12:46:40 -0400 (Thu, 16 Oct 2008) | 9 lines
  
  Merged revisions 705274 via svnmerge from 
  https://svn.apache.org/repos/asf/cxf/trunk
  
  ........
    r705274 | dkulp | 2008-10-16 12:29:41 -0400 (Thu, 16 Oct 2008) | 2 lines
    
    [CXF-1776] Provide Async invokes and callbacks on the low level Client objects.  Get HTTP and JMS transports to support the async replies.   Still need to update the JAX-WS frontend to use it and the JMS transport needs someone to look into it to make sure the changes will actually be acceptable.  
  ........
................
  r705449 | dkulp | 2008-10-16 22:44:15 -0400 (Thu, 16 Oct 2008) | 11 lines
  
  Merged revisions 705446 via svnmerge from 
  https://svn.apache.org/repos/asf/cxf/trunk
  
  ........
    r705446 | dkulp | 2008-10-16 22:30:51 -0400 (Thu, 16 Oct 2008) | 4 lines
    
    Start making jaxws async calls use async stuff on client.
    Fix some ws-rm/ws-a issues created from my last commit.
    Add "toString" methods to some BAIS to aid debugging.
  ........
................

Added:
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
      - copied unchanged from r705449, cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
Removed:
    cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSAsyncCallable.java
Modified:
    cxf/branches/2.0.x-fixes/   (props changed)
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java
    cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/IOUtils.java
    cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
    cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/endpoint/dynamic/DynamicClientFactory.java
    cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
    cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxWsClientProxy.java
    cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/branches/2.0.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
    cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
    cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Oct 22 11:19:52 2008
@@ -1,3 +1,3 @@
-/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715,691745,692162-692163,692468,692500,694466-694469,694472,694717,694748-694749,694870,695503,695509,695553,695555,695563,695875-695877,695940,695980,696436,696455,696721,697086,698129,701526,701634,702275,702443,702527,702582,702604,702610,702642-702643,702649,702760,702870,702873,702959,703193,703242,703523,704303,704587,704738,704998,705153,705455
-/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355,691488,691602,691706,691728,692116,692157,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694869,695396,695484,695537,695552,695561,695619,695684,695835,695935,695977,696094,696433,696720,697085,698128,700261,700602,701783,701830,701862,702267,702580,702602,702609,702616,702656,702957,703191,703239,703501,704584,704997,705150,705340
+/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715,691745,692162-692163,692468,692500,694466-694469,694472,694717,694748-694749,694870,695503,695509,695553,695555,695563,695875-695877,695940,695980,696436,696455,696721,697086,698129,701526,701634,702275,702443,702527,702582,702604,702610,702642-702643,702649,702760,702870,702873,702959,703193,703242,703523,704303,704587,704738,704998,705153,705280-705449,705455
+/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355,691488,691602,691706,691728,692116,692157,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694869,695396,695484,695537,695552,695561,695619,695684,695835,695935,695977,696094,696433,696720,697085,698128,700261,700602,701783,701830,701862,702267,702580,702602,702609,702616,702656,702957,703191,703239,703501,704584,704997,705150,705274,705340,705446
 /incubator/cxf/trunk:434594-651668

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java (original)
+++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java Wed Oct 22 11:19:52 2008
@@ -20,6 +20,7 @@
 package org.apache.cxf.endpoint;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import javax.xml.namespace.QName;
 
@@ -97,6 +98,69 @@
     
     
     /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked. The service namespace will be used
+     * when looking up the BindingOperationInfo.
+     * @param params  The params that match the parts of the input message of the operation.  If the 
+     * BindingOperationInfo supports unwrapping, it assumes the params are in the "unwrapped" form.  If 
+     * params are in the wrapped form, use invokeWrapped
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invoke(ClientCallback callback,
+                    String operationName,
+                    Object... params) throws Exception;
+    
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked
+     * @param params  The params that match the parts of the input message of the operation.  If the 
+     * BindingOperationInfo supports unwrapping, it assumes the params are in the "unwrapped" form.  If 
+     * params are in the wrapped form, use invokeWrapped
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invoke(ClientCallback callback,
+                    QName operationName,
+                    Object... params) throws Exception;
+
+
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked. The service namespace will be used
+     * when looking up the BindingOperationInfo.
+     * @param params  The params that match the parts of the input message of the operation
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invokeWrapped(ClientCallback callback,
+                           String operationName,
+                    Object... params) throws Exception;
+    
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked
+     * @param params  The params that match the parts of the input message of the operation
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invokeWrapped(ClientCallback callback,
+                           QName operationName,
+                    Object... params) throws Exception;    
+    
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param oi  The operation to be invoked
+     * @param params  The params that match the parts of the input message of the operation
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invoke(ClientCallback callback,
+                BindingOperationInfo oi,
+                Object... params) throws Exception;    
+    
+    
+    /**
      * Gets the request context used for future invocations
      * @return context The context
      */
@@ -151,4 +215,11 @@
      *
      */
     void destroy();
+    
+    /**
+     * Sets the executor which is used to process Asynchronous responses.  The default
+     * is to use the threads provided by the transport.  (example: the JMS listener threads) 
+     * @param executor
+     */
+    void setExecutor(Executor executor);
 }

Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java (original)
+++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java Wed Oct 22 11:19:52 2008
@@ -63,6 +63,14 @@
      * @return true if the exchange is known to be a one-way exchange
      */
     boolean isOneWay();
+    
+    /**
+     * @return true if the frontend will wait for the response.   Transports
+     * can then optimize themselves to process the response immediately instead 
+     * of using a background thread or similar.
+     */
+    boolean isSynchronous();
+    void setSynchronous(boolean b);
 
     /**
      * 

Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java (original)
+++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java Wed Oct 22 11:19:52 2008
@@ -30,6 +30,7 @@
 
     private Destination destination;
     private boolean oneWay;
+    private boolean synchronous = true;
     
     private Message inMessage;
     private Message outMessage;
@@ -104,6 +105,14 @@
     public void setOneWay(boolean b) {
         oneWay = b;
     }
+    
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean b) {
+        synchronous = b;
+    }
 
     public Session getSession() {
         return session;

Modified: cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/branches/2.0.x-fixes/api/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Wed Oct 22 11:19:52 2008
@@ -265,6 +265,7 @@
         }
         if (state == State.EXECUTING) {
             state = State.COMPLETE;
+            pausedMessage = null;
         }
         return state == State.COMPLETE;
     }

Modified: cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/IOUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/IOUtils.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/IOUtils.java (original)
+++ cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/IOUtils.java Wed Oct 22 11:19:52 2008
@@ -25,15 +25,50 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Reader;
+import java.io.UnsupportedEncodingException;
 import java.io.Writer;
+import java.nio.charset.Charset;
 
 public final class IOUtils {
-
+    public static final Charset UTF8_CHARSET = Charset.forName("utf-8");
     private static final int DEFAULT_BUFFER_SIZE = 1024 * 4;
 
     private IOUtils() {
 
     }
+    
+    /**
+     * Use this function instead of new String(byte[]) to avoid surprises from non-standard default encodings.
+     * @param bytes
+     * @return
+     */
+    public static String newStringFromBytes(byte[] bytes) {
+        try {
+            return new String(bytes, UTF8_CHARSET.name());
+        } catch (UnsupportedEncodingException e) {
+            throw 
+                new RuntimeException("Impossible failure: Charset.forName(\"utf-8\") returns invalid name.");
+
+        }
+    }
+
+    /**
+     * Use this function instead of new String(byte[], int, int) 
+     * to avoid surprises from non-standard default encodings.
+     * @param bytes
+     * @param start
+     * @param length
+     * @return
+     */
+    public static String newStringFromBytes(byte[] bytes, int start, int length) {
+        try {
+            return new String(bytes, start, length, UTF8_CHARSET.name());
+        } catch (UnsupportedEncodingException e) {
+            throw 
+                new RuntimeException("Impossible failure: Charset.forName(\"utf-8\") returns invalid name.");
+
+        }
+    }
 
     public static int copy(final InputStream input, final OutputStream output)
         throws IOException {
@@ -107,7 +142,7 @@
         int n = 0;
         n = input.read(buffer);
         while (-1 != n) {
-            buf.append(new String(buffer, 0, n));
+            buf.append(newStringFromBytes(buffer, 0, n));
             n = input.read(buffer);
         }
         input.close();

Modified: cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java (original)
+++ cxf/branches/2.0.x-fixes/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java Wed Oct 22 11:19:52 2008
@@ -44,7 +44,11 @@
     }
     
     public ByteArrayInputStream createInputStream() {
-        return new ByteArrayInputStream(buf, 0, count);
+        return new ByteArrayInputStream(buf, 0, count) {
+            public String toString() {
+                return IOUtils.newStringFromBytes(buf, 0, count);
+            }
+        };
     }
     
     public byte[] toByteArray() {

Modified: cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java (original)
+++ cxf/branches/2.0.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java Wed Oct 22 11:19:52 2008
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -89,6 +90,8 @@
 
     protected ThreadLocal <Map<String, Object>> responseContext =
             new ThreadLocal<Map<String, Object>>();
+    
+    protected Executor executor;
 
 
     public ClientImpl(Bus b, Endpoint e) {
@@ -292,6 +295,104 @@
             }
         }
     }
+    
+    public void invoke(ClientCallback callback, 
+                       String operationName, 
+                       Object... params) throws Exception {
+        QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
+        invoke(callback, q, params);        
+    }
+
+    public void invoke(ClientCallback callback, 
+                       QName operationName, 
+                       Object... params) throws Exception {
+        BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
+        if (op == null) {
+            throw new UncheckedException(
+                new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
+        }
+        
+        if (op.isUnwrappedCapable()) {
+            op = op.getUnwrappedOperation();
+        }
+        
+        invoke(callback, op, params);
+    }
+
+
+    public void invokeWrapped(ClientCallback callback, 
+                              String operationName, 
+                              Object... params)
+        throws Exception {
+        QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
+        invokeWrapped(callback, q, params);        
+    }
+
+    public void invokeWrapped(ClientCallback callback, 
+                              QName operationName, 
+                              Object... params)
+        throws Exception {
+        BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
+        if (op == null) {
+            throw new UncheckedException(
+                new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
+        }
+        invoke(callback, op, params);
+    }
+
+    
+    public void invoke(ClientCallback callback, 
+                       BindingOperationInfo oi, 
+                       Object... params) throws Exception {
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            Exchange exchange = new ExchangeImpl();
+            exchange.setSynchronous(false);
+            Endpoint endpoint = getEndpoint();
+            Map<String, Object> context = new HashMap<String, Object>();
+            Map<String, Object> resp = getResponseContext();
+            resp.clear();
+            Map<String, Object> reqContext = new HashMap<String, Object>(getRequestContext());
+            context.put(RESPONSE_CONTEXT, resp);
+            context.put(REQUEST_CONTEXT, reqContext);
+
+            Message message = endpoint.getBinding().createMessage();
+            message.put(Message.INVOCATION_CONTEXT, context);
+            
+            //setup the message context
+            setContext(reqContext, message);
+            setParameters(params, message);
+    
+            if (null != reqContext) {
+                exchange.putAll(reqContext);
+            }
+            exchange.setOneWay(oi.getOutput() == null);
+            exchange.setOutMessage(message);
+            exchange.put(ClientCallback.class, callback);
+            
+            setOutMessageProperties(message, oi);
+            setExchangeProperties(exchange, endpoint, oi);
+            
+            // setup chain
+    
+            PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
+            message.setInterceptorChain(chain);
+            
+            modifyChain(chain, reqContext);
+            chain.setFaultObserver(outFaultObserver);
+            
+            // setup conduit selector
+            prepareConduitSelector(message);
+            
+            // execute chain        
+            chain.doIntercept(message);
+
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
+        }       
+    }
+    
     public Object[] invoke(BindingOperationInfo oi,
                            Object[] params, 
                            Map<String, Object> context,
@@ -302,6 +403,7 @@
             if (exchange == null) {
                 exchange = new ExchangeImpl();
             }
+            exchange.setSynchronous(true);
             Endpoint endpoint = getEndpoint();
             
             Map<String, Object> reqContext = null;
@@ -343,62 +445,69 @@
             // execute chain        
             chain.doIntercept(message);
     
+            return processResult(message, exchange, oi, resContext);
             
-            // Check to see if there is a Fault from the outgoing chain
-            Exception ex = message.getContent(Exception.class);
-            boolean mepCompleteCalled = false;
-            if (ex != null) {
-                getConduitSelector().complete(exchange);
-                mepCompleteCalled = true;
-                if (message.getContent(Exception.class) != null) {
-                    throw ex;
-                }
-            }
-            ex = message.getExchange().get(Exception.class);
-            if (ex != null) {
-                if (!mepCompleteCalled) {
-                    getConduitSelector().complete(exchange);
-                }
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
+        }
+    }
+
+    private Object[] processResult(Message message, 
+                                   Exchange exchange,
+                                   BindingOperationInfo oi,
+                                   Map<String, Object> resContext) throws Exception {
+     // Check to see if there is a Fault from the outgoing chain
+        Exception ex = message.getContent(Exception.class);
+        boolean mepCompleteCalled = false;
+        if (ex != null) {
+            getConduitSelector().complete(exchange);
+            mepCompleteCalled = true;
+            if (message.getContent(Exception.class) != null) {
                 throw ex;
             }
-            
-            // Wait for a response if we need to
-            if (!oi.getOperationInfo().isOneWay()) {
-                synchronized (exchange) {
-                    waitResponse(exchange);
-                }
-            }
-            getConduitSelector().complete(exchange);
-    
-            // Grab the response objects if there are any
-            List resList = null;
-            Message inMsg = exchange.getInMessage();
-            if (inMsg != null) {
-                if (null != resContext) {                   
-                    resContext.putAll(inMsg);
-                    if (LOG.isLoggable(Level.FINE)) {
-                        LOG.fine("set responseContext to be" + responseContext);
-                    }
-                }
-                resList = inMsg.getContent(List.class);
+        }
+        ex = message.getExchange().get(Exception.class);
+        if (ex != null) {
+            if (!mepCompleteCalled) {
+                getConduitSelector().complete(exchange);
             }
-            
-            // check for an incoming fault
-            ex = getException(exchange);
-            
-            if (ex != null) {
-                throw ex;
+            throw ex;
+        }
+        
+        // Wait for a response if we need to
+        if (oi != null && !oi.getOperationInfo().isOneWay()) {
+            synchronized (exchange) {
+                waitResponse(exchange);
             }
-            
-            if (resList != null) {
-                return resList.toArray();
+        }
+        getConduitSelector().complete(exchange);
+
+        // Grab the response objects if there are any
+        List resList = null;
+        Message inMsg = exchange.getInMessage();
+        if (inMsg != null) {
+            if (null != resContext) {                   
+                resContext.putAll(inMsg);
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.fine("set responseContext to be" + responseContext);
+                }
             }
-            return null;
-        } finally {
-            BusFactory.setThreadDefaultBus(origBus);
+            resList = inMsg.getContent(List.class);
+        }
+        
+        // check for an incoming fault
+        ex = getException(exchange);
+        
+        if (ex != null) {
+            throw ex;
         }
+        
+        if (resList != null) {
+            return resList.toArray();
+        }
+        
+        return null;
     }
-
     protected Exception getException(Exchange exchange) {
         if (exchange.getInFaultMessage() != null) {
             return exchange.getInFaultMessage().getContent(Exception.class);
@@ -441,6 +550,7 @@
     }
     
     public void onMessage(Message message) {
+
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
         if (endpoint == null) {
             // in this case correlation will occur outside the transport,
@@ -456,8 +566,6 @@
         message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
         PhaseManager pm = bus.getExtension(PhaseManager.class);
         
-        
-        
         List<Interceptor> i1 = bus.getInInterceptors();
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("Interceptors contributed by bus: " + i1);
@@ -478,13 +586,21 @@
         PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3, i4); 
         message.setInterceptorChain(chain);
         
-        
         chain.setFaultObserver(outFaultObserver);
         
         Bus origBus = BusFactory.getThreadDefaultBus(false);
         BusFactory.setThreadDefaultBus(bus);
         // execute chain
+        ClientCallback callback = message.getExchange().get(ClientCallback.class);
         try {
+            if (callback != null) {
+                if (callback.isCancelled()) {
+                    getConduitSelector().complete(message.getExchange());
+                    return;
+                }
+                callback.start(message);
+            }
+            
             String startingAfterInterceptorID = (String) message.get(
                 PhaseInterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
             String startingInterceptorID = (String) message.get(
@@ -496,12 +612,32 @@
             } else {
                 chain.doIntercept(message);
             }
+            
+            callback = message.getExchange().get(ClientCallback.class);
+            
+            if (callback != null && !isPartialResponse(message)) {
+                message.getExchange().setInMessage(message);
+                Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message
+                                                                .getExchange()
+                                                                .getOutMessage()
+                                                                .get(Message.INVOCATION_CONTEXT));
+                resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
+                
+                try {
+                    Object obj[] = processResult(message, message.getExchange(),
+                                                 null, resCtx);
+                                        
+                    callback.handleResponse(resCtx, obj);
+                } catch (Throwable ex) {
+                    callback.handleException(resCtx, ex);
+                }
+            }
         } finally {
             synchronized (message.getExchange()) {
-                if (!isPartialResponse(message)) {
+                if (!isPartialResponse(message) && callback == null) {
                     message.getExchange().put(FINISHED, Boolean.TRUE);
                     message.getExchange().setInMessage(message);
-                    message.getExchange().notifyAll();
+                    message.getExchange().notifyAll();                   
                 }
             }
             BusFactory.setThreadDefaultBus(origBus);
@@ -546,7 +682,19 @@
             exchange.put(OperationInfo.class, boi.getOperationInfo());
         }
                 
-        exchange.put(MessageObserver.class, this);
+        if (exchange.isSynchronous() || executor == null) {
+            exchange.put(MessageObserver.class, this);
+        } else {
+            exchange.put(MessageObserver.class, new MessageObserver() {
+                public void onMessage(final Message message) {
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            ClientImpl.this.onMessage(message);
+                        }
+                    });
+                }
+            });            
+        }
         exchange.put(Retryable.class, this);
         exchange.put(Client.class, this);
         exchange.put(Bus.class, bus);
@@ -671,4 +819,11 @@
             super.putAll(shared);
         }
     }
+
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+
 }

Modified: cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/endpoint/dynamic/DynamicClientFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/endpoint/dynamic/DynamicClientFactory.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/endpoint/dynamic/DynamicClientFactory.java (original)
+++ cxf/branches/2.0.x-fixes/rt/databinding/jaxb/src/main/java/org/apache/cxf/endpoint/dynamic/DynamicClientFactory.java Wed Oct 22 11:19:52 2008
@@ -20,8 +20,8 @@
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URISyntaxException;
 import java.net.URL;

Modified: cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java (original)
+++ cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/DispatchImpl.java Wed Oct 22 11:19:52 2008
@@ -87,7 +87,6 @@
 
     private Bus bus;
     private InterceptorProvider iProvider;
-
     private Class<T> cl;
     private Executor executor;
     private JAXBContext context;
@@ -356,6 +355,7 @@
     }
 
     public Future<?> invokeAsync(T obj, AsyncHandler<T> asyncHandler) {
+        
         Response<?> r = invokeAsync(obj);
         AsyncCallbackFuture callback = new AsyncCallbackFuture(r, asyncHandler);
 

Modified: cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxWsClientProxy.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxWsClientProxy.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxWsClientProxy.java (original)
+++ cxf/branches/2.0.x-fixes/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JaxWsClientProxy.java Wed Oct 22 11:19:52 2008
@@ -22,10 +22,11 @@
 import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Method;
 import java.net.HttpURLConnection;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.FutureTask;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.logging.Logger;
 
 import javax.xml.namespace.QName;
@@ -50,6 +51,7 @@
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientCallback;
 import org.apache.cxf.endpoint.Endpoint;
 import org.apache.cxf.frontend.MethodDispatcher;
 import org.apache.cxf.helpers.CastUtils;
@@ -144,7 +146,8 @@
         
         // need to do context mapping from cxf message to jax-ws
         ContextPropertiesMapping.mapResponsefromCxf2Jaxws(client.getResponseContext());
-        Map<String, Scope> scopes = CastUtils.cast((Map<?, ?>)client.getResponseContext().get(WrappedMessageContext.SCOPES));
+        Map<String, Scope> scopes = CastUtils.cast((Map<?, ?>)client.getResponseContext()
+                                                       .get(WrappedMessageContext.SCOPES));
         if (scopes != null) {
             for (Map.Entry<String, Scope> scope : scopes.entrySet()) {
                 if (scope.getValue() == Scope.HANDLER) {
@@ -207,26 +210,63 @@
                  || address.equals(getClient().getEndpoint().getEndpointInfo().getAddress()));
     }
 
-    private Object invokeAsync(Method method, BindingOperationInfo oi, Object[] params) {
+    @SuppressWarnings("unchecked")
+    private Object invokeAsync(Method method, BindingOperationInfo oi, Object[] params) throws Exception {
 
-        Map<String, Object> context = new HashMap<String, Object>(client.getRequestContext());
-        FutureTask<Object> f = new FutureTask<Object>(new JAXWSAsyncCallable(this, method, oi, params,
-                                                                             context));
-
-        Endpoint endpoint = getClient().getEndpoint();
-        endpoint.getExecutor().execute(f);
-        Response<?> r = new AsyncResponse<Object>(f, Object.class);
+        client.setExecutor(getClient().getEndpoint().getExecutor());
+        
+        final AsyncHandler<Object> handler;
         if (params.length > 0 && params[params.length - 1] instanceof AsyncHandler) {
-            // callback style
-            AsyncCallbackFuture callback = 
-                new AsyncCallbackFuture(r, (AsyncHandler)params[params.length - 1]);
-            endpoint.getExecutor().execute(callback);
-            return callback;
+            handler = (AsyncHandler)params[params.length - 1];
         } else {
-            return r;
+            handler = null;
         }
+        
+        final ClientCallback callback = new ClientCallback() {
+            public void handleResponse(Map<String, Object> ctx, Object[] res) {
+                super.handleResponse(ctx, res);
+                if (handler != null) {
+                    handler.handleResponse(new ResponseCallback(this));
+                }
+            }
+        };
+        
+        Response<Object> ret = new ResponseCallback(callback);
+        client.invoke(callback, oi, params);
+        return ret;
     }
 
+    static class ResponseCallback implements Response<Object> {
+        ClientCallback callback;
+        public ResponseCallback(ClientCallback cb) {
+            callback = cb;
+        }
+        
+        public Map<String, Object> getContext() {
+            try {
+                return callback.getResponseContext();
+            } catch (Exception ex) {
+                return null;
+            }
+        }
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            return callback.cancel(mayInterruptIfRunning);
+        }
+        public Object get() throws InterruptedException, ExecutionException {
+            return callback.get()[0];
+        }
+        public Object get(long timeout, TimeUnit unit) throws InterruptedException,
+            ExecutionException, TimeoutException {
+            return callback.get(timeout, unit)[0];
+        }
+        public boolean isCancelled() {
+            return callback.isCancelled();
+        }
+        public boolean isDone() {
+            return callback.isDone();
+        }
+    };
+
     
     public Map<String, Object> getRequestContext() {
         return new WrappedMessageContext(this.getClient().getRequestContext(),

Modified: cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Wed Oct 22 11:19:52 2008
@@ -65,6 +65,8 @@
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.policy.PolicyUtils;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.policy.Assertor;
 import org.apache.cxf.ws.policy.PolicyEngine;
@@ -1897,6 +1899,30 @@
             // Process retransmits until we fall out.
             handleRetransmits();
             
+            if (outMessage == null 
+                || outMessage.getExchange() == null
+                || outMessage.getExchange().isSynchronous()) {
+                handleResponseInternal();
+            } else {
+                Runnable runnable = new Runnable() {
+                    public void run() {
+                        try {
+                            handleResponseInternal();
+                        } catch (IOException e) {
+                            LOG.log(Level.WARNING, e.getMessage(), e);
+                        }
+                    }
+                };
+                WorkQueueManager mgr = outMessage.getExchange().get(Bus.class)
+                    .getExtension(WorkQueueManager.class);
+                AutomaticWorkQueue queue = mgr.getNamedWorkQueue("http-conduit");
+                if (queue == null) {
+                    queue = mgr.getAutomaticWorkQueue();
+                }
+                queue.execute(runnable);
+            }
+        }
+        protected void handleResponseInternal() throws IOException {
             int responseCode = connection.getResponseCode();
             
             if (LOG.isLoggable(Level.FINE)) {

Modified: cxf/branches/2.0.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java Wed Oct 22 11:19:52 2008
@@ -328,6 +328,8 @@
         message.setExchange(exchange);
         exchange.isOneWay();
         EasyMock.expectLastCall().andReturn(true);
+        exchange.isSynchronous();
+        EasyMock.expectLastCall().andReturn(true);
     }
 
     private HTTPConduit setUpConduit(

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Wed Oct 22 11:19:52 2008
@@ -53,17 +53,17 @@
  */
 public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener {
     static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
-
+    private static final String CORRELATED = JMSConduit.class.getName() + ".correlated";
     private EndpointInfo endpointInfo;
     private JMSConfiguration jmsConfig;
-    private Map<String, Message> correlationMap;
+    private Map<String, Exchange> correlationMap;
     private DefaultMessageListenerContainer jmsListener;
 
     public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration jmsConfig) {
         super(target);
         this.jmsConfig = jmsConfig;
         this.endpointInfo = endpointInfo;
-        correlationMap = new ConcurrentHashMap<String, Message>();
+        correlationMap = new ConcurrentHashMap<String, Exchange>();
     }
 
     /**
@@ -128,26 +128,26 @@
          * fill to Message and notify this thread
          */
         if (!exchange.isOneWay()) {
-            Message inMessage = new MessageImpl();
-            synchronized (inMessage) {
-                correlationMap.put(correlationId, inMessage);
+            synchronized (exchange) {
+                correlationMap.put(correlationId, exchange);
                 jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
-                try {
-                    inMessage.wait(jmsTemplate.getReceiveTimeout());
-                } catch (InterruptedException e) {
+                
+                if (exchange.isSynchronous()) {
+                    try {
+                        exchange.wait(jmsTemplate.getReceiveTimeout());
+                    } catch (InterruptedException e) {
+                        correlationMap.remove(correlationId);
+                        throw new RuntimeException(e);
+                    }
                     correlationMap.remove(correlationId);
-                    throw new RuntimeException(e);
-                }
-                correlationMap.remove(correlationId);
-                if (inMessage.getContent(InputStream.class) == null) {
-                    throw new RuntimeException("Timeout receiving message with correlationId "
-                                               + correlationId);
+                    if (exchange.get(CORRELATED) == null) {
+                        throw new RuntimeException("Timeout receiving message with correlationId "
+                                                   + correlationId);
+                    }
+                    
+                    
                 }
             }
-            exchange.setInMessage(inMessage);
-            if (incomingObserver != null) {
-                incomingObserver.onMessage(inMessage);
-            }
         } else {
             jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
         }
@@ -165,20 +165,31 @@
         } catch (JMSException e) {
             throw JmsUtils.convertJmsAccessException(e);
         }
-        Message inMessage = correlationMap.get(correlationId);
-        if (inMessage == null) {
+
+        Exchange exchange = correlationMap.remove(correlationId);
+        if (exchange == null) {
             LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
+            return;
         }
+        Message inMessage = new MessageImpl();
+        exchange.setInMessage(inMessage);
         LOG.log(Level.FINE, "client received reply: ", jmsMessage);
         JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
         byte[] response = JMSUtils.retrievePayload(jmsMessage);
         LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
         inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
 
-        synchronized (inMessage) {
-            inMessage.notifyAll();
+        if (exchange.isSynchronous()) {
+            synchronized (exchange) {
+                exchange.put(CORRELATED, Boolean.TRUE);
+                exchange.notifyAll();
+            }
+        }
+        
+        //REVISIT: put on a workqueue?
+        if (incomingObserver != null) {
+            incomingObserver.onMessage(exchange.getInMessage());
         }
-
     }
 
     public void close() {

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Wed Oct 22 11:19:52 2008
@@ -91,6 +91,7 @@
 
         Exchange exchange = new ExchangeImpl();
         exchange.setOneWay(isOneWay);
+        exchange.setSynchronous(true);
         message.setExchange(exchange);
         exchange.setOutMessage(message);
         try {

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Wed Oct 22 11:19:52 2008
@@ -94,11 +94,12 @@
         setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
                          "HelloWorldServiceLoop", "HelloWorldPortLoop");
         JMSConduit conduit = setupJMSConduit(true, false);
-        conduit.getJmsConfig().setReceiveTimeout(1000);
+        conduit.getJmsConfig().setReceiveTimeout(10000);
 
         try {
-            for (int c = 0; c < 100; c++) {
+            for (int c = 0; c < 10; c++) {
                 LOG.info("Sending message " + c);
+                inMessage = null;
                 Message message = new MessageImpl();
                 sendoutMessage(conduit, message, false);
                 verifyReceivedMessage(message);
@@ -134,7 +135,14 @@
         }
     }
 
-    private void verifyReceivedMessage(Message message) {
+    private void verifyReceivedMessage(Message message) throws InterruptedException {
+        while (inMessage == null) {
+            //the send has completed, but the response might not be back yet.
+            //wait for it.
+            synchronized (this) {
+                wait(10);
+            }
+        }
         ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
         Assert.assertNotNull("The received message input stream should not be null", bis);
         byte bytes[] = new byte[bis.available()];
@@ -143,8 +151,8 @@
         } catch (IOException ex) {
             ex.printStackTrace();
         }
-        String reponse = new String(bytes);
-        assertEquals("The reponse date should be equals", reponse, "HelloWorld");
+        String response = new String(bytes);
+        assertEquals("The response data should be equal", "HelloWorld", response);
 
         JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);

Modified: cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/PersistenceUtils.java Wed Oct 22 11:19:52 2008
@@ -19,8 +19,6 @@
 
 package org.apache.cxf.ws.rm.persistence;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 
 import javax.xml.bind.JAXBContext;
@@ -30,6 +28,7 @@
 import javax.xml.bind.Unmarshaller;
 
 import org.apache.cxf.common.util.PackageUtils;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
 import org.apache.cxf.ws.rm.SequenceAcknowledgement;
 
 /**
@@ -71,13 +70,13 @@
     }
     
     public InputStream serialiseAcknowledgment(SequenceAcknowledgement ack) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream(); 
+        LoadingByteArrayOutputStream bos = new LoadingByteArrayOutputStream(); 
         try {
             getMarshaller().marshal(ack, bos);
         } catch (JAXBException ex) {
             throw new RMStoreException(ex);
         }
-        return new ByteArrayInputStream(bos.toByteArray());
+        return bos.createInputStream();
     }
     
     private JAXBContext getContext() throws JAXBException {

Modified: cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java (original)
+++ cxf/branches/2.0.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java Wed Oct 22 11:19:52 2008
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.ws.rm.persistence.jdbc;
 
+
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
@@ -45,6 +46,7 @@
 
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.ws.addressing.v200408.EndpointReferenceType;
 import org.apache.cxf.ws.rm.DestinationSequence;
 import org.apache.cxf.ws.rm.Identifier;
@@ -494,7 +496,11 @@
         stmt.setBigDecimal(i++, new BigDecimal(nr));
         stmt.setString(i++, to); 
         byte[] bytes = msg.getContent();    
-        stmt.setBinaryStream(i++, new ByteArrayInputStream(bytes), bytes.length);
+        stmt.setBinaryStream(i++, new ByteArrayInputStream(bytes) {
+            public String toString() {
+                return IOUtils.newStringFromBytes(buf, 0, count);
+            }
+        }, bytes.length);
         stmt.execute();
         LOG.log(Level.FINE, "Successfully stored {0} message number {1} for sequence {2}",
                 new Object[] {outbound ? "outbound" : "inbound", nr, id});

Modified: cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java (original)
+++ cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/ClientServerTest.java Wed Oct 22 11:19:52 2008
@@ -28,7 +28,9 @@
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -74,6 +76,7 @@
 import org.apache.hello_world_soap_http.SOAPServiceMultiPortTypeTest;
 import org.apache.hello_world_soap_http.types.BareDocumentResponse;
 import org.apache.hello_world_soap_http.types.GreetMeLaterResponse;
+import org.apache.hello_world_soap_http.types.GreetMeResponse;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -485,16 +488,17 @@
         
         
         assertEquals(((TestExecutor)executor).getCount(), 0);
-        try {
-            Greeter greeter = (Greeter)service.getPort(portName, Greeter.class);
-            for (int i = 0; i < 5; i++) {
-                greeter.greetMeAsync("asyn call" + i);
-            }
-        } catch (UndeclaredThrowableException ex) {
-            throw (Exception)ex.getCause();
+        Greeter greeter = (Greeter)service.getPort(portName, Greeter.class);
+        List<Response<GreetMeResponse>> responses = new ArrayList<Response<GreetMeResponse>>();
+        for (int i = 0; i < 5; i++) {
+            responses.add(greeter.greetMeAsync("asyn call" + i));
+        }
+        //wait for all the responses
+        for (Response<GreetMeResponse> resp : responses) {
+            resp.get();
         }
-        
-        assertEquals(((TestExecutor)executor).getCount(), 5);
+            
+        assertEquals(5, ((TestExecutor)executor).getCount());
     }
 
     

Modified: cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?rev=707153&r1=707152&r2=707153&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java (original)
+++ cxf/branches/2.0.x-fixes/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java Wed Oct 22 11:19:52 2008
@@ -91,30 +91,6 @@
     private OutMessageRecorder outRecorder;
     private InMessageRecorder inRecorder;
 
-    private boolean testAll = true;
-    private boolean doTestOnewayAnonymousAcks = testAll;
-    private boolean doTestOnewayDeferredAnonymousAcks = testAll;
-    private boolean doTestOnewayDeferredNonAnonymousAcks = testAll;
-    private boolean doTestOnewayAnonymousAcksSequenceLength1 = testAll;
-    private boolean doTestOnewayAnonymousAcksSuppressed = testAll;
-    private boolean doTestOnewayAnonymousAcksSuppressedAsyncExecutor = testAll;
-    private boolean doTestTwowayNonAnonymous = testAll;
-    private boolean doTestTwowayNonAnonymousEndpointSpecific = testAll;
-    private boolean doTestTwowayNonAnonymousDeferred = testAll;
-    private boolean doTestTwowayNonAnonymousMaximumSequenceLength2 = testAll;
-    private boolean doTestTwowayAtMostOnce = testAll;
-    private boolean doTestUnknownSequence = testAll;
-    private boolean doTestInactivityTimeout = testAll;
-    private boolean doTestOnewayMessageLoss = testAll;
-    private boolean doTestOnewayMessageLossAsyncExecutor = testAll;
-    private boolean doTestTwowayMessageLoss = testAll;
-    private boolean doTestTwowayMessageLossAsyncExecutor = testAll;
-    private boolean doTestTwowayNonAnonymousNoOffer = testAll;
-    private boolean doTestConcurrency = testAll;
-    private boolean doTestMultiClientOneway = testAll;
-    private boolean doTestMultiClientTwoway = testAll;
-    private boolean doTestServerSideMessageLoss = testAll;
-    private boolean doTestTerminateOnShutdown = testAll;
 
     @BeforeClass
     public static void startServers() throws Exception {
@@ -129,13 +105,14 @@
             
     
     @After
-    public void tearDown() {
+    public void tearDown() throws Exception {
         try {
             stopGreeter();
             stopControl();
         } catch (Throwable t) {
             //ignore
         }
+        Thread.sleep(100);
     }
 
     /** 
@@ -186,9 +163,6 @@
     
     @Test
     public void testOnewayAnonymousAcks() throws Exception {
-        if (!doTestOnewayAnonymousAcks) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/rminterceptors.xml");
 
         greeter.greetMeOneWay("once");
@@ -221,9 +195,6 @@
     
     @Test
     public void testOnewayDeferredAnonymousAcks() throws Exception {
-        if (!doTestOnewayDeferredAnonymousAcks) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/deferred.xml");
 
         greeter.greetMeOneWay("once");
@@ -261,9 +232,6 @@
     
     @Test
     public void testOnewayDeferredNonAnonymousAcks() throws Exception {
-        if (!doTestOnewayDeferredNonAnonymousAcks) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/deferred.xml", true);
 
         greeter.greetMeOneWay("once");
@@ -315,9 +283,6 @@
     
     @Test
     public void testOnewayAnonymousAcksSequenceLength1() throws Exception {
-        if (!doTestOnewayAnonymousAcksSequenceLength1) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/seqlength1.xml");
 
         greeter.greetMeOneWay("once");
@@ -358,17 +323,11 @@
    
     @Test
     public void testOnewayAnonymousAcksSuppressed() throws Exception {
-        if (!doTestOnewayAnonymousAcksSuppressed) {
-            return;
-        }
         testOnewayAnonymousAcksSuppressed(null);
     }
 
     @Test
     public void testOnewayAnonymousAcksSuppressedAsyncExecutor() throws Exception {
-        if (!doTestOnewayAnonymousAcksSuppressedAsyncExecutor) {
-            return;
-        }
         testOnewayAnonymousAcksSuppressed(Executors.newSingleThreadExecutor());
     }
 
@@ -418,14 +377,11 @@
     
     @Test
     public void testTwowayNonAnonymous() throws Exception {
-        if (!doTestTwowayNonAnonymous) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/rminterceptors.xml", true);
 
-        greeter.greetMe("one");
-        greeter.greetMe("two");
-        greeter.greetMe("three");
+        assertEquals("ONE", greeter.greetMe("one"));
+        assertEquals("TWO", greeter.greetMe("two"));
+        assertEquals("THREE", greeter.greetMe("three"));
 
         // CreateSequence and three greetMe messages
         // TODO there should be partial responses to the decoupled responses!
@@ -468,9 +424,6 @@
 
     @Test
     public void testTwowayNonAnonymousEndpointSpecific() throws Exception {
-        if (!doTestTwowayNonAnonymousEndpointSpecific) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/twoway-endpoint-specific.xml", true);
 
 
@@ -517,9 +470,6 @@
 
     @Test
     public void testTwowayNonAnonymousDeferred() throws Exception {
-        if (!doTestTwowayNonAnonymousDeferred) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/deferred.xml", true);
 
         greeter.greetMe("one");
@@ -582,10 +532,6 @@
      */
     @Test
     public void testTwowayNonAnonymousMaximumSequenceLength2() throws Exception {
-
-        if (!doTestTwowayNonAnonymousMaximumSequenceLength2) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/seqlength10.xml", true);
         
         RMManager manager = greeterBus.getExtension(RMManager.class);
@@ -642,10 +588,6 @@
     
     @Test
     public void testTwowayAtMostOnce() throws Exception {
-        if (!doTestTwowayAtMostOnce) {
-            return;
-        }
-        
         init("org/apache/cxf/systest/ws/rm/atmostonce.xml");
         
         class MessageNumberInterceptor extends AbstractPhaseInterceptor {
@@ -712,10 +654,6 @@
     
     @Test
     public void testUnknownSequence() throws Exception {
-        if (!doTestUnknownSequence) {
-            return;
-        }
-        
         init("org/apache/cxf/systest/ws/rm/rminterceptors.xml");
         
         class SequenceIdInterceptor extends AbstractPhaseInterceptor {
@@ -751,10 +689,6 @@
     
     @Test
     public void testInactivityTimeout() throws Exception {
-        if (!doTestInactivityTimeout) {
-            return;
-        }
-        
         init("org/apache/cxf/systest/ws/rm/inactivity-timeout.xml");
        
         greeter.greetMe("one");
@@ -815,9 +749,6 @@
 
     @Test    
     public void testOnewayMessageLoss() throws Exception {
-        if (!doTestOnewayMessageLoss) {
-            return;
-        }
         // waite a while for the last bus shutdown
         Thread.sleep(5000);
         testOnewayMessageLoss(null);
@@ -825,9 +756,6 @@
     
     @Test    
     public void testOnewayMessageLossAsyncExecutor() throws Exception {
-        if (!doTestOnewayMessageLossAsyncExecutor) {
-            return;
-        }
         testOnewayMessageLoss(Executors.newSingleThreadExecutor());
     } 
 
@@ -883,17 +811,11 @@
 
     @Test
     public void testTwowayMessageLoss() throws Exception {
-        if (!doTestTwowayMessageLoss) {
-            return;
-        }
         testTwowayMessageLoss(null);
     }
 
     @Test
     public void testTwowayMessageLossAsyncExecutor() throws Exception {
-        if (!doTestTwowayMessageLossAsyncExecutor) {
-            return;
-        }
         testTwowayMessageLoss(Executors.newSingleThreadExecutor());
     }
     
@@ -953,9 +875,6 @@
     
     @Test
     public void testTwowayNonAnonymousNoOffer() throws Exception {
-        if (!doTestTwowayNonAnonymousNoOffer) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/no-offer.xml", true);        
         
         greeter.greetMe("one");
@@ -989,23 +908,21 @@
 
     @Test
     public void testConcurrency() throws Exception {
-        if (!doTestConcurrency) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/rminterceptors.xml", true);
 
-        for (int i = 0; i < 5; i++) {
+        int max = 5;
+        for (int i = 0; i < max; i++) {
             greeter.greetMeAsync(Integer.toString(i));
         }
 
         // CreateSequence and five greetMe messages
         // full and partial responses to each
 
-        awaitMessages(6, 12, 7500);
+        awaitMessages(max + 1, (max * 2) + 1, 7500);
         MessageFlow mf = new MessageFlow(outRecorder.getOutboundMessages(), inRecorder.getInboundMessages());
         
-        mf.verifyMessages(6, true);
-        String[] expectedActions = new String[6];
+        mf.verifyMessages(max + 1, true);
+        String[] expectedActions = new String[max + 1];
         expectedActions[0] = RMConstants.getCreateSequenceAction();
         for (int i = 1; i < expectedActions.length; i++) {
             expectedActions[i] = GREETME_ACTION;
@@ -1015,9 +932,6 @@
   
     @Test
     public void testMultiClientOneway() throws Exception {
-        if (!doTestMultiClientOneway) {
-            return;
-        }
         
         SpringBusFactory bf = new SpringBusFactory();
         String cfgResource = "org/apache/cxf/systest/ws/rm/rminterceptors.xml";            
@@ -1098,10 +1012,6 @@
     
     @Test
     public void testMultiClientTwoway() throws Exception {
-        if (!doTestMultiClientTwoway) {
-            return;
-        }
-        
         SpringBusFactory bf = new SpringBusFactory();
         String cfgResource = "org/apache/cxf/systest/ws/rm/rminterceptors.xml";            
         initControl(bf, cfgResource);
@@ -1191,9 +1101,6 @@
     
     @Test
     public void testServerSideMessageLoss() throws Exception {
-        if (!doTestServerSideMessageLoss) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/message-loss-server.xml", true);
         
         // avoid client side message loss
@@ -1247,9 +1154,6 @@
      
     @Test
     public void testTerminateOnShutdown() throws Exception {
-        if (!doTestTerminateOnShutdown) {
-            return;
-        }
         init("org/apache/cxf/systest/ws/rm/terminate-on-shutdown.xml", true);
         
         greeter.greetMeOneWay("neutrophil");



Mime
View raw message