cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r892469 - in /cxf/trunk: rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ systests/transports/src/test/java/org...
Date Sat, 19 Dec 2009 13:06:06 GMT
Author: dkulp
Date: Sat Dec 19 13:06:05 2009
New Revision: 892469

URL: http://svn.apache.org/viewvc?rev=892469&view=rev
Log:
Restore Async methods over JMS to actually be async and not consume a
bunch of threads.

Added:
    cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml   (with props)
Modified:
    cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
    cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
    cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl

Modified: cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
(original)
+++ cxf/trunk/rt/frontend/jaxws/src/main/java/org/apache/cxf/jaxws/JAXWSMethodDispatcher.java
Sat Dec 19 13:06:05 2009
@@ -23,8 +23,11 @@
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.lang.reflect.TypeVariable;
+import java.util.concurrent.Future;
 import java.util.logging.Logger;
 
+import javax.xml.ws.Response;
+
 import org.apache.cxf.common.i18n.Message;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.endpoint.Endpoint;
@@ -49,8 +52,16 @@
         int i = 0;
         for (Method m : methods) {
             try {
-                newMethods[i++] = getImplementationMethod(m);
+                newMethods[i] = getImplementationMethod(m);
+                i++;
             } catch (NoSuchMethodException e) {
+                if (m.getName().endsWith("Async")
+                    && (Future.class.equals(m.getReturnType())
+                        || Response.class.equals(m.getReturnType()))) {
+                    newMethods[i] = m;
+                    i++;
+                    continue;
+                }
                 Class endpointClass = implInfo.getImplementorClass();
                 Message msg = new Message("SEI_METHOD_NOT_FOUND", LOG, 
                                           m.getName(), endpointClass.getName());

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Sat Dec 19 13:06:05 2009
@@ -22,15 +22,22 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
+import java.lang.ref.WeakReference;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.MessageListener;
 import javax.jms.Session;
 
+import org.apache.cxf.Bus;
+import org.apache.cxf.buslifecycle.BusLifeCycleListener;
+import org.apache.cxf.buslifecycle.BusLifeCycleManager;
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.configuration.ConfigurationException;
 import org.apache.cxf.message.Exchange;
@@ -39,9 +46,13 @@
 import org.apache.cxf.message.MessageUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.workqueue.WorkQueueManager;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.MessageCreator;
+import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.support.JmsUtils;
 
 /**
  * JMSConduit is instantiated by the JMSTransportfactory which is selected by a client if
the transport
@@ -49,7 +60,7 @@
  * a JMS destination. If the Exchange is not oneway it then recevies the response and converts
it to a CXF
  * Message. This is then provided in the Exchange and also sent to the incomingObserver
  */
-public class JMSConduit extends AbstractConduit implements JMSExchangeSender {
+public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener
{
 
     static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class);
     
@@ -57,11 +68,21 @@
     
     private EndpointInfo endpointInfo;
     private JMSConfiguration jmsConfig;
+    private Map<String, Exchange> correlationMap = new ConcurrentHashMap<String,
Exchange>();
+    private DefaultMessageListenerContainer jmsListener;
+    private DefaultMessageListenerContainer allListener;
     private String conduitId;
     private AtomicLong messageCount;
-
-    public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration
jmsConfig) {
+    private int outstandingAsync;
+    private JMSBusLifeCycleListener listener;
+    private Bus bus;
+
+    public JMSConduit(EndpointInfo endpointInfo,
+                      EndpointReferenceType target,
+                      JMSConfiguration jmsConfig,
+                      Bus b) {
         super(target);
+        bus = b;
         this.jmsConfig = jmsConfig;
         this.endpointInfo = endpointInfo;
         conduitId = UUID.randomUUID().toString().replaceAll("-", "");
@@ -82,6 +103,31 @@
         JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload);
         message.setContent(OutputStream.class, out);
     }
+    
+    private synchronized AbstractMessageListenerContainer getJMSListener() {
+        if (jmsListener == null) {
+            jmsListener = JMSFactory.createJmsListener(jmsConfig, 
+                                                       this, 
+                                                       jmsConfig.getReplyDestination(), 
+                                                       conduitId, 
+                                                       false);
+            addBusListener();
+        }
+        ++outstandingAsync;
+        return jmsListener;
+    }
+    private synchronized AbstractMessageListenerContainer getAllListener() {
+        if (allListener == null) {
+            allListener = JMSFactory.createJmsListener(jmsConfig, 
+                                                       this, 
+                                                       jmsConfig.getReplyDestination(), 
+                                                       null, 
+                                                       true);
+            addBusListener();
+        }
+        ++outstandingAsync;
+        return allListener;
+    }
 
     /**
      * Send the JMS Request out and if not oneWay receive the response
@@ -126,8 +172,9 @@
             if (userCID != null) {
                 correlationId = userCID;
             } else if (!jmsConfig.isSetConduitSelectorPrefix()
-                       && (!jmsConfig.isSetUseConduitIdSelector() || !jmsConfig
-                           .isUseConduitIdSelector())) {
+                       && (exchange.isSynchronous() || exchange.isOneWay())
+                       && (!jmsConfig.isSetUseConduitIdSelector() 
+                           || !jmsConfig.isUseConduitIdSelector())) {
                 messageIdPattern = true;
             } else { 
                 if (jmsConfig.isUseConduitIdSelector()) {
@@ -145,10 +192,18 @@
         Destination replyToDestination = null;
         if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() && isSetReplyTo(outMessage)
             && replyTo != null) {
-            replyToDestination = JMSFactory.resolveOrCreateDestination(jmsTemplate, replyTo,
-                                                                       jmsConfig.isPubSubDomain());
+            if (exchange.isSynchronous() || exchange.isOneWay()) {
+                replyToDestination = JMSFactory.resolveOrCreateDestination(jmsTemplate, replyTo,
+                                                                           jmsConfig.isPubSubDomain());
+            } else {
+                if (userCID == null || !jmsConfig.isUseConduitIdSelector()) { 
+                    replyToDestination = getJMSListener().getDestination();
+                } else {
+                    replyToDestination = getAllListener().getDestination();
+                }
+            }
         }
-        
+
         final String cid = correlationId; 
         final Destination rtd = replyToDestination;
         class JMSConduitMessageCreator implements MessageCreator {
@@ -159,6 +214,9 @@
                 jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(jmsConfig, outMessage,
request,
                                                                     messageType, session,
rtd,
                                                                     cid);
+                if (!exchange.isSynchronous() && !exchange.isOneWay()) {
+                    correlationMap.put(cid, exchange);
+                }
                 LOG.log(Level.FINE, "client sending request: ", jmsMessage);
                 return jmsMessage;
             }
@@ -186,15 +244,17 @@
                     correlationId = messageCreator.getMessageID();
                 }
                 headers.setJMSMessageID(messageCreator.getMessageID());
-                
-                String messageSelector = "JMSCorrelationID = '" + correlationId + "'";
-                javax.jms.Message replyMessage = jmsTemplate.receiveSelected(replyToDestination,
-                                                                             messageSelector);
-                if (replyMessage == null) {
-                    throw new RuntimeException("Timeout receiving message with correlationId
"
-                                               + correlationId);
-                } else {
-                    doReplyMessage(exchange, replyMessage);
+
+                final String messageSelector = "JMSCorrelationID = '" + correlationId + "'";
+                if (exchange.isSynchronous()) {
+                    javax.jms.Message replyMessage = jmsTemplate.receiveSelected(replyToDestination,
+                                                                                 messageSelector);
+                    if (replyMessage == null) {
+                        throw new RuntimeException("Timeout receiving message with correlationId
"
+                                                   + correlationId);
+                    } else {
+                        doReplyMessage(exchange, replyMessage);
+                    }
                 }
             }
         } else {
@@ -203,6 +263,85 @@
         }
     }
 
+    static class JMSBusLifeCycleListener implements BusLifeCycleListener {
+        final WeakReference<JMSConduit> ref;
+        BusLifeCycleManager blcm;
+        JMSBusLifeCycleListener(JMSConduit c, BusLifeCycleManager b) {
+            ref = new WeakReference<JMSConduit>(c);
+            blcm = b;
+            blcm.registerLifeCycleListener(this);
+        }
+        
+        public void initComplete() {
+        }
+
+        public void postShutdown() {
+        }
+
+        public void preShutdown() {
+            unreg();
+            blcm = null;
+            JMSConduit c = ref.get();
+            if (c != null) {
+                c.listener = null;
+                c.close();
+            }
+        }
+        public void unreg() {
+            if (blcm != null) {
+                blcm.unregisterLifeCycleListener(this);
+            }
+        }
+    }
+    private synchronized void addBusListener() {
+        if (listener == null && bus != null) {
+            BusLifeCycleManager blcm = bus.getExtension(BusLifeCycleManager.class);
+            if (blcm != null) {
+                listener = new JMSBusLifeCycleListener(this,
+                                                       blcm);
+            }
+        }
+    }
+
+    /**
+     * When a message is received on the reply destination the correlation map is searched
for the
+     * correlationId. If it is found the message is converted to a CXF message and the thread
sending the
+     * request is notified {@inheritDoc}
+     */
+    public void onMessage(javax.jms.Message jmsMessage) {
+        String correlationId;
+        try {
+            correlationId = jmsMessage.getJMSCorrelationID();
+        } catch (JMSException e) {
+            throw JmsUtils.convertJmsAccessException(e);
+        }
+
+        Exchange exchange = correlationMap.remove(correlationId);
+        if (exchange == null) {
+            LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
+            return;
+        }
+        doReplyMessage(exchange, jmsMessage);
+        maybeShutdownListeners();
+    }
+    private synchronized void maybeShutdownListenersInternal() {
+        if (outstandingAsync == 0) {
+            shutdownListeners();
+        }        
+    }
+    private synchronized void maybeShutdownListeners() {
+        if (outstandingAsync > 0) {
+            --outstandingAsync;
+        }
+        if (outstandingAsync == 0) {
+            bus.getExtension(WorkQueueManager.class).getAutomaticWorkQueue().execute(new
Runnable() {
+                public void run() {
+                    maybeShutdownListenersInternal();
+                }
+            });
+        }
+    }
+
     /**
      * Here we just deal with the reply message
      */
@@ -222,7 +361,6 @@
                 }
             }
         
