cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject svn commit: r1567186 [1/2] - in /cxf/trunk: core/src/main/java/org/apache/cxf/phase/ rt/transports/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ rt/transports/jms/s...
Date Tue, 11 Feb 2014 15:45:31 GMT
Author: cschneider
Date: Tue Feb 11 15:45:30 2014
New Revision: 1567186

URL: http://svn.apache.org/r1567186
Log:
CXF-5543 Replacing message listener container and removing spring jms dep

Added:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/DestinationResolver.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/UserCredentialsConnectionFactoryAdapter.java   (with props)
Removed:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SpringJMSListenerAdapter.java
Modified:
    cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
    cxf/trunk/rt/transports/jms/pom.xml
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSDestinationResolver.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
    cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
    cxf/trunk/rt/transports/jms/src/test/resources/jms_test_jndi.xml
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/JMSTestUtil.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java

Modified: cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java
URL: http://svn.apache.org/viewvc/cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java (original)
+++ cxf/trunk/core/src/main/java/org/apache/cxf/phase/PhaseInterceptorChain.java Tue Feb 11 15:45:30 2014
@@ -253,7 +253,6 @@ public class PhaseInterceptorChain imple
         }
     }
 
-
     public synchronized void pause() {
         state = State.PAUSED;
         pausedMessage = CURRENT_MESSAGE.get();

Modified: cxf/trunk/rt/transports/jms/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/pom.xml?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/pom.xml (original)
+++ cxf/trunk/rt/transports/jms/pom.xml Tue Feb 11 15:45:30 2014
@@ -108,17 +108,9 @@
         </dependency>
         <dependency>
             <groupId>org.springframework</groupId>
-            <artifactId>spring-jms</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework</groupId>
             <artifactId>spring-context</artifactId>
         </dependency>
         <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-tx</artifactId>
-        </dependency>
-        <dependency>
             <groupId>commons-lang</groupId>
             <artifactId>commons-lang</artifactId>
         </dependency>

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java Tue Feb 11 15:45:30 2014
@@ -19,29 +19,43 @@
 package org.apache.cxf.transport.jms;
 
 import java.io.IOException;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.SimpleTimeZone;
+import java.util.TimeZone;
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.MessageObserver;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
+import org.apache.cxf.transport.jms.util.JMSSender;
+import org.apache.cxf.transport.jms.util.JMSUtil;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
+import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
 
 /**
  * Conduit for sending the reply back to the client
  */
-class BackChannelConduit extends AbstractConduit {
+class BackChannelConduit extends AbstractConduit implements JMSExchangeSender {
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
-    protected Message inMessage;
-    private JMSExchangeSender sender;
-
-    BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) {
-        super(ref);
-        inMessage = message;
-        this.sender = sender;
+    private JMSConfiguration jmsConfig;
+    private Message inMessage;
+    
+    BackChannelConduit(Message inMessage, JMSConfiguration jmsConfig) {
+        super(EndpointReferenceUtils.getAnonymousEndpointReference());
+        this.inMessage = inMessage;
+        this.jmsConfig = jmsConfig;
     }
     @Override
     public void close(Message msg) throws IOException {
@@ -75,15 +89,122 @@ class BackChannelConduit extends Abstrac
             message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
                 .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
         }
-
-        final Exchange exchange = inMessage.getExchange();
-        exchange.setOutMessage(message);
         
+        Exchange exchange = inMessage.getExchange();
+        exchange.setOutMessage(message);
+
         boolean isTextMessage = (jmsMessage instanceof TextMessage) && !JMSMessageUtils.isMtomEnabled(message);
-        MessageStreamUtil.prepareStream(message, isTextMessage, sender);
+        MessageStreamUtil.prepareStream(message, isTextMessage, this);
     }
     
     protected Logger getLogger() {
         return LOG;
     }
+
+    public void sendExchange(Exchange exchange, final Object replyObj) {
+        if (exchange.isOneWay()) {
+            //Don't need to send anything
+            return;
+        }
+
+        final Message outMessage = exchange.getOutMessage();
+
+        ResourceCloser closer = new ResourceCloser();
+        try {
+            ConnectionFactory cf = jmsConfig.getConnectionFactory();
+            Connection connection = closer.register(cf.createConnection());
+            Session session = closer
+                .register(connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE));
+
+            final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
+                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+            JMSMessageHeadersType inMessageProperties = (JMSMessageHeadersType)inMessage
+                .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+            initResponseMessageProperties(messageProperties, inMessageProperties);
+
+            // setup the reply message
+            final javax.jms.Message request = (javax.jms.Message)inMessage
+                .get(JMSConstants.JMS_REQUEST_MESSAGE);
+            final String msgType = JMSMessageUtils.isMtomEnabled(outMessage)
+                ? JMSConstants.BINARY_MESSAGE_TYPE : JMSMessageUtils.getMessageType(request);
+            if (isTimedOut(request)) {
+                return;
+            }
+
+            Destination replyTo = getReplyToDestination(session, inMessage);
+            if (replyTo == null) {
+                throw new RuntimeException("No replyTo destination set");
+            }
+
+            getLogger().log(Level.FINE, "send out the message!");
+
+            String correlationId = determineCorrelationID(request);
+            javax.jms.Message reply = JMSMessageUtils.asJMSMessage(jmsConfig, 
+                                      outMessage, 
+                                      replyObj, 
+                                      msgType,
+                                      session,
+                                      correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+            JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties);
+            LOG.log(Level.FINE, "server sending reply: ", reply);
+            sender.sendMessage(closer, session, replyTo, reply);
+        } catch (JMSException ex) {
+            throw JMSUtil.convertJmsException(ex);
+        } finally {
+            closer.close();
+        }
+    }
+    
+    /**
+     * @param messageProperties
+     * @param inMessageProperties
+     */
+    public static void initResponseMessageProperties(JMSMessageHeadersType messageProperties,
+                                                     JMSMessageHeadersType inMessageProperties) {
+        messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode());
+        messageProperties.setJMSPriority(inMessageProperties.getJMSPriority());
+        messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI());
+        messageProperties.setSOAPJMSBindingVersion("1.0");
+    }
+
+    private boolean isTimedOut(final javax.jms.Message request) throws JMSException {
+        if (request.getJMSExpiration() > 0) {
+            TimeZone tz = new SimpleTimeZone(0, "GMT");
+            Calendar cal = new GregorianCalendar(tz);
+            long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+            if (timeToLive < 0) {
+                getLogger()
+                    .log(Level.INFO, "Message time to live is already expired skipping response.");
+                return true;
+            }
+        }
+        return false;
+    }
+    
+    private Destination getReplyToDestination(Session session, Message inMessage2) throws JMSException {
+        javax.jms.Message message = (javax.jms.Message)inMessage2.get(JMSConstants.JMS_REQUEST_MESSAGE);
+        // If WS-Addressing had set the replyTo header.
+        final String replyToName = (String)inMessage2.get(JMSConstants.JMS_REBASED_REPLY_TO);
+        if (replyToName != null) {
+            return jmsConfig.getReplyDestination(session, replyToName);
+        } else if (message.getJMSReplyTo() != null) {
+            return message.getJMSReplyTo();
+        } else {
+            return jmsConfig.getReplyDestination(session);
+        }
+    }
+    
+    /**
+     * Decides what correlationId to use for the reply by looking at the request headers
+     * 
+     * @param request jms request message
+     * @return correlation id of request if set else message id from request
+     * @throws JMSException
+     */
+    public String determineCorrelationID(javax.jms.Message request) throws JMSException {
+        return StringUtils.isEmpty(request.getJMSCorrelationID())
+            ? request.getJMSMessageID() 
+            : request.getJMSCorrelationID();
+    }
+
 }
