cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r705280 - in /cxf/branches/2.1.x-fixes: ./ api/src/main/java/org/apache/cxf/endpoint/ api/src/main/java/org/apache/cxf/message/ rt/core/src/main/java/org/apache/cxf/endpoint/ rt/transports/http/src/main/java/org/apache/cxf/transport/http/ r...
Date Thu, 16 Oct 2008 16:46:46 GMT
Author: dkulp
Date: Thu Oct 16 09:46:40 2008
New Revision: 705280

URL: http://svn.apache.org/viewvc?rev=705280&view=rev
Log:
Merged revisions 705274 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r705274 | dkulp | 2008-10-16 12:29:41 -0400 (Thu, 16 Oct 2008) | 2 lines
  
  [CXF-1776] Provide Async invokes and callbacks on the low level Client objects.  Get HTTP
and JMS transports to support the async replies.   Still need to update the JAX-WS frontend
to use it and the JMS transport needs someone to look into it to make sure the changes will
actually be acceptable.  
........

Added:
    cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
      - copied unchanged from r705274, cxf/trunk/api/src/main/java/org/apache/cxf/endpoint/ClientCallback.java
Modified:
    cxf/branches/2.1.x-fixes/   (props changed)
    cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java
    cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java
    cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java
    cxf/branches/2.1.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
    cxf/branches/2.1.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
    cxf/branches/2.1.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/branches/2.1.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java

Propchange: cxf/branches/2.1.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 16 09:46:40 2008
@@ -1 +1 @@
-/cxf/trunk:686333-686363,686764,686820,687096,687194,687363,687387,687463,687543,687722,687798,687814,687817,687891,687910,687914,688086,688102,688133,688596,688735,688870,689572,689596,689855,689924,690067,690289,691246,691271,691295,691338,691355,691488,691602,691646,691706,691728,692116,692157,692310,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694795,694869,694981,694987,694993,695041,695096,695396,695484,695537,695552,695561,695619,695684,695835,695840,695868,695935,695977,696016,696094,696433,696720,697085,697868,698128,699289,700261,700507,700602,700981,701316,701783,701830,701862,702187,702205-702248,702267,702547,702561,702580,702602,702609,702616,702653,702656,702957,703191,703239,703309,703501,703513,703548,704584,704937,704997,705150,705235
+/cxf/trunk:686333-686363,686764,686820,687096,687194,687363,687387,687463,687543,687722,687798,687814,687817,687891,687910,687914,688086,688102,688133,688596,688735,688870,689572,689596,689855,689924,690067,690289,691246,691271,691295,691338,691355,691488,691602,691646,691706,691728,692116,692157,692310,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694795,694869,694981,694987,694993,695041,695096,695396,695484,695537,695552,695561,695619,695684,695835,695840,695868,695935,695977,696016,696094,696433,696720,697085,697868,698128,699289,700261,700507,700602,700981,701316,701783,701830,701862,702187,702205-702248,702267,702547,702561,702580,702602,702609,702616,702653,702656,702957,703191,703239,703309,703501,703513,703548,704584,704937,704997,705150,705235,705274

Propchange: cxf/branches/2.1.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java (original)
+++ cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/endpoint/Client.java Thu Oct
16 09:46:40 2008
@@ -20,6 +20,7 @@
 package org.apache.cxf.endpoint;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import javax.xml.namespace.QName;
 
@@ -97,6 +98,69 @@
     
     
     /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked. The service namespace will be used
+     * when looking up the BindingOperationInfo.
+     * @param params  The params that match the parts of the input message of the operation.  If the
+     * BindingOperationInfo supports unwrapping, it assumes the params are in the "unwrapped" form.  If
+     * params are in the wrapped form, use invokeWrapped
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invoke(ClientCallback callback,
+                    String operationName,
+                    Object... params) throws Exception;
+    
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked
+     * @param params  The params that match the parts of the input message of the operation.  If the
+     * BindingOperationInfo supports unwrapping, it assumes the params are in the "unwrapped" form.  If
+     * params are in the wrapped form, use invokeWrapped
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invoke(ClientCallback callback,
+                    QName operationName,
+                    Object... params) throws Exception;
+
+
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked. The service namespace will be used
+     * when looking up the BindingOperationInfo.
+     * @param params  The params that match the parts of the input message of the operation
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invokeWrapped(ClientCallback callback,
+                           String operationName,
+                    Object... params) throws Exception;
+    
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param operationName The name of the operation to be invoked
+     * @param params  The params that match the parts of the input message of the operation
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invokeWrapped(ClientCallback callback,
+                           QName operationName,
+                    Object... params) throws Exception;    
+    
+    /**
+     * Invokes an operation asynchronously
+     * @param callback The callback that is called when the response is ready
+     * @param oi  The operation to be invoked
+     * @param params  The params that match the parts of the input message of the operation
+     * @return The return values that match the parts of the output message of the operation
+     */
+    void invoke(ClientCallback callback,
+                BindingOperationInfo oi,
+                Object... params) throws Exception;    
+    
+    
+    /**
      * Gets the request context used for future invocations
      * @return context The context
      */