-            //REVISIT: put on a workqueue?
             if (incomingObserver != null) {
                 incomingObserver.onMessage(exchange.getInMessage());
             }
@@ -231,7 +369,22 @@
         }
     }
 
-    public void close() {
+    public synchronized void shutdownListeners() {
+        if (listener != null) {
+            listener.unreg();
+            listener = null;
+        }
+        if (jmsListener != null) {
+            jmsListener.shutdown();
+            jmsListener = null;
+        }
+        if (allListener != null) {
+            allListener.shutdown();
+            allListener = null;
+        }        
+    }
+    public synchronized void close() {
+        shutdownListeners();
         jmsConfig.destroyWrappedConnectionFactory();
         LOG.log(Level.FINE, "JMSConduit closed ");
     }
@@ -252,10 +405,11 @@
         Boolean ret = (Boolean)message.get(JMSConstants.JMS_SET_REPLY_TO);
         return ret == null || (ret != null && ret.booleanValue());
     }
-    
+
     @Override
     protected void finalize() throws Throwable {
         close();
         super.finalize();
     }
+
 }
\ No newline at end of file

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
Sat Dec 19 13:06:05 2009
@@ -160,14 +160,59 @@
         return createJmsListener(jmsListener,
                                  jmsConfig,
                                  listenerHandler,
-                                 destinationName);            
+                                 destinationName,
+                                 null, null, false);            
+    }
+    /**
+     * Create and start listener using configuration information from jmsConfig. Uses
+     * resolveOrCreateDestination to determine the destination for the listener.
+     * 
+     * @param jmsConfig configuration information
+     * @param listenerHandler object to be called when a message arrives
+     * @param destinationName null for temp dest or a destination name
+     * @param messageSelectorPrefix prefix for the messageselector
+     * @return
+     */
+    public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
+                                                                    MessageListener listenerHandler,
+                                                                    Destination destination,

