cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject svn commit: r1564952 [1/4] - in /cxf/trunk: parent/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ rt/transp...
Date Wed, 05 Feb 2014 21:58:24 GMT
Author: cschneider
Date: Wed Feb  5 21:58:23 2014
New Revision: 1564952

URL: http://svn.apache.org/r1564952
Log:
CXF-5543 Make jms independent of spring. First part

Added:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java   (with props)
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java   (with props)
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java   (with props)
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java   (with props)
Removed:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigurationTest.java
Modified:
    cxf/trunk/parent/pom.xml
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSBrokerSetup.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/OldConfigTest.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java
    cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/JMSTestUtil.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerGzipTest.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerSoap12Test.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/swa/ClientServerSwaTest.java
    cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java
    cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/common/EmbeddedJMSBrokerLauncher.java

Modified: cxf/trunk/parent/pom.xml
URL: http://svn.apache.org/viewvc/cxf/trunk/parent/pom.xml?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/parent/pom.xml (original)
+++ cxf/trunk/parent/pom.xml Wed Feb  5 21:58:23 2014
@@ -72,7 +72,7 @@
         <cxf.osgi.javax.xml.ws.version>[0.0,3)</cxf.osgi.javax.xml.ws.version>
         <!-- please maintain alphabetical order here -->
         <cxf.abdera.version>1.1.3</cxf.abdera.version>
-        <cxf.activemq.version>5.8.0</cxf.activemq.version>
+        <cxf.activemq.version>5.9.0</cxf.activemq.version>
         <cxf.axiom.version>1.2.14</cxf.axiom.version>
         <cxf.bcprov.version>1.50</cxf.bcprov.version>
         <cxf.cglib.bundle.version>2.2_2</cxf.cglib.bundle.version>

Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java?rev=1564952&view=auto
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java (added)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java Wed Feb  5 21:58:23 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import java.io.IOException;
+import java.util.logging.Logger;
+
+import javax.jms.TextMessage;
+
+import org.apache.cxf.common.logging.LogUtils;
+import org.apache.cxf.message.Exchange;
+import org.apache.cxf.message.Message;
+import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.MessageObserver;
+import org.apache.cxf.ws.addressing.EndpointReferenceType;
+
+/**
+ * Conduit for sending the reply back to the client
+ */
+class BackChannelConduit extends AbstractConduit {
+    private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
+    protected Message inMessage;
+    private JMSExchangeSender sender;
+
+    BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) {
+        super(ref);
+        inMessage = message;
+        this.sender = sender;
+    }
+    @Override
+    public void close(Message msg) throws IOException {
+        MessageStreamUtil.closeStreams(msg);
+        super.close(msg);
+    }
+    
+    /**
+     * Register a message observer for incoming messages.
+     * 
+     * @param observer the observer to notify on receipt of incoming messages
+     */
+    public void setMessageObserver(MessageObserver observer) {
+        // shouldn't be called for a back channel conduit
+    }
+
+    /**
+     * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input
+     * message (if any).
+     * 
+     * @param message the message to be sent.
+     */
+    public void prepare(final Message message) throws IOException {
+        // setup the message to be sent back
+        javax.jms.Message jmsMessage = (javax.jms.Message)inMessage
+            .get(JMSConstants.JMS_REQUEST_MESSAGE);
+        message.put(JMSConstants.JMS_REQUEST_MESSAGE, jmsMessage);
+
+        if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)
+            && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) {
+            message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
+                .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
+        }
+
+        final Exchange exchange = inMessage.getExchange();
+        exchange.setOutMessage(message);
+        
+        boolean isTextMessage = (jmsMessage instanceof TextMessage) && !JMSMessageUtils.isMtomEnabled(message);
+        MessageStreamUtil.prepareStream(message, isTextMessage, sender);
+    }
+    
+    protected Logger getLogger() {
+        return LOG;
+    }
+}
\ No newline at end of file

Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java Wed Feb  5 21:58:23 2014
@@ -31,21 +31,17 @@ import javax.resource.spi.endpoint.Messa
 import javax.resource.spi.endpoint.MessageEndpointFactory;
 import javax.transaction.xa.XAResource;
 
-import org.apache.cxf.service.model.EndpointInfo;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
 import org.springframework.jms.support.JmsUtils;
 
 public class JCATransactionalMessageListenerContainer extends DefaultMessageListenerContainer {
     static final ThreadLocal<Map<Class<?>, ?>> ENDPOINT_LOCAL = new ThreadLocal<Map<Class<?>, ?>>();
-    static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
-    static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
     private MessageEndpointFactory factory;
     private Method method;
     
-    public JCATransactionalMessageListenerContainer(EndpointInfo ei) {
-        factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY, 
-                                 MessageEndpointFactory.class);
-        method = ei.getProperty(MDB_TRANSACTED_METHOD, Method.class);
+    public JCATransactionalMessageListenerContainer(MessageEndpointFactory factory, Method transactedMethod) {
+        this.factory = factory;
+        this.method = transactedMethod;
         this.setCacheLevel(CACHE_CONNECTION);
     }
     
@@ -68,7 +64,7 @@ public class JCATransactionalMessageList
             mp.put(MessageEndpoint.class, ep);
             
             ENDPOINT_LOCAL.set(mp);
-            ep.beforeDelivery(method);                
+            ep.beforeDelivery(this.method);                
             messageReceived = doReceiveAndExecute(invoker, s, mc, null);
             ep.afterDelivery();
         } catch (Exception ex) {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Wed Feb  5 21:58:23 2014
@@ -20,11 +20,7 @@
 package org.apache.cxf.transport.jms;
 
 import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
-import java.io.Writer;
 import java.lang.ref.WeakReference;
 import java.util.Map;
 import java.util.UUID;
@@ -37,7 +33,6 @@ import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.jms.TemporaryQueue;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.buslifecycle.BusLifeCycleListener;
@@ -46,16 +41,16 @@ import org.apache.cxf.common.logging.Log
 import org.apache.cxf.configuration.ConfigurationException;
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
-import org.apache.cxf.message.MessageImpl;
 import org.apache.cxf.message.MessageUtils;
-import org.apache.cxf.service.model.EndpointInfo;
+import org.apache.cxf.security.SecurityContext;
 import org.apache.cxf.transport.AbstractConduit;
+import org.apache.cxf.transport.jms.util.JMSSender;
+import org.apache.cxf.transport.jms.util.JMSUtil;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.listener.AbstractMessageListenerContainer;
+import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.jms.support.JmsUtils;
+import org.springframework.jms.support.destination.DestinationResolver;
 
 /**
  * JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport
@@ -69,24 +64,21 @@ public class JMSConduit extends Abstract
     
     private static final String CORRELATED = JMSConduit.class.getName() + ".correlated";
     
-    private EndpointInfo endpointInfo;
     private JMSConfiguration jmsConfig;
     private Map<String, Exchange> correlationMap = new ConcurrentHashMap<String, Exchange>();
     private DefaultMessageListenerContainer jmsListener;
-    private DefaultMessageListenerContainer allListener;
     private String conduitId;
     private AtomicLong messageCount;
     private JMSBusLifeCycleListener listener;
     private Bus bus;
+    private Destination staticReplyDestination;
 
-    public JMSConduit(EndpointInfo endpointInfo,
-                      EndpointReferenceType target,
+    public JMSConduit(EndpointReferenceType target,
                       JMSConfiguration jmsConfig,
                       Bus b) {
         super(target);
         bus = b;
         this.jmsConfig = jmsConfig;
-        this.endpointInfo = endpointInfo;
         conduitId = UUID.randomUUID().toString().replaceAll("-", "");
         messageCount = new AtomicLong(0);
     }
@@ -97,59 +89,24 @@ public class JMSConduit extends Abstract
      * JMSOutputStream will then call back the sendExchange method of this class. {@inheritDoc}
      */
     public void prepare(final Message message) throws IOException {
-        String name =  endpointInfo.getName().toString() + ".jms-conduit";
-        org.apache.cxf.common.i18n.Message msg = 
-            new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_CONDUIT", LOG, name);
-        jmsConfig.ensureProperlyConfigured(msg);
         boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
-        if (isTextPayload) {
-            message.setContent(Writer.class, new StringWriter() {
-                @Override
-                public void close() throws IOException {
-                    super.close();
-                    sendExchange(message.getExchange(), toString());
-                }
-            });
-        } else {
-            JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload);
-            message.setContent(OutputStream.class, out);
-        }
+        MessageStreamUtil.prepareStream(message, isTextPayload, this);
     }
+
     @Override
     public void close(Message msg) throws IOException {
-        Writer writer = msg.getContent(Writer.class);
-        if (writer != null) {
-            writer.close();
-        }
-        Reader reader = msg.getContent(Reader.class);
-        if (reader != null) {
-            reader.close();
-        }
+        MessageStreamUtil.closeStreams(msg);
         super.close(msg);
     }
-    private synchronized AbstractMessageListenerContainer getJMSListener() {
+    private synchronized void getJMSListener(Destination replyTo) {
         if (jmsListener == null) {
             jmsListener = JMSFactory.createJmsListener(jmsConfig, 
                                                        this, 
-                                                       jmsConfig.getReplyDestination(), 
-                                                       conduitId, 
-                                                       false);
-            addBusListener();
-        }
-        return jmsListener;
-    }
-    private synchronized AbstractMessageListenerContainer getAllListener() {
-        if (allListener == null) {
-            allListener = JMSFactory.createJmsListener(jmsConfig, 
-                                                       this, 
-                                                       jmsConfig.getReplyDestination(), 
-                                                       null, 
-                                                       true);
+                                                       replyTo, 
+                                                       conduitId);
             addBusListener();
         }
-        return allListener;
     }
-
     /**
      * Send the JMS message and if the MEP is not oneway receive the response.
      * 
@@ -165,117 +122,103 @@ public class JMSConduit extends Abstract
         if (outMessage == null) {
             throw new RuntimeException("Exchange to be sent has no outMessage");
         }
-        
-        boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
-        if (isTextPayload && MessageUtils.isTrue(outMessage.getContextualProperty(
-            org.apache.cxf.message.Message.MTOM_ENABLED)) 
-            && outMessage.getAttachments() != null && outMessage.getAttachments().size() > 0) {
-            org.apache.cxf.common.i18n.Message msg = 
-                new org.apache.cxf.common.i18n.Message("INVALID_MESSAGE_TYPE", LOG);
-            throw new ConfigurationException(msg);
-        }
+
+        jmsConfig.ensureProperlyConfigured();        
+        assertIsNotTextMessageAndMtom(outMessage);
+        //assertIsNotSyncAndTopicReply(exchange);
         
         JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage);
-        String replyTo = headers.getJMSReplyTo();
-        if (replyTo == null) {
-            replyTo = jmsConfig.getReplyDestination();
-        }
-        final JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers);
-        
         String userCID = headers.getJMSCorrelationID();
+        assertIsNotAsyncSyncAndUserCID(exchange, userCID);
 
-        String correlationId = createCorrelationId(exchange, userCID);
-        
-        Destination replyToDestination = null;
-        if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() && isSetReplyTo(outMessage)
-            && replyTo != null) {
-            if (!jmsConfig.isReplyPubSubDomain()
-                && (exchange.isSynchronous() 
-                    || exchange.isOneWay())) {
-                replyToDestination = JMSFactory.resolveOrCreateDestination(jmsTemplate, replyTo,
-                                                                           jmsConfig.isReplyPubSubDomain());
-            } else {
-                if (userCID == null || !jmsConfig.isUseConduitIdSelector()) { 
-                    replyToDestination = getJMSListener().getDestination();
-                } else {
-                    replyToDestination = getAllListener().getDestination();
+        ResourceCloser closer = new ResourceCloser();
+        try {
+            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
+            DestinationResolver resolver = jmsConfig.getDestinationResolver();
+            Destination targetDest = resolver.resolveDestinationName(session, 
+                                                                     jmsConfig.getTargetDestination(), 
+                                                                     jmsConfig.isPubSubDomain());
+            
+            Destination replyToDestination = null;
+            if (!exchange.isOneWay()) {
+                if (!exchange.isSynchronous() && staticReplyDestination == null) {
+                    staticReplyDestination = jmsConfig.getReplyDestination(session);
+                    getJMSListener(staticReplyDestination);
                 }
+                replyToDestination = jmsConfig.getReplyToDestination(session, headers.getJMSReplyTo());
             }
-        }
 
-        final String cid = correlationId; 
-        final Destination rtd = replyToDestination;
-        class JMSConduitMessageCreator implements MessageCreator {
-            private javax.jms.Message jmsMessage;
-
-            public javax.jms.Message createMessage(Session session) throws JMSException {
-                String messageType = jmsConfig.getMessageType();
-                Destination destination = rtd;
-                String replyToAddress = jmsConfig.getReplyToDestination();
-                if (rtd == null && replyToAddress != null) {
-                    destination = JMSFactory.resolveOrCreateDestination(jmsTemplate, replyToAddress,
-                                                                        jmsConfig.isPubSubDomain());
-                }
-                jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(jmsConfig, outMessage, request,
-                                                                    messageType, session, destination,
-                                                                    cid);
-                if ((jmsConfig.isReplyPubSubDomain() || !exchange.isSynchronous()) && !exchange.isOneWay()) {
-                    correlationMap.put(cid, exchange);
-                }
-                LOG.log(Level.FINE, "client sending request: ", jmsMessage);
-                return jmsMessage;
+            String messageType = jmsConfig.getMessageType();
+            String correlationId = createCorrelationId(exchange, userCID);
+            if (correlationId != null) {
+                correlationMap.put(correlationId, exchange);
             }
-
-            public String getMessageID() {
-                if (jmsMessage != null) {
-                    try {
-                        return jmsMessage.getJMSMessageID();
-                    } catch (JMSException e) {
-                        return null;
-                    }
-                }
-                return null;
+            
+            javax.jms.Message message = JMSMessageUtils.asJMSMessage(jmsConfig, 
+                                                                     outMessage,
+                                                                     request, 
+                                                                     messageType,
+                                                                     session,  
+                                                                     correlationId, 
+                                                                     JMSConstants.JMS_CLIENT_REQUEST_HEADERS);
+            if (replyToDestination != null) {
+                message.setJMSReplyTo(replyToDestination);
             }
-        }
-        JMSConduitMessageCreator messageCreator = new JMSConduitMessageCreator();    
-        /**
-         * If the message is not oneWay we will expect to receive a reply on the listener. 
-         * 
-         */
-        if (!exchange.isOneWay()) {
+
+            JMSSender sender = JMSFactory.createJmsSender(jmsConfig, headers);
+
             synchronized (exchange) {
-                jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
+                sender.sendMessage(closer, session, targetDest, message);
+                LOG.log(Level.INFO, "client sending request message " 
+                    + message.getJMSMessageID() + " to " + targetDest);
+                headers.setJMSMessageID(message.getJMSMessageID());
                 if (correlationId == null) {
-                    correlationId = messageCreator.getMessageID();
-                }
-                headers.setJMSMessageID(messageCreator.getMessageID());
-
-                final String messageSelector = "JMSCorrelationID = '" + correlationId + "'";
-                if (exchange.isSynchronous() && !jmsConfig.isReplyPubSubDomain()) {
-                    javax.jms.Message replyMessage = jmsTemplate.receiveSelected(replyToDestination,
-                                                                                 messageSelector);
-                    if (replyMessage == null) {
-                        throw new RuntimeException("Timeout receiving message with correlationId "
-                                                   + correlationId);
-                    } else {
-                        doReplyMessage(exchange, replyMessage);
-                    }
-                    
-                    // TODO How do we delete the temp queue in case of an async request
-                    // or is async with a temp queue not possible ?
-                    if (replyToDestination instanceof TemporaryQueue) {
-                        try {
-                            ((TemporaryQueue)replyToDestination).delete();
-                        } catch (JMSException e) {
-                            // Only log the exception as the exchange should be able to proceed
-                            LOG.log(Level.WARNING, "Unable to remove temporary queue: " + e.getMessage(), e);
-                        }
-                    }
+                    // Warning: We might lose the reply if it already arrived at this point
+                    correlationId = message.getJMSMessageID();
+                    correlationMap.put(correlationId, exchange);
                 }
             }
-        } else {
-            jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator);
-            headers.setJMSMessageID(messageCreator.getMessageID());
+            
+            /**
+             * If the message is not oneWay we will expect to receive a reply on the listener.
+             */
+            if (!exchange.isOneWay() && (exchange.isSynchronous())) {
+                Destination replyDestination = staticReplyDestination != null 
+                    ? staticReplyDestination : replyToDestination;
+                javax.jms.Message replyMessage = JMSUtil.receive(session, replyDestination, correlationId,
+                                                                 jmsConfig.getReceiveTimeout(),
+                                                                 jmsConfig.isPubSubNoLocal());
+                correlationMap.remove(correlationId);
+                doReplyMessage(exchange, replyMessage);
+            }
+        } catch (JMSException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            closer.close();
+        }
+    }
+
+    private void assertIsNotAsyncSyncAndUserCID(Exchange exchange, String userCID) {
+        if (!exchange.isSynchronous() && userCID != null) {
+            throw new IllegalArgumentException("User CID can not be used for asynchronous exchanges");
+        }
+    }
+
+    private void assertIsNotTextMessageAndMtom(final Message outMessage) {
+        boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType());
+        if (isTextPayload && MessageUtils.isTrue(outMessage.getContextualProperty(
+            org.apache.cxf.message.Message.MTOM_ENABLED)) 
+            && outMessage.getAttachments() != null && outMessage.getAttachments().size() > 0) {
+            org.apache.cxf.common.i18n.Message msg = 
+                new org.apache.cxf.common.i18n.Message("INVALID_MESSAGE_TYPE", LOG);
+            throw new ConfigurationException(msg);
+        }
+    }
+    
+    @SuppressWarnings("unused")
+    private void assertIsNotSyncAndTopicReply(Exchange exchange) {
+        if (exchange.isSynchronous() && jmsConfig.isReplyPubSubDomain()) {
+            throw new IllegalArgumentException("Synchronous calls can not be combined with a response on a Topic");
         }
     }
 
@@ -296,7 +239,7 @@ public class JMSConduit extends Abstract
                 String prefix = (jmsConfig.isUseConduitIdSelector()) 
                     ? jmsConfig.getConduitSelectorPrefix() + conduitId 
                     : jmsConfig.getConduitSelectorPrefix();
-                correlationId = JMSUtils.createCorrelationId(prefix, messageCount.incrementAndGet());
+                correlationId = JMSUtil.createCorrelationId(prefix, messageCount.incrementAndGet());
             }
         }
         return correlationId;
@@ -358,33 +301,42 @@ public class JMSConduit extends Abstract
      * request is notified {@inheritDoc}
      */
     public void onMessage(javax.jms.Message jmsMessage) {
-        String correlationId;
         try {
-            correlationId = jmsMessage.getJMSCorrelationID();
+            String correlationId = jmsMessage.getJMSCorrelationID();
+            LOG.log(Level.INFO, "Received reply message with correlation id " + correlationId);
+
+            int count = 0;
+            Exchange exchange = null;
+            while (exchange == null && count < 100) {
+                exchange = correlationMap.remove(correlationId);
+                Thread.sleep(100);
+                count++;
+            }
+            if (exchange == null) {
+                LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
+                return;
+            }
+            doReplyMessage(exchange, jmsMessage);
         } catch (JMSException e) {
-            throw JmsUtils.convertJmsAccessException(e);
+            throw JMSUtil.convertJmsException(e);
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while correlating", e);
         }
 
-        Exchange exchange = correlationMap.remove(correlationId);
-        if (exchange == null) {
-            LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId);
-            return;
-        }
-        doReplyMessage(exchange, jmsMessage);
     }
 
     /**
      * Process the reply message
+     * @throws JMSException 
      */
-    public void doReplyMessage(Exchange exchange, javax.jms.Message jmsMessage) {
-        Message inMessage = new MessageImpl();
-        exchange.setInMessage(inMessage);
+    public void doReplyMessage(Exchange exchange, javax.jms.Message jmsMessage) throws JMSException {
+        
         LOG.log(Level.FINE, "client received reply: ", jmsMessage);
         try {
-            JMSUtils.populateIncomingContext(jmsMessage, inMessage, 
-                                             JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, jmsConfig);
-        
-            JMSUtils.retrieveAndSetPayload(inMessage, jmsMessage, (String)inMessage.get(Message.ENCODING));
+            Message inMessage = JMSMessageUtils.asCXFMessage(jmsMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS);
+            SecurityContext securityContext = JMSMessageUtils.buildSecurityContext(jmsMessage, jmsConfig);
+            inMessage.put(SecurityContext.class, securityContext);
+            exchange.setInMessage(inMessage);
 
             if (exchange.isSynchronous()) {
                 synchronized (exchange) {
@@ -407,17 +359,18 @@ public class JMSConduit extends Abstract
             listener = null;
         }
         if (jmsListener != null) {
+            jmsListener.stop();
             jmsListener.shutdown();
             jmsListener = null;
         }
-        if (allListener != null) {
-            allListener.shutdown();
-            allListener = null;
-        }        
     }
     public synchronized void close() {
+        try {
+            ((SingleConnectionFactory)jmsConfig.getConnectionFactory()).resetConnection();
+        } catch (Exception e) {
+            // Ignore
+        }
         shutdownListeners();
-        jmsConfig.destroyWrappedConnectionFactory();
         LOG.log(Level.FINE, "JMSConduit closed ");
     }
 

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Wed Feb  5 21:58:23 2014
@@ -18,19 +18,22 @@
  */
 package org.apache.cxf.transport.jms;
 
+import java.util.concurrent.Executor;
+
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.XAConnectionFactory;
+import javax.jms.Session;
 
 import org.apache.cxf.common.injection.NoJSR250Annotations;
-import org.apache.cxf.configuration.ConfigurationException;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Required;
 import org.springframework.core.task.TaskExecutor;
 import org.springframework.jms.connection.SingleConnectionFactory;
-import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.support.destination.DestinationResolver;
+import org.springframework.jms.support.destination.DynamicDestinationResolver;
 import org.springframework.jndi.JndiTemplate;
 import org.springframework.transaction.PlatformTransactionManager;
 
@@ -44,20 +47,18 @@ public class JMSConfiguration implements
 
     private boolean usingEndpointInfo = true;
 
-    private JmsTemplate jmsTemplate;
     private AbstractMessageListenerContainer messageListenerContainer;
 
     private JndiTemplate jndiTemplate;
     private ConnectionFactory connectionFactory;
-    private DestinationResolver destinationResolver;
+    private DestinationResolver destinationResolver = new DynamicDestinationResolver();
     private PlatformTransactionManager transactionManager;
-    private boolean wrapInSingleConnectionFactory = true;
     private TaskExecutor taskExecutor;
     private boolean reconnectOnException = true;
     private boolean messageIdEnabled = true;
     private boolean messageTimestampEnabled = true;
     private boolean pubSubNoLocal;
-    private Long clientReceiveTimeout;
+    private Long clientReceiveTimeout = 0L;
     private Long serverReceiveTimeout;
     private boolean explicitQosEnabled;
     private int deliveryMode = Message.DEFAULT_DELIVERY_MODE;
@@ -81,6 +82,7 @@ public class JMSConfiguration implements
      * Destination name to listen on for reply messages
      */
     private String replyDestination;
+    private Destination replyDestinationDest;
     
     /**
      * Destination name to send out as replyTo address in the message 
@@ -91,7 +93,6 @@ public class JMSConfiguration implements
     private boolean replyPubSubDomain;
     private Boolean useConduitIdSelector;
     private String conduitSelectorPrefix;
-    private boolean autoResolveDestination;
     private long recoveryInterval = DEFAULT_VALUE;
     private int cacheLevel = DEFAULT_VALUE;
     private String cacheLevelName;
@@ -103,13 +104,19 @@ public class JMSConfiguration implements
     private String targetService;
     private String requestURI;
 
-    private ConnectionFactory wrappedConnectionFactory;
-    private boolean autoWrappedConnectionFactory;
     private JNDIConfiguration jndiConfig;
 
-    public void ensureProperlyConfigured(org.apache.cxf.common.i18n.Message msg) {
-        if (targetDestination == null || getOrCreateWrappedConnectionFactory() == null) {
-            throw new ConfigurationException(msg);
+    private SingleConnectionFactory singleConnectionFactory;
+
+    public void ensureProperlyConfigured() {
+        if (connectionFactory == null) {
+            connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this);
+        }
+        if (connectionFactory == null) {
+            throw new IllegalArgumentException("JMSConfiguration.connectionFactory may not be null");
+        }
+        if (targetDestination == null) {
+            throw new IllegalArgumentException("JMSConfigruation.targetDestination may not be null");
         }
     }
 
@@ -137,14 +144,6 @@ public class JMSConfiguration implements
         this.recoveryInterval = recoveryInterval;
     }
 
-    public boolean isAutoResolveDestination() {
-        return autoResolveDestination;
-    }
-
-    public void setAutoResolveDestination(boolean autoResolveDestination) {
-        this.autoResolveDestination = autoResolveDestination;
-    }
-
     public boolean isUsingEndpointInfo() {
         return this.usingEndpointInfo;
     }
@@ -290,7 +289,7 @@ public class JMSConfiguration implements
     }
 
     public String getReplyToDestination() {
-        return replyToDestination;
+        return replyToDestination != null ? replyToDestination : replyDestination;
     }
 
     public void setReplyToDestination(String replyToDestination) {
@@ -377,7 +376,7 @@ public class JMSConfiguration implements
         this.reconnectPercentOfMax = reconnectPercentOfMax;
     }
 
-    public TaskExecutor getTaskExecutor() {
+    public Executor getTaskExecutor() {
         return taskExecutor;
     }
 
@@ -432,70 +431,21 @@ public class JMSConfiguration implements
         this.acceptMessagesWhileStopping = acceptMessagesWhileStopping;
     }
 
-    /**
-     * Tries to creates a ConnectionFactory from jndi if none was set as a property
-     * by using the jndConfig. Then it determines if the connectionFactory should be wrapped
-     * into a SingleConnectionFactory and wraps it if necessary. After the first call the
-     * same connectionFactory will be returned for all subsequent calls
-     *
-     * @return usable connectionFactory
-     */
-    public synchronized ConnectionFactory getOrCreateWrappedConnectionFactory() {
-        if (wrappedConnectionFactory == null) {
-            if (connectionFactory == null) {
-                connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this);
-            }
-            if (wrapInSingleConnectionFactory && !(connectionFactory instanceof SingleConnectionFactory)) {
-                SingleConnectionFactory scf;
-                if (connectionFactory instanceof XAConnectionFactory) {
-                    scf = new XASingleConnectionFactory(connectionFactory);
-                } else {
-                    scf = new SingleConnectionFactory(connectionFactory);
-                }
-                autoWrappedConnectionFactory = true;
-                if (getDurableSubscriptionClientId() != null) {
-                    scf.setClientId(getDurableSubscriptionClientId());
-                }
-                scf.setReconnectOnException(isReconnectOnException());
-                wrappedConnectionFactory = scf;
-            } else {
-                wrappedConnectionFactory = connectionFactory;
-            }
-        }
-        return wrappedConnectionFactory;
-    }
-
-    public ConnectionFactory getWrappedConnectionFactory() {
-        return wrappedConnectionFactory;
-    }
-
-    public synchronized void destroyWrappedConnectionFactory() {
-        if (autoWrappedConnectionFactory
-            &&
-            wrappedConnectionFactory instanceof SingleConnectionFactory) {
-            ((SingleConnectionFactory) wrappedConnectionFactory).destroy();
-            if (connectionFactory == wrappedConnectionFactory) {
-                connectionFactory = null;
-            }
-            wrappedConnectionFactory = null;
-            autoWrappedConnectionFactory = false;
+    public ConnectionFactory getPlainConnectionFactory() {
+        if (connectionFactory == null) {
+            connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this);
         }
-    }
-
-    /**
-     * Only for tests
-     * @return
-     */
-    protected ConnectionFactory getConnectionFactory() {
         return connectionFactory;
     }
-
-    public boolean isWrapInSingleConnectionFactory() {
-        return wrapInSingleConnectionFactory;
-    }
-
-    public void setWrapInSingleConnectionFactory(boolean wrapInSingleConnectionFactory) {
-        this.wrapInSingleConnectionFactory = wrapInSingleConnectionFactory;
+    
+    public ConnectionFactory getConnectionFactory() {
+        if (singleConnectionFactory == null) {
+            ConnectionFactory cf = getPlainConnectionFactory();
+            singleConnectionFactory = cf instanceof SingleConnectionFactory
+                ? (SingleConnectionFactory)cf : new SingleConnectionFactory(cf);
+            singleConnectionFactory.setClientId(durableSubscriptionClientId);
+        }
+        return singleConnectionFactory;
     }
 
     public String getDurableSubscriptionClientId() {
@@ -537,14 +487,6 @@ public class JMSConfiguration implements
         return this.enforceSpec != null;
     }
 
-    public void setJmsTemplate(JmsTemplate jmsTemplate) {
-        this.jmsTemplate = jmsTemplate;
-    }
-
-    public JmsTemplate getJmsTemplate() {
-        return jmsTemplate;
-    }
-
     public AbstractMessageListenerContainer getMessageListenerContainer() {
         return messageListenerContainer;
     }
@@ -565,4 +507,35 @@ public class JMSConfiguration implements
     public void setJmsProviderTibcoEms(boolean jmsProviderTibcoEms) {
         this.jmsProviderTibcoEms = jmsProviderTibcoEms;
     }
+
+    public static Destination resolveOrCreateDestination(final Session session,
+                                                         final DestinationResolver resolver,
+                                                         final String replyToDestinationName,
+                                                         final boolean pubSubDomain) throws JMSException {
+        if (replyToDestinationName == null) {
+            return session.createTemporaryQueue();
+        }
+        return resolver.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
+    }
+    
+    public Destination getReplyToDestination(Session session, String userDestination) throws JMSException {
+        String replyTo = userDestination;
+        if (replyTo == null) {
+            return getReplyDestination(session);
+        }
+        return getDestinationResolver().resolveDestinationName(session, replyTo, replyPubSubDomain);
+    }
+    
+    public Destination getReplyDestination(Session session) throws JMSException {
+        if (replyDestinationDest == null) {
+            replyDestinationDest = replyDestination == null 
+                ? session.createTemporaryQueue()
+                : getDestinationResolver().resolveDestinationName(session, replyDestination, replyPubSubDomain);
+        }
+        return replyDestinationDest;
+    }
+
+    public Destination getTargetDestination(Session session) throws JMSException {
+        return destinationResolver.resolveDestinationName(session, targetDestination, pubSubDomain);
+    }
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java Wed Feb  5 21:58:23 2014
@@ -54,6 +54,7 @@ public final class JMSConstants {
     public static final String JMS_CLIENT_CONFIG_ID = "jms-client";
     public static final String JMS_SERVER_CONFIG_ID = "jms-server";
     
+    // Is used by WS-Addressing
     public static final String JMS_REBASED_REPLY_TO = "org.apache.cxf.jms.server.replyto";
     public static final String JMS_SET_REPLY_TO = "org.apache.cxf.jms.client.set.replyto";
     

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Wed Feb  5 21:58:23 2014
@@ -19,12 +19,7 @@
 
 package org.apache.cxf.transport.jms;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Reader;
-import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
-import java.io.Writer;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.GregorianCalendar;
@@ -35,12 +30,10 @@ import java.util.concurrent.ConcurrentLi
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 
 import org.apache.cxf.Bus;
 import org.apache.cxf.BusFactory;
@@ -54,31 +47,23 @@ import org.apache.cxf.interceptor.OneWay
 import org.apache.cxf.message.Exchange;
 import org.apache.cxf.message.Message;
 import org.apache.cxf.message.MessageImpl;
-import org.apache.cxf.message.MessageUtils;
+import org.apache.cxf.security.SecurityContext;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.apache.cxf.transport.AbstractConduit;
 import org.apache.cxf.transport.AbstractMultiplexDestination;
 import org.apache.cxf.transport.Conduit;
-import org.apache.cxf.transport.MessageObserver;
 import org.apache.cxf.transport.jms.continuations.JMSContinuation;
 import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider;
+import org.apache.cxf.transport.jms.util.JMSSender;
+import org.apache.cxf.transport.jms.util.JMSUtil;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
 import org.apache.cxf.ws.addressing.EndpointReferenceType;
 import org.apache.cxf.ws.addressing.EndpointReferenceUtils;
-import org.springframework.jms.connection.JmsResourceHolder;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
-import org.springframework.jms.listener.SessionAwareMessageListener;
 import org.springframework.jms.support.JmsUtils;
 import org.springframework.jms.support.destination.DestinationResolver;
-import org.springframework.transaction.PlatformTransactionManager;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionSynchronizationManager;
 
 public class JMSDestination extends AbstractMultiplexDestination 
-    implements SessionAwareMessageListener<javax.jms.Message>,
-        MessageListener, JMSExchangeSender {
+    implements MessageListener, JMSExchangeSender {
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class);
 
@@ -113,26 +98,33 @@ public class JMSDestination extends Abst
      */
     public void activate() {
         getLogger().log(Level.FINE, "JMSDestination activate().... ");
-        String name = endpointInfo.getName().toString() + ".jms-destination";
-        org.apache.cxf.common.i18n.Message msg = 
-            new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name);
-        jmsConfig.ensureProperlyConfigured(msg);
+        jmsConfig.ensureProperlyConfigured();
         Object o = ei.getProperty(AbstractMessageListenerContainer.class.getName());
         if (o instanceof AbstractMessageListenerContainer
             && jmsConfig.getMessageListenerContainer() == null) {
             jmsConfig.setMessageListenerContainer((AbstractMessageListenerContainer)o);
         }
+
+        Destination targetDestination = resolveTargetDestination();
         jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, 
-                                                   jmsConfig.getTargetDestination());
+                                                   targetDestination);
+    }
+
+    private Destination resolveTargetDestination() {
+        ResourceCloser closer = new ResourceCloser();
+        try {
+            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
+            return jmsConfig.getTargetDestination(session);
+        } catch (JMSException e) {
+            throw JMSUtil.convertJmsException(e);
+        } finally {
+            closer.close();
+        }
     }
 
     public void deactivate() {
         if (jmsListener != null) {
             jmsListener.shutdown();
-            // CXF-2788: SingleConnectionFactory ignores the call to
-            // javax.jms.Connection#close(),
-            // use this to really close the target connection.
-            jmsConfig.destroyWrappedConnectionFactory();
         }
     }
 
@@ -141,45 +133,32 @@ public class JMSDestination extends Abst
         this.deactivate();
     }
 
-    private Destination resolveDestinationName(final JmsTemplate jmsTemplate, final String name) {
-        SessionCallback<Destination> sc = new SessionCallback<Destination>() {
-            public Destination doInJms(Session session) throws JMSException {
-                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
-                return resolv.resolveDestinationName(session, name, jmsConfig.isPubSubDomain());
-            }
-        };
-        return jmsTemplate.execute(sc);
-    }
-
-    public Destination getReplyToDestination(JmsTemplate jmsTemplate, Message inMessage) throws JMSException {
+    public Destination getReplyToDestination(Session session, 
+                                             Message inMessage) throws JMSException {
         javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE);
         // If WS-Addressing had set the replyTo header.
         final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO);
         if (replyToName != null) {
-            return resolveDestinationName(jmsTemplate, replyToName);
+            DestinationResolver resolver = jmsConfig.getDestinationResolver();
+            return resolver.resolveDestinationName(session, replyToName, jmsConfig.isReplyPubSubDomain());
         } else if (message.getJMSReplyTo() != null) {
             return message.getJMSReplyTo();
-        } else if (!StringUtils.isEmpty(jmsConfig.getReplyDestination())) {
-            return resolveDestinationName(jmsTemplate, jmsConfig.getReplyDestination());
         } else {
-            throw new RuntimeException("No replyTo destination set on request message or cxf message");
+            return jmsConfig.getReplyDestination(session);
         }
     }
 
     /**
-     * Decides what correlationId to use for the reply by looking at the request headers. If the request has a
-     * correlationId set this is taken. Else the messageId from the request message is used as correlation Id
+     * Decides what correlationId to use for the reply by looking at the request headers
      * 
-     * @param request
-     * @return
+     * @param request jms request message
+     * @return correlation id of request if set else message id from request
      * @throws JMSException
      */
     public String determineCorrelationID(javax.jms.Message request) throws JMSException {
-        String correlationID = request.getJMSCorrelationID();
-        if (correlationID == null || "".equals(correlationID)) {
-            correlationID = request.getJMSMessageID();
-        }
-        return correlationID;
+        return StringUtils.isEmpty(request.getJMSCorrelationID())
+            ? request.getJMSMessageID() 
+            : request.getJMSCorrelationID();
     }
 
     /**
@@ -191,22 +170,17 @@ public class JMSDestination extends Abst
      * @throws IOException
      */
     public void onMessage(javax.jms.Message message) {
-        onMessage(message, null);
-    }
-    public void onMessage(javax.jms.Message message, Session session) {
         ClassLoaderHolder origLoader = null;
         Bus origBus = null;
         try {
             if (loader != null) {
                 origLoader = ClassLoaderUtils.setThreadContextClassloader(loader);
             }
-            getLogger().log(Level.FINE, "server received request: ", message);
-             // Build CXF message from JMS message
-            Message inMessage = new MessageImpl();            
-            JMSUtils.populateIncomingContext(message, inMessage, 
-                                             JMSConstants.JMS_SERVER_REQUEST_HEADERS, jmsConfig);
-            
-            JMSUtils.retrieveAndSetPayload(inMessage, message, (String)inMessage.get(Message.ENCODING));
+            getLogger().log(Level.INFO, "JMS destination received message " + message + " on " 
+                + jmsConfig.getTargetDestination());
+            Message inMessage = JMSMessageUtils.asCXFMessage(message, JMSConstants.JMS_SERVER_REQUEST_HEADERS);
+            SecurityContext securityContext = JMSMessageUtils.buildSecurityContext(message, jmsConfig);
+            inMessage.put(SecurityContext.class, securityContext);
             inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType());
             inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message);
             ((MessageImpl)inMessage).setDestination(this);
@@ -238,25 +212,14 @@ public class JMSDestination extends Abst
                 && inMessage.getExchange().getInMessage() != null) {
                 inMessage = inMessage.getExchange().getInMessage();
             }
-            //need to propagate any exceptions back to Spring container 
-            //so transactions can occur
-            if (inMessage.getContent(Exception.class) != null && session != null) {
-                PlatformTransactionManager m = jmsConfig.getTransactionManager();
-                if (m != null) {
-                    TransactionStatus status = m.getTransaction(null);
-                    JmsResourceHolder resourceHolder =
-                        (JmsResourceHolder) TransactionSynchronizationManager
-                            .getResource(jmsConfig.getConnectionFactory());
-                    boolean trans = resourceHolder == null 
-                        || !resourceHolder.containsSession(session);
-                    if (status != null && !status.isCompleted() && trans) {
-                        Exception ex = inMessage.getContent(Exception.class);
-                        if (ex.getCause() instanceof RuntimeException) {
-                            throw (RuntimeException)ex.getCause();
-                        } else {
-                            throw new RuntimeException(ex);
-                        }
-                    }
+
+            // need to propagate any exceptions back so transactions can occur
+            if (inMessage.getContent(Exception.class) != null) {
+                Exception ex = inMessage.getContent(Exception.class);
+                if (ex.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException)ex.getCause();
+                } else {
+                    throw new RuntimeException(ex);
                 }
             }
             
@@ -264,6 +227,8 @@ public class JMSDestination extends Abst
             getLogger().log(Level.FINE, "Request message has been suspended");
         } catch (UnsupportedEncodingException ex) {
             getLogger().log(Level.WARNING, "can't get the right encoding information. " + ex);
+        } catch (JMSException e) {
+            JMSUtil.convertJmsException(e);
         } finally {
             if (origBus != bus) {
                 BusFactory.setThreadDefaultBus(origBus);
@@ -279,62 +244,77 @@ public class JMSDestination extends Abst
             //Don't need to send anything
             return;
         }
-        Message inMessage = exchange.getInMessage();
+        final Message inMessage = exchange.getInMessage();
         final Message outMessage = exchange.getOutMessage();
 
+        ResourceCloser closer = new ResourceCloser();
         try {
+            Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession();
+
             final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage
                 .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
             JMSMessageHeadersType inMessageProperties = (JMSMessageHeadersType)inMessage
                 .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS);
-            JMSUtils.initResponseMessageProperties(messageProperties, inMessageProperties);
-            JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, messageProperties);
+            initResponseMessageProperties(messageProperties, inMessageProperties);
 
             // setup the reply message
             final javax.jms.Message request = (javax.jms.Message)inMessage
                 .get(JMSConstants.JMS_REQUEST_MESSAGE);
-            final String msgType;
-            if (isMtomEnabled(outMessage)) {
-                msgType = JMSConstants.BINARY_MESSAGE_TYPE;
-            } else if (request instanceof TextMessage) {
-                msgType = JMSConstants.TEXT_MESSAGE_TYPE;
-            } else if (request instanceof BytesMessage) {
-                msgType = JMSConstants.BYTE_MESSAGE_TYPE;
-            } else {
-                msgType = JMSConstants.BINARY_MESSAGE_TYPE;
+            final String msgType = JMSMessageUtils.isMtomEnabled(outMessage)
+                ? JMSConstants.BINARY_MESSAGE_TYPE : JMSMessageUtils.getMessageType(request);
+            if (isTimedOut(request)) {
+                return;
             }
 
-            Destination replyTo = getReplyToDestination(jmsTemplate, inMessage);
-
-            if (request.getJMSExpiration() > 0) {
-                TimeZone tz = new SimpleTimeZone(0, "GMT");
-                Calendar cal = new GregorianCalendar(tz);
-                long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
-                if (timeToLive < 0) {
-                    getLogger()
-                        .log(Level.INFO, "Message time to live is already expired skipping response.");
-                    return;
-                }
+            Destination replyTo = getReplyToDestination(session, inMessage);
+            if (replyTo == null) {
+                throw new RuntimeException("No replyTo destination set");
             }
 
             getLogger().log(Level.FINE, "send out the message!");
-            jmsTemplate.send(replyTo, new MessageCreator() {
-                public javax.jms.Message createMessage(Session session) throws JMSException {
-                    javax.jms.Message reply = JMSUtils.createAndSetPayload(replyObj, session, msgType);
-
-                    reply.setJMSCorrelationID(determineCorrelationID(request));
-
-                    JMSUtils.prepareJMSProperties(messageProperties, outMessage, jmsConfig);
-                    JMSUtils.setJMSProperties(reply, messageProperties);
-
-                    LOG.log(Level.FINE, "server sending reply: ", reply);
-                    return reply;
-                }
-            });
 
+            String correlationId = determineCorrelationID(request);
+            javax.jms.Message reply = JMSMessageUtils.asJMSMessage(jmsConfig, 
+                                      outMessage, 
+                                      replyObj, 
+                                      msgType,
+                                      session,
+                                      correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS);
+            JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties);
+            LOG.log(Level.FINE, "server sending reply: ", reply);
+            sender.sendMessage(closer, session, replyTo, reply);
         } catch (JMSException ex) {
             throw JmsUtils.convertJmsAccessException(ex);
+        } finally {
+            closer.close();
+        }
+    }
+    
+    /**
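+     * @param messageProperties response headers that receive the copied values
+     * @param inMessageProperties request headers supplying delivery mode, priority and request URI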
+     */
+    public static void initResponseMessageProperties(JMSMessageHeadersType messageProperties,
+                                                     JMSMessageHeadersType inMessageProperties) {
+        messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode());
+        messageProperties.setJMSPriority(inMessageProperties.getJMSPriority());
+        messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI());
+        messageProperties.setSOAPJMSBindingVersion("1.0");
+    }
+
+
+    private boolean isTimedOut(final javax.jms.Message request) throws JMSException {
+        if (request.getJMSExpiration() > 0) {
+            TimeZone tz = new SimpleTimeZone(0, "GMT");
+            Calendar cal = new GregorianCalendar(tz);
+            long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis();
+            if (timeToLive < 0) {
+                getLogger()
+                    .log(Level.INFO, "Message time to live is already expired skipping response.");
+                return true;
+            }
         }
+        return false;
     }
 
     protected Logger getLogger() {
@@ -349,83 +329,4 @@ public class JMSDestination extends Abst
         this.jmsConfig = jmsConfig;
     }
 
-    /**
-     * Conduit for sending the reply back to the client
-     */
-    protected class BackChannelConduit extends AbstractConduit {
-
-        protected Message inMessage;
-        private JMSExchangeSender sender;
-
-        BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) {
-            super(ref);
-            inMessage = message;
-            this.sender = sender;
-        }
-        @Override
-        public void close(Message msg) throws IOException {
-            Writer writer = msg.getContent(Writer.class);
-            if (writer != null) {
-                writer.close();
-            }
-            Reader reader = msg.getContent(Reader.class);
-            if (reader != null) {
-                reader.close();
-            }
-            super.close(msg);
-        }
-        /**
-         * Register a message observer for incoming messages.
-         * 
-         * @param observer the observer to notify on receipt of incoming
-         */
-        public void setMessageObserver(MessageObserver observer) {
-            // shouldn't be called for a back channel conduit
-        }
-
-        /**
-         * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input
-         * message (if any).
-         * 
-         * @param message the message to be sent.
-         */
-        public void prepare(final Message message) throws IOException {
-            // setup the message to be send back
-            javax.jms.Message jmsMessage = (javax.jms.Message)inMessage
-                .get(JMSConstants.JMS_REQUEST_MESSAGE);
-            message.put(JMSConstants.JMS_REQUEST_MESSAGE, jmsMessage);
-
-            if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)
-                && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) {
-                message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage
-                    .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS));
-            }
-
-            Exchange exchange = inMessage.getExchange();
-            exchange.setOutMessage(message);
-            
-            if ((jmsMessage instanceof TextMessage) && !isMtomEnabled(message)) {
-                message.setContent(Writer.class, new StringWriter() {
-                    @Override
-                    public void close() throws IOException {
-                        super.close();
-                        sender.sendExchange(message.getExchange(), toString());
-                    }
-                });
-
-            } else {
-                message.setContent(OutputStream.class, new JMSOutputStream(sender, exchange, false));
-            }
-        }
-        
-        protected Logger getLogger() {
-            return LOG;
-        }
-    }
-
-    private boolean isMtomEnabled(final Message message) {
-        return MessageUtils.isTrue(message.getContextualProperty(
-                                                       org.apache.cxf.message.Message.MTOM_ENABLED));
-    }
-
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java Wed Feb  5 21:58:23 2014
@@ -21,14 +21,13 @@ package org.apache.cxf.transport.jms;
 import org.apache.cxf.message.Exchange;
 
 /**
- * Callback interface for JMSOutputStream
+ * Callback interface for SendingOutputStream and SendingWriter
  */
 interface JMSExchangeSender {
     
     /**
-     * Is called from JMSOutputStream.doClose() when the stream is fully
-     * written. Sends the outMessage of the given exchange with the given payload
-     * from the JMSOutputStream. If the exchange is not oneway a reply should be recieved
+     * Sends the outMessage of the given exchange with the given payload.
+     * If the exchange is not oneway a reply should be received
      * and set as inMessage
      * 
      * @param exchange

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=1564952&r1=1564951&r2=1564952&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Wed Feb  5 21:58:23 2014
@@ -19,33 +19,32 @@
 package org.apache.cxf.transport.jms;
 
 import java.lang.reflect.Method;
-import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
-import javax.jms.JMSException;
 import javax.jms.MessageListener;
-import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.XAConnectionFactory;
 import javax.naming.NamingException;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
 
 import org.apache.cxf.common.logging.LogUtils;
 import org.apache.cxf.service.model.EndpointInfo;
-import org.springframework.core.task.TaskExecutor;
+import org.apache.cxf.transport.jms.util.JMSSender;
+import org.apache.cxf.transport.jms.util.ResourceCloser;
+import org.apache.cxf.transport.jms.util.SessionFactory;
 import org.springframework.jms.connection.SingleConnectionFactory;
 import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.listener.AbstractMessageListenerContainer;
 import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.jms.support.destination.DestinationResolver;
 
 /**
  * Factory to create JmsTemplates and JmsListeners from configuration and context information
  */
 public final class JMSFactory {
+    static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory";
+    static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod";
 
     private static final Logger LOG = LogUtils.getL7dLogger(JMSFactory.class);
     
@@ -53,7 +52,7 @@ public final class JMSFactory {
     }
 
     /**
-     * Retreive connection factory from jndi, wrap it in a UserCredentialsConnectionFactoryAdapter,
+     * Retrieve connection factory from jndi, wrap it in a UserCredentialsConnectionFactoryAdapter,
      * set username and password and return the ConnectionFactory
      * 
      * @param jmsConfig
@@ -87,7 +86,7 @@ public final class JMSFactory {
             throw new RuntimeException(e);
         }
     }
-
+    
     /**
      * Create JmsTemplate from configuration information. Most settings are taken from jmsConfig. The QoS
      * settings in headers override the settings from jmsConfig
@@ -96,33 +95,22 @@ public final class JMSFactory {
      * @param messageProperties context headers
      * @return
      */
-    public static JmsTemplate createJmsTemplate(JMSConfiguration jmsConfig,
-                                                JMSMessageHeadersType messageProperties) {
-        if (jmsConfig.getJmsTemplate() != null) {
-            return jmsConfig.getJmsTemplate();
-        }
-        JmsTemplate jmsTemplate = new JmsTemplate();
-        jmsTemplate.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory());
-        jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
-        if (jmsConfig.getReceiveTimeout() != null) {
-            jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
-        }
+    public static JMSSender createJmsSender(JMSConfiguration jmsConfig,
+                                            JMSMessageHeadersType messageProperties) {
+        JMSSender sender = new JMSSender();
         long timeToLive = (messageProperties != null && messageProperties.isSetTimeToLive())
             ? messageProperties.getTimeToLive() : jmsConfig.getTimeToLive();
-        jmsTemplate.setTimeToLive(timeToLive);
+        sender.setTimeToLive(timeToLive);
         int priority = (messageProperties != null && messageProperties.isSetJMSPriority())
             ? messageProperties.getJMSPriority() : jmsConfig.getPriority();
-        jmsTemplate.setPriority(priority);
+        sender.setPriority(priority);
         int deliveryMode = (messageProperties != null && messageProperties.isSetJMSDeliveryMode())
             ? messageProperties.getJMSDeliveryMode() : jmsConfig.getDeliveryMode();
-        jmsTemplate.setDeliveryMode(deliveryMode);
-        jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled());
-        jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
-        if (jmsConfig.getDestinationResolver() != null) {
-            jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
-        }
-        return jmsTemplate;
+        sender.setDeliveryMode(deliveryMode);
+        sender.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled());
+        return sender;
     }
+
     /**
      * Create and start listener using configuration information from jmsConfig. Uses
      * resolveOrCreateDestination to determine the destination for the listener.
@@ -130,13 +118,13 @@ public final class JMSFactory {
      * @param ei the EndpointInfo for the listener
      * @param jmsConfig configuration information
      * @param listenerHandler object to be called when a message arrives
-     * @param destinationName null for temp dest or a destination name
+     * @param destination to listen on
      * @return
      */
     public static AbstractMessageListenerContainer createJmsListener(EndpointInfo ei,
                                                                     JMSConfiguration jmsConfig,
                                                                     MessageListener listenerHandler,
-                                                                    String destinationName) {
+                                                                    Destination destination) {
         
         if (jmsConfig.getMessageListenerContainer() != null) {
             AbstractMessageListenerContainer  jmsListener =  jmsConfig.getMessageListenerContainer();
@@ -148,18 +136,16 @@ public final class JMSFactory {
             return jmsListener;
         }
         
-        if (jmsConfig.getMessageListenerContainer() != null) {
-            return jmsConfig.getMessageListenerContainer();
-        }
         DefaultMessageListenerContainer jmsListener = null;
         
         //Check to see if transport is being used in JCA RA with XA
-        Method method = ei.getProperty(JCATransactionalMessageListenerContainer.MDB_TRANSACTED_METHOD,
+        Method method = ei.getProperty(MDB_TRANSACTED_METHOD,
                                        java.lang.reflect.Method.class);
+        MessageEndpointFactory factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY, 
+                                      MessageEndpointFactory.class);
         if (method != null 
-            && 
-            jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) {
-            jmsListener = new JCATransactionalMessageListenerContainer(ei); 
+            && jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) {
+            jmsListener = new JCATransactionalMessageListenerContainer(factory, method); 
         } else {
             jmsListener = new DefaultMessageListenerContainer();
         }
@@ -167,9 +153,10 @@ public final class JMSFactory {
         return createJmsListener(jmsListener,
                                  jmsConfig,
                                  listenerHandler,
-                                 destinationName,
-                                 null, null, false);            
+                                 destination,
+                                 null);            
     }
+
     /**
      * Create and start listener using configuration information from jmsConfig. Uses
      * resolveOrCreateDestination to determine the destination for the listener.
@@ -177,56 +164,36 @@ public final class JMSFactory {
      * @param jmsConfig configuration information
      * @param listenerHandler object to be called when a message arrives
      * @param destinationName null for temp dest or a destination name
-     * @param messageSelectorPrefix prefix for the messageselector
+     * @param conduitId id for message selector
      * @return
      */
     public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
                                                                     MessageListener listenerHandler,
                                                                     Destination destination, 
-                                                                    String messageSelectorPrefix,
-                                                                    boolean userCID) {
+                                                                    String conduitId) {
         DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer(); 
-        
-        return createJmsListener(jmsListener,
-                                 jmsConfig,
-                                 listenerHandler,
-                                 null,
-                                 destination, 
-                                 messageSelectorPrefix,
-                                 userCID);    
-    }
-    public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
-                                                                    MessageListener listenerHandler,
-                                                                    String destination, 
-                                                                    String messageSelectorPrefix,
-                                                                    boolean userCID) {
-        DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer(); 
-        
         return createJmsListener(jmsListener,
                                  jmsConfig,
                                  listenerHandler,
                                  destination,
-                                 null, 
-                                 messageSelectorPrefix,
-                                 userCID);    
+                                 conduitId);    
     }
-    public static DefaultMessageListenerContainer createJmsListener(
+
+    private static DefaultMessageListenerContainer createJmsListener(
                           DefaultMessageListenerContainer jmsListener,
                           JMSConfiguration jmsConfig,
                           MessageListener listenerHandler,
-                          String destinationName,
                           Destination destination,
-                          String messageSelectorPrefix,
-                          boolean userCID) {
+                          String conduitId) {
         
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
         jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
-        jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+        
         jmsListener.setPubSubNoLocal(jmsConfig.isPubSubNoLocal());
         
-        jmsListener.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory());
-        jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
+        jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
         jmsListener.setSubscriptionDurable(jmsConfig.isSubscriptionDurable());
+        jmsListener.setClientId(jmsConfig.getDurableSubscriptionClientId());
         jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
         jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
         jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
@@ -236,8 +203,12 @@ public final class JMSFactory {
             if (jmsConfig.getServerReceiveTimeout() != null) {
                 jmsListener.setReceiveTimeout(jmsConfig.getServerReceiveTimeout());
             }
-        } else if (jmsConfig.getReceiveTimeout() != null) {
-            jmsListener.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+            jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+        } else {
+            if (jmsConfig.getReceiveTimeout() != null) {
+                jmsListener.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+            }
+            jmsListener.setPubSubDomain(jmsConfig.isReplyPubSubDomain());
         }
         if (jmsConfig.getRecoveryInterval() != JMSConfiguration.DEFAULT_VALUE) {
             jmsListener.setRecoveryInterval(jmsConfig.getRecoveryInterval());
@@ -257,88 +228,28 @@ public final class JMSFactory {
             jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping());
         }
         String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix();
-        if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
-            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
-                                        + staticSelectorPrefix 
-                                        + messageSelectorPrefix + "%'");
-        } else if (staticSelectorPrefix.length() > 0) {
-            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" 
-                                        + staticSelectorPrefix +  "%'");
-        }
+        String conduitIdSt = jmsConfig.isUseConduitIdSelector() && conduitId != null ? conduitId : "";
+        String correlationIdPrefix = staticSelectorPrefix + conduitIdSt;
         
-        if (jmsConfig.getDestinationResolver() != null) {
-            jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
-        }
-        if (jmsConfig.getTaskExecutor() != null) {
-            setTaskExecutor(jmsListener, jmsConfig.getTaskExecutor());
-        } 
-        if (destination != null) {
-            jmsListener.setDestination(destination);
-        } else if (jmsConfig.isAutoResolveDestination()) {
-            jmsListener.setDestinationName(destinationName);
-        } else {
-            JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null);
-            Destination dest = JMSFactory.resolveOrCreateDestination(jmsTemplate, destinationName, jmsConfig
-                .isPubSubDomain());
-            jmsListener.setDestination(dest);
+        if (!correlationIdPrefix.isEmpty()) {
+            String messageSelector = "JMSCorrelationID LIKE '" + correlationIdPrefix + "%'";
+            jmsListener.setMessageSelector(messageSelector);
         }
+        
+        jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor());
+
+        jmsListener.setDestination(destination);
         jmsListener.initialize();
         jmsListener.start();
         return jmsListener;
     }
     
-    private static void setTaskExecutor(DefaultMessageListenerContainer jmsListener, TaskExecutor exec) {
-        //CXF-2630 - The method sig for DefaultMessageListenerContainer.setTaskExecutor changed between 
-        //Spring 2.5 and 3.0 and code compiled for one won't run on the other.   Thus, we need
-        //to revert to using some reflection to make this call
-        Exception ex = null;
-        for (Method m : jmsListener.getClass().getMethods()) {
-            if ("setTaskExecutor".equals(m.getName())
-                && m.getParameterTypes().length == 1
-                && m.getParameterTypes()[0].isInstance(exec)) {
-                try {
-                    m.invoke(jmsListener, exec);
-                    return;
-                } catch (Exception e) {
-                    ex = e;
-                }
-            }
-        }
-        //if we get here, we couldn't find a valid method or something else went wrong
-        if (ex != null) {
-            LOG.log(Level.WARNING, "ERROR_SETTING_TASKEXECUTOR", ex);
-        } else {
-            LOG.log(Level.WARNING, "NO_SETTASKEXECUTOR_METHOD", jmsListener.getClass().getName());
-        }
-    }
-
-    /**
-     * If the destinationName given is null then a temporary destination is created else the destination name
-     * is resolved using the resolver from the jmsConfig
-     * 
-     * @param jmsTemplate template to use for session and resolver
-     * @param replyToDestinationName null for temporary destination or a destination name
-     * @param pubSubDomain true=pubSub, false=Queues
-     * @return resolved destination
-     */
-    public static Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
-                                                          final String replyToDestinationName,
-                                                          final boolean pubSubDomain) {
-        return jmsTemplate.execute(new SessionCallback<Destination>() {
-            public Destination doInJms(Session session) throws JMSException {
-                if (replyToDestinationName == null) {
-                    if (session instanceof QueueSession) {
-                        // For JMS 1.0.2
-                        return ((QueueSession)session).createTemporaryQueue();
-                    } else {
-                        // For JMS 1.1
-                        return session.createTemporaryQueue();
-                    }
-                }
-                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
-                return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
-            }
-        });
+    public static SessionFactory createJmsSessionFactory(JMSConfiguration jmsConfig, ResourceCloser closer) {
+        SessionFactory sf = new SessionFactory(jmsConfig.getConnectionFactory(), closer);
+        sf.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
+        sf.setSessionTransacted(jmsConfig.isSessionTransacted());
+        sf.setDurableSubscriptionClientId(jmsConfig.getDurableSubscriptionClientId());
+        return sf;
     }
-
+    
 }



Mime
View raw message