\ No newline at end of file

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Tue Feb 11 15:45:30 2014
@@ -29,6 +29,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
+import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
@@ -49,7 +50,6 @@ import org.apache.cxf.transport.jms.util
 import org.apache.cxf.transport.jms.util.JMSUtil;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.springframework.jms.connection.SingleConnectionFactory;
 
 /**
  * JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport
@@ -70,6 +70,7 @@ public class JMSConduit extends Abstract
     private AtomicLong messageCount;
     private JMSBusLifeCycleListener listener;
     private Bus bus;
+    private Connection connection;
     private Destination staticReplyDestination;
 
     public JMSConduit(EndpointReferenceType target,
@@ -97,12 +98,11 @@ public class JMSConduit extends Abstract
         MessageStreamUtil.closeStreams(msg);
         super.close(msg);
     }
+
     private synchronized void getJMSListener(Destination replyTo) {
         if (jmsListener == null) {
-            jmsListener = JMSFactory.createJmsListener(jmsConfig, 
-                                                       this, 
-                                                       replyTo, 
-                                                       conduitId);
+            jmsListener = JMSFactory
+                .createSimpleJmsListener(jmsConfig, connection, this, replyTo, conduitId);
             addBusListener();
         }
     }
@@ -132,7 +132,11 @@ public class JMSConduit extends Abstract
 
         ResourceCloser closer = new ResourceCloser();
         try {
-            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
+            if (connection == null) {
+                connection = JMSFactory.createConnection(jmsConfig);
+            }
+            Session session = closer.register(connection.createSession(jmsConfig.isSessionTransacted(), 
+                                                                       Session.AUTO_ACKNOWLEDGE));
             Destination targetDest = jmsConfig.getTargetDestination(session);
             
             Destination replyToDestination = null;
@@ -143,6 +147,7 @@ public class JMSConduit extends Abstract
                 }
                 replyToDestination = jmsConfig.getReplyToDestination(session, headers.getJMSReplyTo());
             }
+            connection.start();
 
             String messageType = jmsConfig.getMessageType();
             String correlationId = createCorrelationId(exchange, userCID);
@@ -301,6 +306,8 @@ public class JMSConduit extends Abstract
             String correlationId = jmsMessage.getJMSCorrelationID();
             LOG.log(Level.INFO, "Received reply message with correlation id " + correlationId);
 
+            // Try to correlate the incoming message with some timeout as it may have been
+            // added to the map after the message was sent
             int count = 0;
             Exchange exchange = null;
             while (exchange == null && count < 100) {
@@ -364,11 +371,7 @@ public class JMSConduit extends Abstract
         }
     }
     public synchronized void close() {
-        try {
-            ((SingleConnectionFactory)jmsConfig.getConnectionFactory()).resetConnection();
-        } catch (Exception e) {
-            // Ignore
-        }
+        ResourceCloser.close(connection);
         shutdownListeners();
         LOG.log(Level.FINE, "JMSConduit closed ");
     }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Tue Feb 11 15:45:30 2014
@@ -27,14 +27,10 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.cxf.common.injection.NoJSR250Annotations;
+import org.apache.cxf.transport.jms.util.DestinationResolver;
 import org.apache.cxf.transport.jms.util.JMSDestinationResolver;
 import org.springframework.beans.factory.InitializingBean;
-import org.springframework.beans.factory.annotation.Required;
-import org.springframework.core.task.TaskExecutor;
-import org.springframework.jms.connection.SingleConnectionFactory;
-import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jndi.JndiTemplate;
-import org.springframework.transaction.PlatformTransactionManager;
 
 @NoJSR250Annotations
 public class JMSConfiguration implements InitializingBean {
@@ -49,8 +45,7 @@ public class JMSConfiguration implements
     private JndiTemplate jndiTemplate;
     private ConnectionFactory connectionFactory;
     private DestinationResolver destinationResolver = new JMSDestinationResolver();
-    private PlatformTransactionManager transactionManager;
-    private TaskExecutor taskExecutor;
+    private Executor taskExecutor;
     private boolean reconnectOnException = true;
     private boolean messageIdEnabled = true;
     private boolean messageTimestampEnabled = true;
@@ -103,8 +98,6 @@ public class JMSConfiguration implements
 
     private JNDIConfiguration jndiConfig;
 
-    private SingleConnectionFactory singleConnectionFactory;
-
     public void ensureProperlyConfigured() {
         if (connectionFactory == null) {
             connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this);
@@ -264,7 +257,6 @@ public class JMSConfiguration implements
         }
     }
 
-    @Required
     public void setConnectionFactory(ConnectionFactory connectionFactory) {
         this.connectionFactory = connectionFactory;
     }
@@ -333,12 +325,8 @@ public class JMSConfiguration implements
         this.sessionTransacted = sessionTransacted;
     }
 
-    public PlatformTransactionManager getTransactionManager() {
-        return transactionManager;
-    }
-
-    public void setTransactionManager(PlatformTransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
+    @Deprecated
+    public void setTransactionManager(Object transactionManager) {
     }
 
     public int getConcurrentConsumers() {
@@ -377,7 +365,7 @@ public class JMSConfiguration implements
         return taskExecutor;
     }
 
-    public void setTaskExecutor(TaskExecutor taskExecutor) {
+    public void setTaskExecutor(Executor taskExecutor) {
         this.taskExecutor = taskExecutor;
     }
 
@@ -428,23 +416,13 @@ public class JMSConfiguration implements
         this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
     }
 
-    public ConnectionFactory getPlainConnectionFactory() {
+    public ConnectionFactory getConnectionFactory() {
         if (connectionFactory == null) {
             connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this);
         }
         return connectionFactory;
     }
     
-    public ConnectionFactory getConnectionFactory() {
-        if (singleConnectionFactory == null) {
-            ConnectionFactory cf = getPlainConnectionFactory();
-            singleConnectionFactory = cf instanceof SingleConnectionFactory
-                ? (SingleConnectionFactory)cf : new SingleConnectionFactory(cf);
-            singleConnectionFactory.setClientId(durableSubscriptionClientId);
-        }
-        return singleConnectionFactory;
-    }
-
     public String getDurableSubscriptionClientId() {
         return durableSubscriptionClientId;
     }
@@ -531,4 +509,5 @@ public class JMSConfiguration implements
     public Destination getReplyDestination(Session session, String replyToName) throws JMSException {
         return destinationResolver.resolveDestinationName(session, replyToName, replyPubSubDomain);
     }
+
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Tue Feb 11 15:45:30 2014
@@ -20,29 +20,20 @@
 package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
-import java.util.Calendar;
-import java.util.GregorianCalendar;
-import java.util.Map;
-import java.util.SimpleTimeZone;
-import java.util.TimeZone;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
-import javax.jms.Session;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
 import org.apache.cxf.common.classloader.ClassLoaderUtils;
 import org.apache.cxf.common.classloader.ClassLoaderUtils.ClassLoaderHolder;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.common.util.StringUtils;
 import org.apache.cxf.continuations.ContinuationProvider;
 import org.apache.cxf.continuations.SuspendedInvocationException;
 import org.apache.cxf.interceptor.OneWayProcessorInterceptor;
-import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.security.SecurityContext;
@@ -51,14 +42,10 @@ import org.apache.cxf.transport.Abstract
 import org.apache.cxf.transport.Conduit;
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
-import org.apache.cxf.transport.jms.util.JMSSender;
 import org.apache.cxf.transport.jms.util.JMSUtil;
-import org.apache.cxf.transport.jms.util.ResourceCloser;
-import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
 
 public class JMSDestination extends AbstractMultiplexDestination 
-    implements MessageListener, JMSExchangeSender {
+    implements MessageListener {
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
 
@@ -83,8 +70,7 @@ public class JMSDestination extends Abst
      * @return the inbuilt backchannel
      */
     protected Conduit getInbuiltBackChannel(Message inMessage) {
-        EndpointReferenceType anon = EndpointReferenceUtils.getAnonymousEndpointReference();
-        return new BackChannelConduit(this, anon, inMessage);
+        return new BackChannelConduit(inMessage, jmsConfig);
     }
 
     /**
@@ -94,27 +80,13 @@ public class JMSDestination extends Abst
         getLogger().log(Level.FINE, "JMSDestination activate().... ");
         jmsConfig.ensureProperlyConfigured();
 
-        Destination targetDestination = resolveTargetDestination();
-        jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, 
-                                                   targetDestination);
+        jmsListener = JMSFactory.createTargetDestinationListener(ei, jmsConfig, this);
         int restartLimit = jmsConfig.getMaxSuspendedContinuations() * jmsConfig.getReconnectPercentOfMax() / 100;
         this.suspendedContinuations = new ThrottlingCounter(this.jmsListener, 
                                                             restartLimit,
                                                             jmsConfig.getMaxSuspendedContinuations());
     }
 
-    private Destination resolveTargetDestination() {
-        ResourceCloser closer = new ResourceCloser();
-        try {
-            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
-            return jmsConfig.getTargetDestination(session);
-        } catch (JMSException e) {
-            throw JMSUtil.convertJmsException(e);
-        } finally {
-            closer.close();
-        }
-    }
-
     public void deactivate() {
         if (jmsListener != null) {
             jmsListener.shutdown();
@@ -126,33 +98,6 @@ public class JMSDestination extends Abst
         this.deactivate();
     }
 
-    public Destination getReplyToDestination(Session session, 
-                                             Message inMessage) throws JMSException {
-        javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
-        // If WS-Addressing had set the replyTo header.
-        final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
-        if (replyToName != null) {
-            return jmsConfig.getReplyDestination(session, replyToName);
-        } else if (message.getJMSReplyTo() != null) {
-            return message.getJMSReplyTo();
-        } else {
-            return jmsConfig.getReplyDestination(session);
-        }
-    }
-
-    /**
-     * Decides what correlationId to use for the reply by looking at the request headers
-     * 
-     * @param request jms request message
-     * @return correlation id of request if set else message id from request
-     * @throws JMSException
-     */
-    public String determineCorrelationID(javax.jms.Message request) throws JMSException {
-        return StringUtils.isEmpty(request.getJMSCorrelationID())
-            ? request.getJMSMessageID() 
-            : request.getJMSCorrelationID();
-    }
-
     /**
      * Convert JMS message received by ListenerThread to CXF message and inform incomingObserver that a
      * message was received. The observer will call the service and then send the response CXF message by
@@ -186,14 +131,8 @@ public class JMSDestination extends Abst
             
             origBus = BusFactory.getAndSetThreadDefaultBus(bus);
 
-            
-            Map<Class<?>, ?> mp = JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.get();
-            if (mp != null) {
-                for (Map.Entry<Class<?>, ?> ent : mp.entrySet()) {
-                    inMessage.setContent(ent.getKey(), ent.getValue());
-                }
-                JCATransactionalMessageListenerContainer.ENDPOINT_LOCAL.remove();
-            }
+            // FIXME
+            //JCATransactionalMessageListenerContainer.setMessageEndpoint(inMessage);
 
             // handle the incoming message
             incomingObserver.onMessage(inMessage);
@@ -206,10 +145,12 @@ public class JMSDestination extends Abst
             // need to propagate any exceptions back so transactions can occur
             if (inMessage.getContent(Exception.class) != null) {
                 Exception ex = inMessage.getContent(Exception.class);
-                if (ex.getCause() instanceof RuntimeException) {
-                    throw (RuntimeException)ex.getCause();
-                } else {
-                    throw new RuntimeException(ex);
+                if (!(ex instanceof org.apache.cxf.interceptor.Fault)) {
+                    if (ex.getCause() instanceof RuntimeException) {
+                        throw (RuntimeException)ex.getCause();
+                    } else {
+                        throw new RuntimeException(ex);
+                    }
                 }
             }
             
@@ -229,84 +170,6 @@ public class JMSDestination extends Abst
         }
     }
 
-    public void sendExchange(Exchange exchange, final Object replyObj) {
-        if (exchange.isOneWay()) {
-            //Don't need to send anything
-            return;
-        }
-        final Message inMessage = exchange.getInMessage();
-        final Message outMessage = exchange.getOutMessage();
-
-        ResourceCloser closer = new ResourceCloser();
-        try {
-            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
-
-            final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
-                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
-            JMSMessageHeadersType inMessageProperties = (JMSMessageHeadersType)inMessage
-                .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-            initResponseMessageProperties(messageProperties, inMessageProperties);
-
-            // setup the reply message
-            final javax.jms.Message request = (javax.jms.Message)inMessage
-                .get(JMSConstants.JMS_REQUEST_MESSAGE);
-            final String msgType = JMSMessageUtils.isMtomEnabled(outMessage)
-                ? JMSConstants.BINARY_MESSAGE_TYPE : JMSMessageUtils.getMessageType(request);
-            if (isTimedOut(request)) {
-                return;
-            }
-
-            Destination replyTo = getReplyToDestination(session, inMessage);
-            if (replyTo == null) {
-                throw new RuntimeException("No replyTo destination set");
-            }
-
-            getLogger().log(Level.FINE, "send out the message!");
-
-            String correlationId = determineCorrelationID(request);
-            javax.jms.Message reply = JMSMessageUtils.asJMSMessage(jmsConfig, 
-                                      outMessage, 
-                                      replyObj, 
-                                      msgType,
-                                      session,
-                                      correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
-            JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties);
-            LOG.log(Level.FINE, "server sending reply: ", reply);
-            sender.sendMessage(closer, session, replyTo, reply);
-        } catch (JMSException ex) {
-            throw JMSUtil.convertJmsException(ex);
-        } finally {
-            closer.close();
-        }
-    }
-    
-    /**
-     * @param messageProperties
-     * @param inMessageProperties
-     */
-    public static void initResponseMessageProperties(JMSMessageHeadersType messageProperties,
-                                                     JMSMessageHeadersType inMessageProperties) {
-        messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode());
-        messageProperties.setJMSPriority(inMessageProperties.getJMSPriority());
-        messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI());
-        messageProperties.setSOAPJMSBindingVersion("1.0");
-    }
-
-
-    private boolean isTimedOut(final javax.jms.Message request) throws JMSException {
-        if (request.getJMSExpiration() > 0) {
-            TimeZone tz = new SimpleTimeZone(0, "GMT");
-            Calendar cal = new GregorianCalendar(tz);
-            long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
-            if (timeToLive < 0) {
-                getLogger()
-                    .log(Level.INFO, "Message time to live is already expired skipping response.");
-                return true;
-            }
-        }
-        return false;
-    }
-
     protected Logger getLogger() {
         return LOG;
     }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Tue Feb 11 15:45:30 2014