+                                                                    String messageSelectorPrefix,
+                                                                    boolean userCID) {
+        DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
+            ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
+        
+        return createJmsListener(jmsListener,
+                                 jmsConfig,
+                                 listenerHandler,
+                                 null,
+                                 destination, 
+                                 messageSelectorPrefix,
+                                 userCID);    
+    }
+    public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
+                                                                    MessageListener listenerHandler,
+                                                                    String destination, 
+                                                                    String messageSelectorPrefix,
+                                                                    boolean userCID) {
+        DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
+            ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
+        
+        return createJmsListener(jmsListener,
+                                 jmsConfig,
+                                 listenerHandler,
+                                 destination,
+                                 null, 
+                                 messageSelectorPrefix,
+                                 userCID);    
     }
-    
     public static DefaultMessageListenerContainer createJmsListener(
                           DefaultMessageListenerContainer jmsListener,
                           JMSConfiguration jmsConfig,
                           MessageListener listenerHandler,
-                          String destinationName) {
+                          String destinationName,
+                          Destination destination,
+                          String messageSelectorPrefix,
+                          boolean userCID) {
         
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
         jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
@@ -202,18 +247,24 @@
             jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
         }
         String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
-        if (staticSelectorPrefix.length() > 0) {
+        if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector())
{
+            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
+                                        + staticSelectorPrefix 
+                                        + messageSelectorPrefix + "%'");
+        } else if (staticSelectorPrefix.length() > 0) {
             jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
                                         + staticSelectorPrefix +  "%'");
         }