@@ -151,4 +215,11 @@
      *
      */
     void destroy();
+    
+    /**
+     * Sets the executor which is used to process Asynchronous responses.  The default
+     * is to use the threads provided by the transport.  (example: the JMS listener threads)

+     * @param executor
+     */
+    void setExecutor(Executor executor);
 }

Modified: cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java (original)
+++ cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/Exchange.java Thu Oct
16 09:46:40 2008
@@ -63,6 +63,14 @@
      * @return true if the exchange is known to be a one-way exchange
      */
     boolean isOneWay();
+    
+    /**
+     * @return true if the frontend will wait for the response.   Transports
+     * can then optimize themselves to process the response immediately instead 
+     * of using a background thread or similar.
+     */
+    boolean isSynchronous();
+    void setSynchronous(boolean b);
 
     /**
      * 

Modified: cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java (original)
+++ cxf/branches/2.1.x-fixes/api/src/main/java/org/apache/cxf/message/ExchangeImpl.java Thu
Oct 16 09:46:40 2008
@@ -30,6 +30,7 @@
 
     private Destination destination;
     private boolean oneWay;
+    private boolean synchronous = true;
     
     private Message inMessage;
     private Message outMessage;
@@ -104,6 +105,14 @@
     public void setOneWay(boolean b) {
         oneWay = b;
     }
+    
+    public boolean isSynchronous() {
+        return synchronous;
+    }
+
+    public void setSynchronous(boolean b) {
+        synchronous = b;
+    }
 
     public Session getSession() {
         return session;

Modified: cxf/branches/2.1.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
(original)
+++ cxf/branches/2.1.x-fixes/rt/core/src/main/java/org/apache/cxf/endpoint/ClientImpl.java
Thu Oct 16 09:46:40 2008
@@ -26,6 +26,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -89,6 +90,8 @@
 
     protected ThreadLocal <Map<String, Object>> responseContext =
             new ThreadLocal<Map<String, Object>>();
+    
+    protected Executor executor;
 
 
     public ClientImpl(Bus b, Endpoint e) {
@@ -312,6 +315,104 @@
             }
         }
     }
+    
+    public void invoke(ClientCallback callback, 
+                       String operationName, 
+                       Object... params) throws Exception {
+        QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
+        invoke(callback, q, params);        
+    }
+
+    public void invoke(ClientCallback callback, 
+                       QName operationName, 
+                       Object... params) throws Exception {
+        BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
+        if (op == null) {
+            throw new UncheckedException(
+                new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
+        }
+        
+        if (op.isUnwrappedCapable()) {
+            op = op.getUnwrappedOperation();
+        }
+        
+        invoke(callback, op, params);
+    }
+
+
+    public void invokeWrapped(ClientCallback callback, 
+                              String operationName, 
+                              Object... params)
+        throws Exception {
+        QName q = new QName(getEndpoint().getService().getName().getNamespaceURI(), operationName);
+        invokeWrapped(callback, q, params);        
+    }
+
+    public void invokeWrapped(ClientCallback callback, 
+                              QName operationName, 
+                              Object... params)
+        throws Exception {
+        BindingOperationInfo op = getEndpoint().getEndpointInfo().getBinding().getOperation(operationName);
+        if (op == null) {
+            throw new UncheckedException(
+                new org.apache.cxf.common.i18n.Message("NO_OPERATION", LOG, operationName));
+        }
+        invoke(callback, op, params);
+    }
+
+    
+    public void invoke(ClientCallback callback, 
+                       BindingOperationInfo oi, 
+                       Object... params) throws Exception {
+        Bus origBus = BusFactory.getThreadDefaultBus(false);
+        BusFactory.setThreadDefaultBus(bus);
+        try {
+            Exchange exchange = new ExchangeImpl();
+            exchange.setSynchronous(false);
+            Endpoint endpoint = getEndpoint();
+            Map<String, Object> context = new HashMap<String, Object>();
+            Map<String, Object> resp = getResponseContext();
+            resp.clear();
+            Map<String, Object> reqContext = new HashMap<String, Object>(getRequestContext());
+            context.put(RESPONSE_CONTEXT, resp);
+            context.put(REQUEST_CONTEXT, reqContext);
+
+            Message message = endpoint.getBinding().createMessage();
+            message.put(Message.INVOCATION_CONTEXT, context);
+            
+            //setup the message context
+            setContext(reqContext, message);
+            setParameters(params, message);
+    
+            if (null != reqContext) {
+                exchange.putAll(reqContext);
+            }
+            exchange.setOneWay(oi.getOutput() == null);
+            exchange.setOutMessage(message);
+            exchange.put(ClientCallback.class, callback);
+            
+            setOutMessageProperties(message, oi);
+            setExchangeProperties(exchange, endpoint, oi);
+            
+            // setup chain
+    
+            PhaseInterceptorChain chain = setupInterceptorChain(endpoint);
+            message.setInterceptorChain(chain);
+            
+            modifyChain(chain, reqContext);
+            chain.setFaultObserver(outFaultObserver);
+            
+            // setup conduit selector
+            prepareConduitSelector(message);
+            
+            // execute chain        
+            chain.doIntercept(message);
+
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
+        }       
+    }
+    
     public Object[] invoke(BindingOperationInfo oi,
                            Object[] params, 
                            Map<String, Object> context,
@@ -322,6 +423,7 @@
             if (exchange == null) {
                 exchange = new ExchangeImpl();
             }
+            exchange.setSynchronous(true);
             Endpoint endpoint = getEndpoint();
             
             Map<String, Object> reqContext = null;
@@ -363,62 +465,68 @@
             // execute chain        
             chain.doIntercept(message);
     
+            return processResult(message, exchange, oi, resContext);
             
-            // Check to see if there is a Fault from the outgoing chain
-            Exception ex = message.getContent(Exception.class);
-            boolean mepCompleteCalled = false;
-            if (ex != null) {
-                getConduitSelector().complete(exchange);
-                mepCompleteCalled = true;
-                if (message.getContent(Exception.class) != null) {
-                    throw ex;
-                }
-            }
-            ex = message.getExchange().get(Exception.class);
-            if (ex != null) {
-                if (!mepCompleteCalled) {
-                    getConduitSelector().complete(exchange);
-                }
+        } finally {
+            BusFactory.setThreadDefaultBus(origBus);
+        }
+    }
+
+    private Object[] processResult(Message message, 
+                                   Exchange exchange,
+                                   BindingOperationInfo oi,
+                                   Map<String, Object> resContext) throws Exception
{
+     // Check to see if there is a Fault from the outgoing chain
+        Exception ex = message.getContent(Exception.class);
+        boolean mepCompleteCalled = false;
+        if (ex != null) {
+            getConduitSelector().complete(exchange);
+            mepCompleteCalled = true;
+            if (message.getContent(Exception.class) != null) {
                 throw ex;
             }
-            
-            // Wait for a response if we need to
-            if (!oi.getOperationInfo().isOneWay()) {
-                synchronized (exchange) {
-                    waitResponse(exchange);
-                }
-            }
-            getConduitSelector().complete(exchange);
-    
-            // Grab the response objects if there are any
-            List resList = null;
-            Message inMsg = exchange.getInMessage();
-            if (inMsg != null) {
-                if (null != resContext) {                   
-                    resContext.putAll(inMsg);
-                    if (LOG.isLoggable(Level.FINE)) {
-                        LOG.fine("set responseContext to be" + responseContext);
-                    }
-                }
-                resList = inMsg.getContent(List.class);
+        }
+        ex = message.getExchange().get(Exception.class);
+        if (ex != null) {
+            if (!mepCompleteCalled) {
+                getConduitSelector().complete(exchange);
             }
-            
-            // check for an incoming fault
-            ex = getException(exchange);
-            
-            if (ex != null) {
-                throw ex;
+            throw ex;
+        }
+        
+        // Wait for a response if we need to
+        if (oi != null && !oi.getOperationInfo().isOneWay()) {
+            synchronized (exchange) {
+                waitResponse(exchange);
             }
-            
-            if (resList != null) {
-                return resList.toArray();
+        }
+        getConduitSelector().complete(exchange);
+
+        // Grab the response objects if there are any
+        List resList = null;
+        Message inMsg = exchange.getInMessage();
+        if (inMsg != null) {
+            if (null != resContext) {                   
+                resContext.putAll(inMsg);
+                if (LOG.isLoggable(Level.FINE)) {
+                    LOG.fine("set responseContext to be" + responseContext);
+                }
             }
-            return null;
-        } finally {
-            BusFactory.setThreadDefaultBus(origBus);
+            resList = inMsg.getContent(List.class);
+        }
+        
+        // check for an incoming fault
+        ex = getException(exchange);
+        
+        if (ex != null) {
+            throw ex;
+        }
+        
+        if (resList != null) {
+            return resList.toArray();
         }
+        return null;
     }
-
     protected Exception getException(Exchange exchange) {
         if (exchange.getInFaultMessage() != null) {
             return exchange.getInFaultMessage().getContent(Exception.class);
@@ -461,7 +569,9 @@
     }
     
     public void onMessage(Message message) {
+        ClientCallback callback = message.getExchange().get(ClientCallback.class);
         Endpoint endpoint = message.getExchange().get(Endpoint.class);
+        message.getExchange().setInMessage(message);
         if (endpoint == null) {
             // in this case correlation will occur outside the transport,
             // however there's a possibility that the endpoint may have been 
@@ -476,8 +586,6 @@
         message.put(Message.INBOUND_MESSAGE, Boolean.TRUE);
         PhaseManager pm = bus.getExtension(PhaseManager.class);
         
-        
-        
         List<Interceptor> i1 = bus.getInInterceptors();
         if (LOG.isLoggable(Level.FINE)) {
             LOG.fine("Interceptors contributed by bus: " + i1);
@@ -498,13 +606,20 @@
         PhaseInterceptorChain chain = inboundChainCache.get(pm.getInPhases(), i1, i2, i3,
i4); 
         message.setInterceptorChain(chain);
         
-        
         chain.setFaultObserver(outFaultObserver);
         
         Bus origBus = BusFactory.getThreadDefaultBus(false);
         BusFactory.setThreadDefaultBus(bus);
         // execute chain
         try {
+            if (callback != null) {
+                if (callback.isCancelled()) {
+                    getConduitSelector().complete(message.getExchange());
+                    return;
+                }
+                callback.start(message);
+            }
+            
             String startingAfterInterceptorID = (String) message.get(
                 PhaseInterceptorChain.STARTING_AFTER_INTERCEPTOR_ID);
             String startingInterceptorID = (String) message.get(
@@ -516,6 +631,22 @@
             } else {
                 chain.doIntercept(message);
             }
+            
+            if (callback != null) {
+                Map<String, Object> resCtx = CastUtils.cast((Map<?, ?>)message
+                                                                .getExchange()
+                                                                .getOutMessage()
+                                                                .get(Message.INVOCATION_CONTEXT));
+                resCtx = CastUtils.cast((Map<?, ?>)resCtx.get(RESPONSE_CONTEXT));
+                
+                try {
+                    Object obj[] = processResult(message, message.getExchange(),
+                                                 null, resCtx);
+                    callback.handleResponse(resCtx, obj);
+                } catch (Exception ex) {
+                    callback.handleException(resCtx, ex);
+                }
+            }
         } finally {
             synchronized (message.getExchange()) {
                 if (!isPartialResponse(message)) {
@@ -566,7 +697,19 @@
             exchange.put(OperationInfo.class, boi.getOperationInfo());
         }
                 
-        exchange.put(MessageObserver.class, this);
+        if (exchange.isSynchronous() || executor == null) {
+            exchange.put(MessageObserver.class, this);
+        } else {
+            exchange.put(MessageObserver.class, new MessageObserver() {
+                public void onMessage(final Message message) {
+                    executor.execute(new Runnable() {
+                        public void run() {
+                            ClientImpl.this.onMessage(message);
+                        }
+                    });
+                }
+            });            
+        }
         exchange.put(Retryable.class, this);
         exchange.put(Client.class, this);
         exchange.put(Bus.class, bus);
@@ -691,4 +834,11 @@
             super.putAll(shared);
         }
     }
+
+
+    public void setExecutor(Executor executor) {
+        this.executor = executor;
+    }
+
+
 }

Modified: cxf/branches/2.1.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
(original)
+++ cxf/branches/2.1.x-fixes/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
Thu Oct 16 09:46:40 2008
@@ -66,6 +66,8 @@
 import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.http.policy.PolicyUtils;
 import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
+import org.apache.cxf.workqueue.AutomaticWorkQueue;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.policy.Assertor;
 import org.apache.cxf.ws.policy.PolicyEngine;
@@ -1934,6 +1936,30 @@
             // Process retransmits until we fall out.
             handleRetransmits();
             
+            if (outMessage == null 
+                || outMessage.getExchange() == null
+                || outMessage.getExchange().isSynchronous()) {
+                handleResponseInternal();
+            } else {
+                Runnable runnable = new Runnable() {
+                    public void run() {
+                        try {
+                            handleResponseInternal();
+                        } catch (IOException e) {
+                            LOG.log(Level.WARNING, e.getMessage(), e);
+                        }
+                    }
+                };
+                WorkQueueManager mgr = outMessage.getExchange().get(Bus.class)
+                    .getExtension(WorkQueueManager.class);
+                AutomaticWorkQueue queue = mgr.getNamedWorkQueue("http-conduit");
+                if (queue == null) {
+                    queue = mgr.getAutomaticWorkQueue();
+                }
+                queue.execute(runnable);
+            }
+        }
+        protected void handleResponseInternal() throws IOException {
             int responseCode = connection.getResponseCode();
             
             if (LOG.isLoggable(Level.FINE)) {

Modified: cxf/branches/2.1.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
(original)
+++ cxf/branches/2.1.x-fixes/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
Thu Oct 16 09:46:40 2008
@@ -329,6 +329,8 @@
         message.setExchange(exchange);
         exchange.isOneWay();
         EasyMock.expectLastCall().andReturn(true);
+        exchange.isSynchronous();
+        EasyMock.expectLastCall().andReturn(true);
     }
 
     private HTTPConduit setUpConduit(

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
(original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Thu Oct 16 09:46:40 2008
@@ -56,14 +56,14 @@
 
     private EndpointInfo endpointInfo;
     private JMSConfiguration jmsConfig;
-    private Map<String, Message> correlationMap;
+    private Map<String, Exchange> correlationMap;
     private DefaultMessageListenerContainer jmsListener;
 
     public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration
jmsConfig) {
         super(target);
         this.jmsConfig = jmsConfig;
         this.endpointInfo = endpointInfo;
-        correlationMap = new ConcurrentHashMap<String, Message>();
+        correlationMap = new ConcurrentHashMap<String, Exchange>();
     }
 
     /**
@@ -128,25 +128,26 @@
          * fill to Message and notify this thread
          */
         if (!exchange.isOneWay()) {
-            Message inMessage = new MessageImpl();
-            synchronized (inMessage) {
-                correlationMap.put(correlationId, inMessage);
+            synchronized (exchange) {
+                correlationMap.put(correlationId, exchange);
                 jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
-                try {
-                    inMessage.wait(jmsTemplate.getReceiveTimeout());
-                } catch (InterruptedException e) {
+                
+                if (exchange.isSynchronous()) {
+                    try {
+                        exchange.wait(jmsTemplate.getReceiveTimeout());
+                    } catch (InterruptedException e) {
+                        correlationMap.remove(correlationId);
+                        throw new RuntimeException(e);
+                    }
                     correlationMap.remove(correlationId);
-                    throw new RuntimeException(e);
+                    if (exchange.getInMessage() == null
+                        && exchange.getInFaultMessage() == null) {
+                        throw new RuntimeException("Timeout receiving message with correlationId
"
+                                                   + correlationId);
+                    }
+                    
+                    
                 }
-                correlationMap.remove(correlationId);
-                if (inMessage.getContent(InputStream.class) == null) {
-                    throw new RuntimeException("Timeout receiving message with correlationId
"
-                                               + correlationId);
-                }
-            }
-            exchange.setInMessage(inMessage);
-            if (incomingObserver != null) {
-                incomingObserver.onMessage(inMessage);
             }
         } else {
             jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
@@ -165,20 +166,30 @@
         } catch (JMSException e) {
             throw JmsUtils.convertJmsAccessException(e);
         }
-        Message inMessage = correlationMap.get(correlationId);
-        if (inMessage == null) {
+
+        Exchange exchange = correlationMap.remove(correlationId);
+        if (exchange == null) {
             LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
+            return;
         }
+        Message inMessage = new MessageImpl();
+        exchange.setInMessage(inMessage);
         LOG.log(Level.FINE, "client received reply: ", jmsMessage);
         JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
         byte[] response = JMSUtils.retrievePayload(jmsMessage);
         LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]");
         inMessage.setContent(InputStream.class, new ByteArrayInputStream(response));
 
-        synchronized (inMessage) {
-            inMessage.notifyAll();
+        if (exchange.isSynchronous()) {
+            synchronized (exchange) {
+                exchange.notifyAll();
+            }
+        }
+        
+        //REVISIT: put on a workqueue?
+        if (incomingObserver != null) {
+            incomingObserver.onMessage(exchange.getInMessage());
         }
-
     }
 
     public void close() {

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
(original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
Thu Oct 16 09:46:40 2008
@@ -91,6 +91,7 @@
 
         Exchange exchange = new ExchangeImpl();
         exchange.setOneWay(isOneWay);
+        exchange.setSynchronous(true);
         message.setExchange(exchange);
         exchange.setOutMessage(message);
         try {

Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Thu Oct 16 09:46:40 2008
@@ -93,11 +93,12 @@
         setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl",
                          "HelloWorldServiceLoop", "HelloWorldPortLoop");
         JMSConduit conduit = setupJMSConduit(true, false);
-        conduit.getJmsConfig().setReceiveTimeout(1000);
+        conduit.getJmsConfig().setReceiveTimeout(10000);
 
         try {
-            for (int c = 0; c < 100; c++) {
+            for (int c = 0; c < 10; c++) {
                 LOG.info("Sending message " + c);
+                inMessage = null;
                 Message message = new MessageImpl();
                 sendoutMessage(conduit, message, false);
                 verifyReceivedMessage(message);
@@ -133,7 +134,14 @@
         }
     }
 
-    private void verifyReceivedMessage(Message message) {
+    private void verifyReceivedMessage(Message message) throws InterruptedException {
+        while (inMessage == null) {
+            //the send has completed, but the response might not be back yet.
+            //wait for it.
+            synchronized (this) {
+                wait(10);
+            }
+        }
         ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
         Assert.assertNotNull("The received message input stream should not be null", bis);
         byte bytes[] = new byte[bis.available()];
@@ -142,8 +150,8 @@
         } catch (IOException ex) {
             ex.printStackTrace();
         }
-        String reponse = IOUtils.newStringFromBytes(bytes);
-        assertEquals("The reponse date should be equals", reponse, "HelloWorld");
+        String response = IOUtils.newStringFromBytes(bytes);
+        assertEquals("The response data should be equal", "HelloWorld", response);
 
         JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);

Modified: cxf/branches/2.1.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java?rev=705280&r1=705279&r2=705280&view=diff
==============================================================================
--- cxf/branches/2.1.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java (original)
+++ cxf/branches/2.1.x-fixes/systests/src/test/java/org/apache/cxf/systest/jaxws/JaxWsDynamicClientTest.java Thu Oct 16 09:46:40 2008
@@ -24,6 +24,7 @@
 
 
 import org.apache.cxf.endpoint.Client;
+import org.apache.cxf.endpoint.ClientCallback;
 import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.jaxws.endpoint.dynamic.JaxWsDynamicClientFactory;
 import org.apache.cxf.no_body_parts.types.Operation1;
@@ -72,6 +73,12 @@
         Object[] rparts = client.invoke("operation1", parameters, bucketOfBytes);
         Operation1Response r = (Operation1Response)rparts[0];
         assertEquals(md5(bucketOfBytes), r.getStatus());
+        
+        ClientCallback callback = new ClientCallback();
+        client.invoke(callback, "operation1", parameters, bucketOfBytes);
+        rparts = callback.get();
+        r = (Operation1Response)rparts[0];
+        assertEquals(md5(bucketOfBytes), r.getStatus());
     }
 
 }



Mime
View raw message