@@ -18,27 +18,21 @@
  */
 package org.apache.cxf.transport.jms;
 
-import java.lang.reflect.Method;
-import java.util.logging.Logger;
-
+import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.jms.XAConnectionFactory;
 import javax.naming.NamingException;
-import javax.resource.spi.endpoint.MessageEndpointFactory;
 
-import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.service.model.EndpointInfo;
 import org.apache.cxf.transport.jms.util.JMSListenerContainer;
 import org.apache.cxf.transport.jms.util.JMSSender;
+import org.apache.cxf.transport.jms.util.JMSUtil;
+import org.apache.cxf.transport.jms.util.MessageListenerContainer;
 import org.apache.cxf.transport.jms.util.ResourceCloser;
-import org.apache.cxf.transport.jms.util.SessionFactory;
-import org.apache.cxf.transport.jms.util.SpringJMSListenerAdapter;
-import org.springframework.jms.connection.SingleConnectionFactory;
-import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.apache.cxf.transport.jms.util.UserCredentialsConnectionFactoryAdapter;
 
 /**
  * Factory to create jms helper objects from configuration and context information
@@ -47,7 +41,7 @@ public final class JMSFactory {
     static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
     static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
 
-    private static final Logger LOG = LogUtils.getL7dLogger(JMSFactory.class);
+    //private static final Logger LOG = LogUtils.getL7dLogger(JMSFactory.class);
     
     private JMSFactory() {
     }
@@ -74,7 +68,7 @@ public final class JMSFactory {
         try {
             ConnectionFactory cf = (ConnectionFactory)jmsConfig.getJndiTemplate().
                 lookup(connectionFactoryName);
-            if (!(cf instanceof SingleConnectionFactory)) {
+            if (userName != null) {
                 UserCredentialsConnectionFactoryAdapter uccf = new UserCredentialsConnectionFactoryAdapter();
                 uccf.setUsername(userName);
                 uccf.setPassword(password);
@@ -122,7 +116,8 @@ public final class JMSFactory {
      * @param destination to listen on
      * @return
      */