+        
         if (jmsConfig.getDestinationResolver() != null) {
             jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
         }
         if (jmsConfig.getTaskExecutor() != null) {
             jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor());
         } 
-        
-        if (jmsConfig.isAutoResolveDestination()) {
+        if (destination != null) {
+            jmsListener.setDestination(destination);
+        } else if (jmsConfig.isAutoResolveDestination()) {
             jmsListener.setDestinationName(destinationName);
         } else {
             JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null);

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
Sat Dec 19 13:06:05 2009
@@ -68,7 +68,7 @@
     public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws
IOException {
         JMSOldConfigHolder old = new JMSOldConfigHolder();
         JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo,
true);
-        return new JMSConduit(endpointInfo, target, jmsConf);
+        return new JMSConduit(endpointInfo, target, jmsConf, bus);
     }
 
     /**

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
(original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
Sat Dec 19 13:06:05 2009
@@ -117,7 +117,7 @@
         
         JMSConfiguration jmsConfig = new JMSOldConfigHolder()
             .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true);
-        JMSConduit jmsConduit = new JMSConduit(endpointInfo, target, jmsConfig);
+        JMSConduit jmsConduit = new JMSConduit(endpointInfo, target, jmsConfig, bus);
         if (send) {
             // setMessageObserver
             observer = new MessageObserver() {

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
(original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
Sat Dec 19 13:06:05 2009
@@ -25,14 +25,18 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import javax.activation.DataHandler;
 import javax.jms.DeliveryMode;
 import javax.xml.namespace.QName;
+import javax.xml.ws.AsyncHandler;
 import javax.xml.ws.Binding;
 import javax.xml.ws.BindingProvider;
 import javax.xml.ws.Endpoint;
 import javax.xml.ws.Holder;
+import javax.xml.ws.Response;
 import javax.xml.ws.soap.SOAPBinding;
 import javax.xml.ws.soap.SOAPFaultException;
 
@@ -153,8 +157,7 @@
                     fail("Should have thrown FaultException");
                 } catch (PingMeFault ex) {
                     assertNotNull(ex.getFaultInfo());
-                }                
-              
+                }
             }
         } catch (UndeclaredThrowableException ex) {
             throw (Exception)ex.getCause();
@@ -205,6 +208,117 @@
     }
 
     @Test
+    public void testAsyncCall() throws Exception {
+        QName serviceName = getServiceName(new QName("http://cxf.apache.org/hello_world_jms",

+            "HelloWorldService"));
+        QName portName = getPortName(new QName("http://cxf.apache.org/hello_world_jms", "HelloWorldPort"));
+        URL wsdl = getWSDLURL("/wsdl/jms_test.wsdl");
+        assertNotNull(wsdl);
+        
+        HelloWorldService service = new HelloWorldService(wsdl, serviceName);
+        assertNotNull(service);
+        HelloWorldPortType greeter = service.getPort(portName, HelloWorldPortType.class);
+        final Thread thread = Thread.currentThread(); 
+        
+        class TestAsyncHandler implements AsyncHandler<String> {
+            String expected;
+            
+            public TestAsyncHandler(String x) {
+                expected = x;
+            }
+            
+            public String getExpected() {
+                return expected;
+            }
+            public void handleResponse(Response<String> response) {
+                try {
+                    Thread thread2 = Thread.currentThread();
+                    assertNotSame(thread, thread2);
+                    assertEquals("Hello " + expected, response.get());
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                } catch (ExecutionException e) {
+                    e.printStackTrace();
+                }
+            }
+        }
+        TestAsyncHandler h1 = new TestAsyncHandler("Homer");
+        TestAsyncHandler h2 = new TestAsyncHandler("Maggie");
+        TestAsyncHandler h3 = new TestAsyncHandler("Bart");
+        TestAsyncHandler h4 = new TestAsyncHandler("Lisa");
+        TestAsyncHandler h5 = new TestAsyncHandler("Marge");
+        
+        Future<?> f1 = greeter.greetMeAsync("Santa's Little Helper", 
+                                            new TestAsyncHandler("Santa's Little Helper"));
+        f1.get();
+        f1 = greeter.greetMeAsync("PauseForTwoSecs Santa's Little Helper", 
+                                  new TestAsyncHandler("Santa's Little Helper"));
+        long start = System.currentTimeMillis();
+        f1 = greeter.greetMeAsync("PauseForTwoSecs " + h1.getExpected(), h1);
+        Future<?> f2 = greeter.greetMeAsync("PauseForTwoSecs " + h2.getExpected(),
h2);
+        Future<?> f3 = greeter.greetMeAsync("PauseForTwoSecs " + h3.getExpected(),
h3);
+        Future<?> f4 = greeter.greetMeAsync("PauseForTwoSecs " + h4.getExpected(),
h4);
+        Future<?> f5 = greeter.greetMeAsync("PauseForTwoSecs " + h5.getExpected(),
h5);
+        long mid = System.currentTimeMillis();
+        assertEquals("Hello " + h1.getExpected(), f1.get());
+        assertEquals("Hello " + h2.getExpected(), f2.get());
+        assertEquals("Hello " + h3.getExpected(), f3.get());
+        assertEquals("Hello " + h4.getExpected(), f4.get());
+        assertEquals("Hello " + h5.getExpected(), f5.get());
+        long end = System.currentTimeMillis();
+
+        assertTrue("Time too long: " + (mid - start), (mid - start) < 1000);
+        assertTrue((end - mid) > 1000);
+        f1 = null;
+        f2 = null;
+        f3 = null;
+        f4 = null;
+        f5 = null;
+
+        /*
+        int count = 20;
+        TestAsyncHandler handlers[] = new TestAsyncHandler[count];
+        Future<?> futures[] = new Future<?>[count];
+        for (int x = 0; x < count; x++) {
+            handlers[x] = new TestAsyncHandler("Handler" + x);
+            futures[x] = greeter.greetMeAsync("PauseForTwoSecs " + handlers[x].getExpected(),
+                                              handlers[x]);
+            //intersperse some sync calls in there....
+            if (x == 2 || x == 5) {
+                assertEquals("Hello World", greeter.greetMe("World"));
+            }
+            if (x == 10) {
+                assertEquals("Hello World", greeter.greetMe("PauseForTwoSecs World"));  
             
+            }
+        }
+        int countDone = 0;
+        for (int x = 0; x < count; x++) {
+            if (futures[x].isDone()) {
+                countDone++;
+            }
+        }
+        assertTrue("Should not all be done.", countDone < count);
+        for (int x = 0; x < count; x++) {
+            assertEquals("Hello " + handlers[x].getExpected(), futures[x].get());
+        }
+        countDone = 0;
+        for (int x = 0; x < count; x++) {
+            if (futures[x].isDone()) {
+                countDone++;
+            }
+        }
+        assertEquals(count, countDone);
+        */
+        
+        greeter = null;
+        service = null;
+        
+        System.gc();
+        System.gc();
+        System.gc();
+    }
+    
+    @Test
     public void testBasicConnection() throws Exception {
         QName serviceName = getServiceName(new QName("http://cxf.apache.org/hello_world_jms",

                                  "HelloWorldService"));
@@ -243,11 +357,10 @@
                     assertNotNull(nslf.getFaultInfo());
                     assertNotNull(nslf.getFaultInfo().getCode());
                 } 