-    public static JMSListenerContainer createJmsListener(EndpointInfo ei,
+    /*
+    protected static JMSListenerContainer createJmsListener(EndpointInfo ei,
                                                                     JMSConfiguration jmsConfig,
                                                                     MessageListener listenerHandler,
                                                                     Destination destination) {
@@ -141,44 +136,6 @@ public final class JMSFactory {
             jmsListener = new DefaultMessageListenerContainer();
         }
         
-        createJmsListener(jmsListener,
-                                 jmsConfig,
-                                 listenerHandler,
-                                 destination,
-                                 null);
-        return new SpringJMSListenerAdapter(jmsListener);
-    }
-
-    /**
-     * Create and start listener using configuration information from jmsConfig. Uses
-     * resolveOrCreateDestination to determine the destination for the listener.
-     * 
-     * @param jmsConfig configuration information
-     * @param listenerHandler object to be called when a message arrives
-     * @param destinationName null for temp dest or a destination name
-     * @param conduitId id for message selector
-     * @return
-     */
-    public static JMSListenerContainer createJmsListener(JMSConfiguration jmsConfig,
-                                                                    MessageListener listenerHandler,
-                                                                    Destination destination, 
-                                                                    String conduitId) {
-        DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer(); 
-        createJmsListener(jmsListener,
-                                 jmsConfig,
-                                 listenerHandler,
-                                 destination,
-                                 conduitId);
-        return new SpringJMSListenerAdapter(jmsListener);
-    }
-
-    private static void createJmsListener(
-                          DefaultMessageListenerContainer jmsListener,
-                          JMSConfiguration jmsConfig,
-                          MessageListener listenerHandler,
-                          Destination destination,
-                          String conduitId) {
-        
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
         jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
         
@@ -220,28 +177,62 @@ public final class JMSFactory {
         if (jmsConfig.isAcceptMessagesWhileStopping()) {
             jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
         }
-        String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
-        String conduitIdSt = jmsConfig.isUseConduitIdSelector() && conduitId != null ? conduitId : "";
-        String correlationIdPrefix = staticSelectorPrefix + conduitIdSt;
-        
-        if (!correlationIdPrefix.isEmpty()) {
-            String messageSelector = "JMSCorrelationID LIKE '" + correlationIdPrefix + "%'";
-            jmsListener.setMessageSelector(messageSelector);
-        }
+        String messageSelector = getMessageSelector(jmsConfig, null);
+        jmsListener.setMessageSelector(messageSelector);
         
         jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor());
-
+        
         jmsListener.setDestination(destination);
         jmsListener.initialize();
         jmsListener.start();
+        return new SpringJMSListenerAdapter(jmsListener);
+    }
+    */
+
+    private static String getMessageSelector(JMSConfiguration jmsConfig, String conduitId) {
+        String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
+        String conduitIdSt = jmsConfig.isUseConduitIdSelector() && conduitId != null ? conduitId : "";
+        String correlationIdPrefix = staticSelectorPrefix + conduitIdSt;
+        return correlationIdPrefix.isEmpty() ? null : "JMSCorrelationID LIKE '" + correlationIdPrefix + "%'";
     }
     
-    public static SessionFactory createJmsSessionFactory(JMSConfiguration jmsConfig, ResourceCloser closer) {
-        SessionFactory sf = new SessionFactory(jmsConfig.getConnectionFactory(), closer);
-        sf.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
-        sf.setSessionTransacted(jmsConfig.isSessionTransacted());
-        sf.setDurableSubscriptionClientId(jmsConfig.getDurableSubscriptionClientId());
-        return sf;
+    public static JMSListenerContainer createTargetDestinationListener(EndpointInfo ei, JMSConfiguration jmsConfig,
+                                                               MessageListener listenerHandler) {
+        Session session = null;
+        try {
+            Connection connection = createConnection(jmsConfig);
+            session = connection.createSession(jmsConfig.isSessionTransacted(), Session.AUTO_ACKNOWLEDGE);
+            Destination destination = jmsConfig.getTargetDestination(session);
+            MessageListenerContainer container = new MessageListenerContainer(connection, destination, listenerHandler);
+            container.setMessageSelector(jmsConfig.getMessageSelector());
+            container.start();
+            connection.start();
+            return container;
+        } catch (JMSException e) {
+            throw JMSUtil.convertJmsException(e);
+        } finally {
+            ResourceCloser.close(session);
+        }
+    }
+
+    public static JMSListenerContainer createSimpleJmsListener(JMSConfiguration jmsConfig,
+                                                               Connection connection,
+                                                               MessageListener listenerHandler, 
+                                                               Destination destination,
+                                                               String conduitId) {
+        MessageListenerContainer container = new MessageListenerContainer(connection, destination, listenerHandler);
+        String messageSelector = getMessageSelector(jmsConfig, conduitId);
+        container.setMessageSelector(messageSelector);
+        container.start();
+        return container;
+    }
+
+    public static Connection createConnection(JMSConfiguration jmsConfig) throws JMSException {
+        Connection connection = jmsConfig.getConnectionFactory().createConnection();
+        if (jmsConfig.getDurableSubscriptionClientId() != null) {
+            connection.setClientID(jmsConfig.getDurableSubscriptionClientId());
+        }
+        return connection;
     }
     
 }

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/DestinationResolver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/DestinationResolver.java?rev=1567186&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/DestinationResolver.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/DestinationResolver.java Tue Feb 11 15:45:30 2014
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public interface DestinationResolver {
+    Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain)
+        throws JMSException;
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/DestinationResolver.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSDestinationResolver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSDestinationResolver.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSDestinationResolver.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSDestinationResolver.java Tue Feb 11 15:45:30 2014
@@ -23,7 +23,6 @@ import javax.jms.JMSException;
 import javax.jms.Session;
 import javax.naming.NamingException;
 
-import org.springframework.jms.support.destination.DestinationResolver;
 import org.springframework.jndi.JndiTemplate;
 
 public class JMSDestinationResolver implements DestinationResolver {

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java?rev=1567186&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java Tue Feb 11 15:45:30 2014
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+
+public class MessageListenerContainer implements JMSListenerContainer {
+    
+    private Connection connection;
+    private Destination replyTo;
+    private MessageListener listenerHandler;
+    private boolean transacted;
+    private int acknowledgeMode = Session.AUTO_ACKNOWLEDGE;
+    private String messageSelector;
+    private boolean running;
+    private MessageConsumer consumer;
+    private Session session;
+    private ExecutorService executor;
+
+    public MessageListenerContainer(Connection connection, 
+                                    Destination replyTo,
+                                    MessageListener listenerHandler) {
+        this.connection = connection;
+        this.replyTo = replyTo;
+        this.listenerHandler = listenerHandler;
+        executor = Executors.newFixedThreadPool(20);
+    }
+    
+    public void setTransacted(boolean transacted) {
+        this.transacted = transacted;
+    }
+
+    public void setAcknowledgeMode(int acknowledgeMode) {
+        this.acknowledgeMode = acknowledgeMode;
+    }
+
+    public void setMessageSelector(String messageSelector) {
+        this.messageSelector = messageSelector;
+    }
+
+    @Override
+    public boolean isRunning() {
+        return running;
+    }
+
+    @Override
+    public void stop() {
+        ResourceCloser.close(consumer);
+        ResourceCloser.close(session);
+    }
+
+    @Override
+    public void start() {
+        try {
+            session = connection.createSession(transacted, acknowledgeMode);
+            consumer = session.createConsumer(replyTo, messageSelector);
+            consumer.setMessageListener(listenerHandler);
+            running = true;
+        } catch (JMSException e) {
+            throw JMSUtil.convertJmsException(e);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        stop();
+        ResourceCloser.close(connection);
+    }
+
+    class DispachingListener implements MessageListener {
+
+        @Override
+        public void onMessage(final Message message) {
+            executor.execute(new Runnable() {
+                
+                @Override
+                public void run() {
+                    try {
+                        listenerHandler.onMessage(message);
+                    } catch (Exception e) {
+                        // Ignore
+                    }
+                }
+            });
+        }
+        
+    }
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/MessageListenerContainer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java Tue Feb 11 15:45:30 2014
@@ -19,97 +19,59 @@
 package org.apache.cxf.transport.jms.util;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.AbstractSequentialList;
 import java.util.LinkedList;
 
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 public class ResourceCloser implements Closeable {
-    private AbstractSequentialList<Closeable> resources;
+    private AbstractSequentialList<Object> resources;
 
     public ResourceCloser() {
-        resources = new LinkedList<Closeable>();
+        resources = new LinkedList<Object>();
     }
     
-    public <E extends Closeable> E register(E resource) {
+    public <E> E register(E resource) {
         resources.add(0, resource);
         return resource;
     }
-    
-    public javax.jms.Connection register(final javax.jms.Connection connection) {
-        resources.add(0, new Closeable() {
-            
-            @Override
-            public void close() throws IOException {
-                try {
-                    connection.close();
-                } catch (JMSException e) {
-                    // Ignore
-                }
-            }
-        });
-        return connection;
-    }
-    
-    public Session register(final Session session) {
-        resources.add(0, new Closeable() {
-            
-            @Override
-            public void close() throws IOException {
-                try {
-                    session.close();
-                } catch (JMSException e) {
-                    // Ignore
-                }
-            }
-        });
-        return session;
+
+    @Override
+    public void close() {
+        for (Object resource : resources) {
+            close(resource);
+        }
     }
     
-    public MessageConsumer register(final MessageConsumer consumer) {
-        resources.add(0, new Closeable() {
-            
-            @Override
-            public void close() throws IOException {
-                try {
-                    consumer.close();
-                } catch (JMSException e) {
-                    // Ignore
-                }
-            }
-        });
-        return consumer;
+    public void close(Object ...resources2) {
+        for (Object resource : resources2) {
+            close(resource);
+        }
     }
     
-    public MessageProducer register(final MessageProducer producer) {
-        resources.add(0, new Closeable() {
-            
-            @Override
-            public void close() throws IOException {
-                try {
-                    producer.close();
-                } catch (JMSException e) {
-                    // Ignore
-                }
-            }
-        });
-        return producer;
-    }
-
-    @Override
-    public void close() {
-        for (Closeable resource : resources) {
-            try {
-                resource.close();
-            } catch (Exception e) {
-                // Ignore
+    public static void close(Object resource) {
+        if (resource == null) {
+            return;
+        }
+        try {
+            if (resource instanceof MessageProducer) {
+                ((MessageProducer)resource).close();
+            } else if (resource instanceof MessageConsumer) {
+                ((MessageConsumer)resource).close();
+            } else if (resource instanceof Session) {
+                ((Session)resource).close();
+            } else if (resource instanceof Connection) {
+                ((Connection)resource).close();
+            } else {
+                throw new IllegalArgumentException("Can not handle resource " + resource.getClass());
             }
+        } catch (JMSException e) {
+            // Ignore
         }
     }
-    
 
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java Tue Feb 11 15:45:30 2014
@@ -20,6 +20,7 @@ package org.apache.cxf.transport.jms.uti
 
 import java.util.concurrent.Executors;
 
+import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -33,10 +34,15 @@ public class TestReceiver {
     private String receiveQueueName;
     private String requestMessageId;
     private String staticReplyQueue;
+    private Throwable ex;
+    private boolean forceMessageIdAsCorrelationId;
     
-    public TestReceiver(ConnectionFactory connectionFactory, String receiveQueueName) {
+    public TestReceiver(ConnectionFactory connectionFactory, 
+                        String receiveQueueName, 
+                        boolean forceMessageIdAsCorrelationId) {
         this.connectionFactory = connectionFactory;
         this.receiveQueueName = receiveQueueName;
+        this.forceMessageIdAsCorrelationId = forceMessageIdAsCorrelationId;
         assert this.connectionFactory != null;
         assert this.receiveQueueName != null;
     }
@@ -52,7 +58,8 @@ public class TestReceiver {
     private void drainQueue() {
         ResourceCloser closer = new ResourceCloser();
         try {
-            Session session = new SessionFactory(connectionFactory, closer).createSession();
+            Connection connection = closer.register(connectionFactory.createConnection());
+            Session session = closer.register(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
             MessageConsumer consumer = closer.register(session.createConsumer(session.createQueue(receiveQueueName)));
             javax.jms.Message message = null;
             do {
@@ -68,17 +75,20 @@ public class TestReceiver {
     private void receiveAndRespondWithMessageIdAsCorrelationId() {
         ResourceCloser closer = new ResourceCloser();
         try {
-            Session session = new SessionFactory(connectionFactory, closer).createSession();
+            Connection connection = closer.register(connectionFactory.createConnection());
+            Session session = closer.register(connection.createSession(false, Session.AUTO_ACKNOWLEDGE));
             MessageConsumer consumer = closer.register(session.createConsumer(session
                 .createQueue(receiveQueueName)));
-            final javax.jms.Message inMessage = consumer.receive(2000);
+            final javax.jms.Message inMessage = consumer.receive(1000);
             if (inMessage == null) {
                 throw new RuntimeException("No message received on destination " + receiveQueueName);
             }
             requestMessageId = inMessage.getJMSMessageID();
             System.out.println("Received message " + requestMessageId);
             final TextMessage replyMessage = session.createTextMessage("Result");
-            replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID());
+            String correlationId = (forceMessageIdAsCorrelationId || inMessage.getJMSCorrelationID() == null) 
+                ? inMessage.getJMSMessageID() : inMessage.getJMSCorrelationID();
+            replyMessage.setJMSCorrelationID(correlationId);
             Destination replyDest = staticReplyQueue != null 
                 ? session.createQueue(staticReplyQueue) : inMessage.getJMSReplyTo();
             if (replyDest != null) {
@@ -87,8 +97,8 @@ public class TestReceiver {
                 System.out.println("Sending reply to " + replyDest);
                 producer.send(replyMessage);
             }
-        } catch (JMSException e) {
-            throw JMSUtil.convertJmsException(e);
+        } catch (Throwable e) {
+            ex = e;
         } finally {
             closer.close();
         }
@@ -102,4 +112,10 @@ public class TestReceiver {
             }
         });
     }
+    
+    public void close() {
+        if (ex != null) {
+            throw new RuntimeException("Error while receiving message or sending reply", ex);
+        }
+    }
 }

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/UserCredentialsConnectionFactoryAdapter.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/UserCredentialsConnectionFactoryAdapter.java?rev=1567186&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/UserCredentialsConnectionFactoryAdapter.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/UserCredentialsConnectionFactoryAdapter.java Tue Feb 11 15:45:30 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms.util;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+public class UserCredentialsConnectionFactoryAdapter implements ConnectionFactory {
+    private String userName;
+    private String password;
+    private ConnectionFactory targetConnectionFactory;
+
+    public void setUsername(String userName2) {
+        this.userName = userName2;
+    }
+
+    public void setPassword(String password2) {
+        this.password = password2;
+    }
+
+    public void setTargetConnectionFactory(ConnectionFactory cf) {
+        this.targetConnectionFactory = cf;
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException {
+        return createConnection(userName, password);
+    }
+
+    @Override
+    public Connection createConnection(String userName2, String password2) throws JMSException {
+        return targetConnectionFactory.createConnection(userName2, password2);
+    }
+
+}

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/UserCredentialsConnectionFactoryAdapter.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Tue Feb 11 15:45:30 2014
@@ -45,6 +45,7 @@ import org.junit.Before;
 
 public abstract class AbstractJMSTester extends Assert {
     public static final String JMS_PORT = EmbeddedJMSBrokerLauncher.PORT;
+    public enum ExchangePattern { oneway, requestReply };
     
     protected static final String MESSAGE_CONTENT = "HelloWorld";
     
@@ -94,11 +95,23 @@ public abstract class AbstractJMSTester 
 
     }
     
-    protected void sendoutMessage(Conduit conduit, Message message, boolean isOneWay) throws IOException {
-        sendoutMessage(conduit, message, isOneWay, true);
+    protected void sendMessageAsync(Conduit conduit, Message message) throws IOException {
+        sendoutMessage(conduit, message, false, false);
     }
     
-    protected void sendoutMessage(Conduit conduit, 
+    protected void sendMessageSync(Conduit conduit, Message message) throws IOException {
+        sendoutMessage(conduit, message, false, true);
+    }
+    
+    protected void sendMessage(Conduit conduit, Message message, boolean synchronous) throws IOException {
+        sendoutMessage(conduit, message, false, synchronous);
+    }
+    
+    protected void sendOneWayMessage(Conduit conduit, Message message) throws IOException {
+        sendoutMessage(conduit, message, true, true);
+    }
+    
+    private void sendoutMessage(Conduit conduit, 
                                   Message message, 
                                   boolean isOneWay, 
                                   boolean synchronous) throws IOException {
@@ -111,8 +124,7 @@ public abstract class AbstractJMSTester 
         try {
             conduit.prepare(message);
         } catch (IOException ex) {
-            assertFalse("JMSConduit can't prepare to send out message", false);
-            ex.printStackTrace();
+            throw new RuntimeException("JMSConduit can't prepare to send out message");
         }
         OutputStream os = message.getContent(OutputStream.class);
         Writer writer = message.getContent(Writer.class);
@@ -148,7 +160,7 @@ public abstract class AbstractJMSTester 
 
         JMSConfiguration jmsConfig = new JMSOldConfigHolder()
             .createJMSConfigurationFromEndpointInfo(bus, ei, null, true);
-        if (jmsConfig != null && jmsConfig.getReceiveTimeout() == null) {
+        if (jmsConfig.getReceiveTimeout() == null || jmsConfig.getReceiveTimeout() == 0) {
             jmsConfig.setReceiveTimeout(5000L);
         }
         JMSConduit jmsConduit = new JMSConduit(target, jmsConfig, bus);

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Tue Feb 11 15:45:30 2014
@@ -19,23 +19,20 @@
 
 package org.apache.cxf.transport.jms;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Writer;
 import java.nio.charset.Charset;
 import java.util.logging.Logger;
 
 import javax.jms.BytesMessage;
+import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.cxf.BusFactory;
 import org.apache.cxf.bus.spring.SpringBusFactory;
 import org.apache.cxf.common.logging.LogUtils;
-import org.apache.cxf.helpers.IOUtils;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.service.model.EndpointInfo;
@@ -51,24 +48,21 @@ public class JMSConduitTest extends Abst
 
     @BeforeClass
     public static void createAndStartBroker() throws Exception {
-        startBroker(new JMSBrokerSetup("tcp://localhost:" + JMS_PORT));
+        startBroker(new JMSBrokerSetup("tcp://localhost:" + JMS_PORT + "?persistent=false"));
     }
 
     @Test
     public void testGetConfiguration() throws Exception {
         // setup the new bus to get the configuration file
         SpringBusFactory bf = new SpringBusFactory();
-        BusFactory.setDefaultBus(null);
         bus = bf.createBus("/jms_test_config.xml");
-        BusFactory.setDefaultBus(bus);
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_conf_test", "/wsdl/others/jms_test_no_addr.wsdl",
                          "HelloWorldQueueBinMsgService", "HelloWorldQueueBinMsgPort");
         JMSConduit conduit = setupJMSConduit(ei, false);
         assertEquals("Can't get the right ClientReceiveTimeout", 500L, conduit.getJmsConfig()
             .getReceiveTimeout().longValue());
-        bus.shutdown(false);
-        BusFactory.setDefaultBus(null);
         conduit.close();
+        bus.shutdown(false);
     }
 
     @Test
@@ -100,42 +94,16 @@ public class JMSConduitTest extends Abst
         conduit.getJmsConfig().setReceiveTimeout(Long.valueOf(1));
         Message message = new MessageImpl();
         try {
-            sendoutMessage(conduit, message, false);
-            verifyReceivedMessage(message);
+            sendMessageSync(conduit, message);
             fail("Expected a timeout here");
         } catch (RuntimeException e) {
-            LOG.info("Received exception. This is expected");
+            Assert.assertTrue("Incorrect exception", 
+                              e.getMessage().startsWith("Timeout receiving message with correlationId"));
         } finally {
             conduit.close();
         }
     }
 
-    private void verifyReceivedMessage(Message message) throws InterruptedException {
-        while (inMessage == null) {
-            //the send has completed, but the response might not be back yet.
-            //wait for it.
-            synchronized (this) {
-                wait(10);
-            }
-        }
-        ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
-        Assert.assertNotNull("The received message input stream should not be null", bis);
-        byte bytes[] = new byte[bis.available()];
-        try {
-            bis.read(bytes);
-        } catch (IOException ex) {
-            ex.printStackTrace();
-        }
-        String response = IOUtils.newStringFromBytes(bytes);
-        assertEquals("The response data should be equal", "HelloWorld", response);
-
-        JMSMessageHeadersType inHeader = (JMSMessageHeadersType)inMessage
-            .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
-
-        assertTrue("The inMessage JMS Header should not be null", inHeader != null);
-
-    }
-
     @Test
     public void testJMSMessageMarshal() throws IOException, JMSException {
         String testMsg = "Test Message";
@@ -145,7 +113,8 @@ public class JMSConduitTest extends Abst
         
         ResourceCloser closer = new ResourceCloser();
         try {
-            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
+            Connection connection = JMSFactory.createConnection(jmsConfig);
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             javax.jms.Message jmsMessage = 
                 JMSUtil.createAndSetPayload(testBytes, session, JMSConstants.BYTE_MESSAGE_TYPE);
             assertTrue("Message should have been of type BytesMessage ", jmsMessage instanceof BytesMessage);

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Tue Feb 11 15:45:30 2014
@@ -85,8 +85,8 @@ public class JMSDestinationTest extends 
             }
             waitTime++;
         }
-        assertTrue("Can't receive the Destination message in " + MAX_RECEIVE_TIME 
-                   + " seconds", destMessage != null);
+        assertNotNull("Can't receive the Destination message in " + MAX_RECEIVE_TIME 
+                   + " seconds", destMessage);
     }
 
     protected MessageObserver createMessageObserver() {
@@ -126,8 +126,8 @@ public class JMSDestinationTest extends 
                    jmsConfig.isAcceptMessagesWhileStopping());
         assertNotNull("The connectionFactory should not be null", jmsConfig.getConnectionFactory());
         assertTrue("Should get the instance of ActiveMQConnectionFactory", 
-                   jmsConfig.getPlainConnectionFactory() instanceof ActiveMQConnectionFactory);
-        ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory)jmsConfig.getPlainConnectionFactory();
+                   jmsConfig.getConnectionFactory() instanceof ActiveMQConnectionFactory);
+        ActiveMQConnectionFactory cf = (ActiveMQConnectionFactory)jmsConfig.getConnectionFactory();
         assertEquals("The borker URL is wrong", cf.getBrokerURL(), "tcp://localhost:61500");
         assertEquals("Get a wrong TargetDestination", jmsConfig.getTargetDestination(), "queue:test");
         assertEquals("Get the wrong pubSubDomain value", jmsConfig.isPubSubDomain(), false);
@@ -174,7 +174,7 @@ public class JMSDestinationTest extends 
         destination.setMessageObserver(createMessageObserver());
         // The JMSBroker (ActiveMQ 5.x) need to take some time to setup the DurableSubscriber
         Thread.sleep(2000);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         // wait for the message to be get from the destination
         //long time = System.currentTimeMillis();
         waitForReceiveDestMessage();
@@ -199,7 +199,7 @@ public class JMSDestinationTest extends 
         Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage);
         
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         // wait for the message to be get from the destination
         waitForReceiveDestMessage();
         // just verify the Destination inMessage
@@ -225,7 +225,7 @@ public class JMSDestinationTest extends 
         setupMessageHeader(outMessage);
         JMSDestination destination = setupJMSDestination(ei);
         destination.setMessageObserver(createMessageObserver());
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         // just verify the Destination inMessage
         assertTrue("The destination should have got the message ", destMessage != null);
@@ -236,7 +236,7 @@ public class JMSDestinationTest extends 
          * in spec non-compliant mode */
         
         conduit.getJmsConfig().setEnforceSpec(false);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         assertTrue("The destination should have got the message ", destMessage != null);
         String exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
@@ -249,7 +249,7 @@ public class JMSDestinationTest extends 
         String contextReplyTo = conduit.getJmsConfig().getReplyDestination() + ".context";
         exName += ".context";
         setupMessageHeader(outMessage, "cidValue", contextReplyTo);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         assertTrue("The destiantion should have got the message ", destMessage != null);
         verifyReplyToSet(destMessage, Queue.class, exName);
@@ -261,7 +261,7 @@ public class JMSDestinationTest extends 
 
         setupMessageHeader(outMessage);
         outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.FALSE);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         assertTrue("The destiantion should have got the message ", destMessage != null);
         verifyReplyToNotSet(destMessage);
@@ -272,7 +272,7 @@ public class JMSDestinationTest extends 
 
         setupMessageHeader(outMessage);
         outMessage.put(JMSConstants.JMS_SET_REPLY_TO, Boolean.TRUE);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         assertTrue("The destiantion should have got the message ", destMessage != null);
         exName = getQueueName(conduit.getJmsConfig().getReplyDestination());
@@ -430,15 +430,14 @@ public class JMSDestinationTest extends 
                     backConduit = destination.getBackChannel(m);
                     // wait for the message to be got from the conduit
                     Message replyMessage = new MessageImpl();
-                    sendoutMessage(backConduit, replyMessage, true);
+                    sendOneWayMessage(backConduit, replyMessage);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
             }
         };
         destination.setMessageObserver(observer);
-        // set is oneway false for get response from destination
-        sendoutMessage(conduit, outMessage, false);
+        sendMessageSync(conduit, outMessage);
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
 
@@ -449,7 +448,7 @@ public class JMSDestinationTest extends 
         inMessage = null;
         // Send a second message to check for an issue
         // Where the session was closed the second time
-        sendoutMessage(conduit, outMessage, false);
+        sendMessageSync(conduit, outMessage);
         waitForReceiveInMessage();
         verifyReceivedMessage(inMessage);
 
@@ -496,15 +495,14 @@ public class JMSDestinationTest extends 
                     Message replyMessage = new MessageImpl();
                     // copy the message encoding
                     replyMessage.put(Message.ENCODING, m.get(Message.ENCODING));
-                    sendoutMessage(backConduit, replyMessage, true);
+                    sendOneWayMessage(backConduit, replyMessage);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
             }
         };
         destination.setMessageObserver(observer);
-        // set is oneway false for get response from destination
-        sendoutMessage(conduit, outMessage, false);
+        sendMessageSync(conduit, outMessage);
         // wait for the message to be got from the destination,
         // create the thread to handler the Destination incoming message
 
@@ -545,7 +543,7 @@ public class JMSDestinationTest extends 
         JMSConduit conduit = setupJMSConduit(ei, true);
         final Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage, null);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         SecurityContext securityContext = destMessage.get(SecurityContext.class);
         assertNotNull("SecurityContext should be set in message received by JMSDestination", securityContext);
@@ -565,7 +563,7 @@ public class JMSDestinationTest extends 
         JMSConduit conduit = setupJMSConduit(ei, true);
         final Message outMessage = new MessageImpl();
         setupMessageHeader(outMessage, null);
-        sendoutMessage(conduit, outMessage, true);
+        sendOneWayMessage(conduit, outMessage);
         waitForReceiveDestMessage();
         conduit.close();
         destination.shutdown();

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java Tue Feb 11 15:45:30 2014
@@ -43,22 +43,22 @@ public class MessageIdAsCorrelationIdJMS
 
     @Test
     public void testSendReceiveWithTempReplyQueue() throws Exception {
-        sendAndReceive(null);
+        sendAndReceive(true, null);
     }
-
+    
     @Test
     public void testSendReceive() throws Exception {
-        sendAndReceive("testreply");
+        sendAndReceive(true, "testreply");
     }
     
-    public void sendAndReceive(String replyDestination) throws Exception {
+    public void sendAndReceive(boolean synchronous, String replyDestination) throws Exception {
         BusFactory bf = BusFactory.newInstance();
         Bus bus = bf.createBus();
         BusFactory.setDefaultBus(bus);
         EndpointReferenceType target = new EndpointReferenceType();
 
         connectionFactory = new PooledConnectionFactory(BROKER_URI);
-        TestReceiver receiver = new TestReceiver(connectionFactory, SERVICE_QUEUE);
+        TestReceiver receiver = new TestReceiver(connectionFactory, SERVICE_QUEUE, true);
         receiver.runAsync();
 
         JMSConfiguration jmsConfig = new JMSConfiguration();
@@ -68,9 +68,15 @@ public class MessageIdAsCorrelationIdJMS
 
         JMSConduit conduit = new JMSConduit(target, jmsConfig, bus);
         Exchange exchange = new ExchangeImpl();
+        exchange.setSynchronous(synchronous);
         Message message = new MessageImpl();
         exchange.setOutMessage(message);
         conduit.sendExchange(exchange, "Request");
+        waitForAsyncReply(exchange);
+        receiver.close();
+        if (exchange.getInMessage() == null) {
+            throw new RuntimeException("No reply received within 2 seconds");
+        }
         JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)exchange.getInMessage()
             .get(JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
         Assert.assertEquals(receiver.getRequestMessageId(), inHeaders.getJMSCorrelationID());
@@ -78,7 +84,12 @@ public class MessageIdAsCorrelationIdJMS
         bus.shutdown(true);
     }
 
-
-
+    private void waitForAsyncReply(Exchange exchange) throws InterruptedException {
+        int count = 0;
+        while (exchange.getInMessage() == null && count <= 20) {
+            Thread.sleep(100);
+            count++;
+        }
+    }
 
 }

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java Tue Feb 11 15:45:30 2014
@@ -43,7 +43,6 @@ public class PooledConnectionTempQueueTe
 
         Connection con1 = cf.createConnection();
         con1.start();
-        //Session session = con1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         
         // This order seems to matter to reproduce the issue
         con1.close();

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java (original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java Tue Feb 11 15:45:30 2014
@@ -120,28 +120,32 @@ public class RequestResponseTest extends
     public void testRequestQueueResponseTempQueue() throws Exception {
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
                          "JMSSimpleService002X", "SimplePortQueueRequest");
-        sendAndReceiveMessages(ei);
+        sendAndReceiveMessages(ei, true);
+        sendAndReceiveMessages(ei, false);
     }
     
     @Test
     public void testRequestQueueResponseStaticQueue() throws Exception {
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
                          "JMSSimpleService002X", "SimplePortQueueRequestQueueResponse");
-        sendAndReceiveMessages(ei);
+        sendAndReceiveMessages(ei, true);
+        sendAndReceiveMessages(ei, false);
     }
     
     @Test
     public void testRequestTopicResponseTempQueue() throws Exception {
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
                          "JMSSimpleService002X", "SimplePortTopicRequest");
-        sendAndReceiveMessages(ei);
+        sendAndReceiveMessages(ei, true);
+        sendAndReceiveMessages(ei, false);
     }
     
     @Test
     public void testRequestTopicResponseStaticQueue() throws Exception {
         EndpointInfo ei = setupServiceInfo("http://cxf.apache.org/jms_simple", "/wsdl/jms_spec_testsuite.wsdl",
                          "JMSSimpleService002X", "SimplePortTopicRequestQueueResponse");
-        sendAndReceiveMessages(ei);
+        sendAndReceiveMessages(ei, true);
+        sendAndReceiveMessages(ei, false);
     }
     
     private Message createMessage() {
@@ -155,7 +159,8 @@ public class RequestResponseTest extends
         return outMessage;
     }
 
-    protected void sendAndReceiveMessages(EndpointInfo ei) throws IOException {
+    protected void sendAndReceiveMessages(EndpointInfo ei, boolean synchronous) throws IOException {
+        inMessage = null;
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduit(ei, true);
         final Message outMessage = createMessage();
@@ -174,7 +179,7 @@ public class RequestResponseTest extends
                     backConduit = destination.getBackChannel(m);
                     // wait for the message to be got from the conduit
                     Message replyMessage = new MessageImpl();
-                    sendoutMessage(backConduit, replyMessage, true);
+                    sendOneWayMessage(backConduit, replyMessage);
                 } catch (Exception e) {
                     throw new RuntimeException(e);
                 }
@@ -183,7 +188,7 @@ public class RequestResponseTest extends
         destination.setMessageObserver(observer);
         
         try {
-            sendoutMessage(conduit, outMessage, false, true);
+            sendMessage(conduit, outMessage, synchronous);
             // wait for the message to be got from the destination,
             // create the thread to handler the Destination incoming message
     

Modified: cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml (original)
+++ cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml Tue Feb 11 15:45:30 2014
@@ -53,10 +53,6 @@
     </jms:destination>
     <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     </bean>
-    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" 
-    	p:connectionFactory-ref="connectionFactory" 
-    	p:pubSubDomain="false" 
-    	p:receiveTimeout="1000"/>
     <bean id="jmsConf1" class="org.apache.cxf.transport.jms.JMSConfiguration" 
     	p:connectionFactory-ref="connectionFactory"
     	p:targetDestination="queue:test"

Modified: cxf/trunk/rt/transports/jms/src/test/resources/jms_test_jndi.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/resources/jms_test_jndi.xml?rev=1567186&r1=1567185&r2=1567186&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/resources/jms_test_jndi.xml (original)
+++ cxf/trunk/rt/transports/jms/src/test/resources/jms_test_jndi.xml Tue Feb 11 15:45:30 2014
@@ -18,15 +18,12 @@
   under the License.
 -->
 <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:ct="http://cxf.apache.org/configuration/types" xmlns:jms="http://cxf.apache.org/transports/jms" xmlns:p="http://www.springframework.org/schema/p" xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation=" http://cxf.apache.org/transports/jms http://cxf.apache.org/schemas/configuration/jms.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
-    <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="vm://localhost"/>
     <bean id="jndi" class="org.apache.xbean.spring.jndi.SpringInitialContextFactory" factory-method="makeInitialContext" singleton="true">
         <property name="entries" ref="jndiEntries"/>
     </bean>
     <util:map id="jndiEntries">
         <entry key="ConnectionFactory">
-            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
-                <property name="targetConnectionFactory" ref="jmsConnectionFactory"/>
-            </bean>
+            <bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory" p:brokerURL="vm://localhost"/>
         </entry>
     </util:map>
 </beans>



Mime
View raw message