+                
             }
         } catch (UndeclaredThrowableException ex) {
             throw (Exception)ex.getCause();
-        } catch (Exception t) {
-            throw t;
         }
     }
     

Modified: cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
URL: http://svn.apache.org/viewvc/cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
(original)
+++ cxf/trunk/systests/transports/src/test/java/org/apache/cxf/systest/jms/TwoWayJMSImplBase.java
Sat Dec 19 13:06:05 2009
@@ -18,7 +18,11 @@
  */
 package org.apache.cxf.systest.jms;
 
+import java.util.concurrent.Future;
+
 import javax.annotation.Resource;
+import javax.xml.ws.AsyncHandler;
+import javax.xml.ws.Response;
 import javax.xml.ws.WebServiceContext;
 import javax.xml.ws.handler.MessageContext;
 
@@ -38,6 +42,15 @@
     @Resource
     protected WebServiceContext wsContext;
     public String greetMe(String me) {
+        if (me.startsWith("PauseForTwoSecs")) {
+            try {
+                Thread.sleep(2000);
+            } catch (InterruptedException e) {
+                //ignore
+            }
+            me = me.substring("PauseForTwoSecs".length()).trim();
+        }
+        
         MessageContext mc = wsContext.getMessageContext();
         JMSMessageHeadersType headers =
             (JMSMessageHeadersType) mc.get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
@@ -86,4 +99,34 @@
         return new TestRpcLitFaultResponse();
     }
 
+    public Response<String> greetMeAsync(String stringParam0) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public Future<?> greetMeAsync(String stringParam0, AsyncHandler<String> asyncHandler)
{
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public Response<String> sayHiAsync() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public Future<?> sayHiAsync(AsyncHandler<String> asyncHandler) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public Response<TestRpcLitFaultResponse> testRpcLitFaultAsync(String in) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public Future<?> testRpcLitFaultAsync(String in, AsyncHandler<TestRpcLitFaultResponse>
asyncHandler) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }

Added: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml?rev=892469&view=auto
==============================================================================
--- cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml (added)
+++ cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml Sat Dec 19 13:06:05 2009
@@ -0,0 +1,27 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+ 
+  http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied. See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+<bindings
+    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
+    xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
+    wsdlLocation="jms_test.wsdl"
+    xmlns="http://java.sun.com/xml/ns/jaxws">
+    <bindings node="wsdl:definitions">
+        <enableAsyncMapping>true</enableAsyncMapping>
+    </bindings>
+</bindings>
\ No newline at end of file

Propchange: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: cxf/trunk/testutils/src/main/resources/wsdl/jms_test-binding.xml
------------------------------------------------------------------------------
    svn:mime-type = text/xml

Modified: cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl
URL: http://svn.apache.org/viewvc/cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl?rev=892469&r1=892468&r2=892469&view=diff
==============================================================================
--- cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl (original)
+++ cxf/trunk/testutils/src/main/resources/wsdl/jms_test.wsdl Sat Dec 19 13:06:05 2009
@@ -347,8 +347,8 @@
                    <jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
                    <jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:61500"/>
                </jms:address>
-            
                <jms:server durableSubscriberName="CXF_subscriber"/>
+               <jms:sessionPool hightWaterMark="5" lowWaterMark="5"/>
            </port>
     </service>
         



Mime
View raw message