Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 409BD10B84 for ; Wed, 5 Feb 2014 21:59:03 +0000 (UTC) Received: (qmail 98778 invoked by uid 500); 5 Feb 2014 21:59:01 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 98715 invoked by uid 500); 5 Feb 2014 21:59:01 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 98707 invoked by uid 99); 5 Feb 2014 21:59:00 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Feb 2014 21:59:00 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_FRT_ADULT2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Feb 2014 21:58:49 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4247C23888E2; Wed, 5 Feb 2014 21:58:26 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1564952 [1/4] - in /cxf/trunk: parent/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/ rt/transp... Date: Wed, 05 Feb 2014 21:58:24 -0000 To: commits@cxf.apache.org From: cschneider@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140205215826.4247C23888E2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cschneider Date: Wed Feb 5 21:58:23 2014 New Revision: 1564952 URL: http://svn.apache.org/r1564952 Log: CXF-5543 Make jms independent of spring. First part Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSMessageUtils.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/MessageStreamUtil.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSMessageConverter.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSSender.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/JMSUtil.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/ResourceCloser.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/SessionFactory.java (with props) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/util/TestReceiver.java (with props) cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/PooledConnectionTempQueueTest.java (with props) cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSSharedQueueTest.java (with props) Removed: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/XASingleConnectionFactory.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConfigurationTest.java Modified: cxf/trunk/parent/pom.xml cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSBrokerSetup.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/MessageIdAsCorrelationIdJMSConduitTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/OldConfigTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/RequestResponseTest.java cxf/trunk/rt/transports/jms/src/test/resources/jms_test_config.xml cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/testcases/SOAPJMSTestSuiteTest.java cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/jms/testsuite/util/JMSTestUtil.java cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerGzipTest.java cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerSoap12Test.java cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/JMSClientServerTest.java cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/swa/ClientServerSwaTest.java cxf/trunk/systests/transport-jms/src/test/java/org/apache/cxf/systest/jms/tx/JMSTransactionClientServerTest.java cxf/trunk/testutils/src/main/java/org/apache/cxf/testutil/common/EmbeddedJMSBrokerLauncher.java Modified: cxf/trunk/parent/pom.xml URL: http://svn.apache.org/viewvc/cxf/trunk/parent/pom.xml?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/parent/pom.xml (original) +++ cxf/trunk/parent/pom.xml Wed Feb 5 21:58:23 2014 @@ -72,7 +72,7 @@ [0.0,3) 1.1.3 - 5.8.0 + 5.9.0 1.2.14 1.50 2.2_2 Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java?rev=1564952&view=auto ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java (added) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java Wed Feb 5 21:58:23 2014 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.jms; + +import java.io.IOException; +import java.util.logging.Logger; + +import javax.jms.TextMessage; + +import org.apache.cxf.common.logging.LogUtils; +import org.apache.cxf.message.Exchange; +import org.apache.cxf.message.Message; +import org.apache.cxf.transport.AbstractConduit; +import org.apache.cxf.transport.MessageObserver; +import org.apache.cxf.ws.addressing.EndpointReferenceType; + +/** + * Conduit for sending the reply back to the client + */ +class BackChannelConduit extends AbstractConduit { + private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class); + protected Message inMessage; + private JMSExchangeSender sender; + + BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) { + super(ref); + inMessage = message; + this.sender = sender; + } + @Override + public void close(Message msg) throws IOException { + MessageStreamUtil.closeStreams(msg); + super.close(msg); + } + + /** + * Register a message observer for incoming messages. + * + * @param observer the observer to notify on receipt of incoming + */ + public void setMessageObserver(MessageObserver observer) { + // shouldn't be called for a back channel conduit + } + + /** + * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input + * message (if any). + * + * @param message the message to be sent. + */ + public void prepare(final Message message) throws IOException { + // setup the message to be sent back + javax.jms.Message jmsMessage = (javax.jms.Message)inMessage + .get(JMSConstants.JMS_REQUEST_MESSAGE); + message.put(JMSConstants.JMS_REQUEST_MESSAGE, jmsMessage); + + if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS) + && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) { + message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage + .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)); + } + + final Exchange exchange = inMessage.getExchange(); + exchange.setOutMessage(message); + + boolean isTextMessage = (jmsMessage instanceof TextMessage) && !JMSMessageUtils.isMtomEnabled(message); + MessageStreamUtil.prepareStream(message, isTextMessage, sender); + } + + protected Logger getLogger() { + return LOG; + } +} \ No newline at end of file Propchange: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/BackChannelConduit.java ------------------------------------------------------------------------------ svn:mime-type = text/plain Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JCATransactionalMessageListenerContainer.java Wed Feb 5 21:58:23 2014 @@ -31,21 +31,17 @@ import javax.resource.spi.endpoint.Messa import javax.resource.spi.endpoint.MessageEndpointFactory; import javax.transaction.xa.XAResource; -import org.apache.cxf.service.model.EndpointInfo; import org.springframework.jms.listener.DefaultMessageListenerContainer; import org.springframework.jms.support.JmsUtils; public class JCATransactionalMessageListenerContainer extends DefaultMessageListenerContainer { static final ThreadLocal, ?>> ENDPOINT_LOCAL = new ThreadLocal, ?>>(); - static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory"; - static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod"; private MessageEndpointFactory factory; private Method method; - public JCATransactionalMessageListenerContainer(EndpointInfo ei) { - factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY, - MessageEndpointFactory.class); - method = ei.getProperty(MDB_TRANSACTED_METHOD, Method.class); + public JCATransactionalMessageListenerContainer(MessageEndpointFactory factory, Method transactedMethod) { + this.factory = factory; + this.method = transactedMethod; this.setCacheLevel(CACHE_CONNECTION); } @@ -68,7 +64,7 @@ public class JCATransactionalMessageList mp.put(MessageEndpoint.class, ep); ENDPOINT_LOCAL.set(mp); - ep.beforeDelivery(method); + ep.beforeDelivery(this.method); messageReceived = doReceiveAndExecute(invoker, s, mc, null); ep.afterDelivery(); } catch (Exception ex) { Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Wed Feb 5 21:58:23 2014 @@ -20,11 +20,7 @@ package org.apache.cxf.transport.jms; import java.io.IOException; -import java.io.OutputStream; -import java.io.Reader; -import java.io.StringWriter; import java.io.UnsupportedEncodingException; -import java.io.Writer; import java.lang.ref.WeakReference; import java.util.Map; import java.util.UUID; @@ -37,7 +33,6 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageListener; import javax.jms.Session; -import javax.jms.TemporaryQueue; import org.apache.cxf.Bus; import org.apache.cxf.buslifecycle.BusLifeCycleListener; @@ -46,16 +41,16 @@ import org.apache.cxf.common.logging.Log import org.apache.cxf.configuration.ConfigurationException; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; -import org.apache.cxf.message.MessageImpl; import org.apache.cxf.message.MessageUtils; -import org.apache.cxf.service.model.EndpointInfo; +import org.apache.cxf.security.SecurityContext; import org.apache.cxf.transport.AbstractConduit; +import org.apache.cxf.transport.jms.util.JMSSender; +import org.apache.cxf.transport.jms.util.JMSUtil; +import org.apache.cxf.transport.jms.util.ResourceCloser; import org.apache.cxf.ws.addressing.EndpointReferenceType; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.MessageCreator; -import org.springframework.jms.listener.AbstractMessageListenerContainer; +import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.listener.DefaultMessageListenerContainer; -import org.springframework.jms.support.JmsUtils; +import org.springframework.jms.support.destination.DestinationResolver; /** * JMSConduit is instantiated by the JMSTransportFactory which is selected by a client if the transport @@ -69,24 +64,21 @@ public class JMSConduit extends Abstract private static final String CORRELATED = JMSConduit.class.getName() + ".correlated"; - private EndpointInfo endpointInfo; private JMSConfiguration jmsConfig; private Map correlationMap = new ConcurrentHashMap(); private DefaultMessageListenerContainer jmsListener; - private DefaultMessageListenerContainer allListener; private String conduitId; private AtomicLong messageCount; private JMSBusLifeCycleListener listener; private Bus bus; + private Destination staticReplyDestination; - public JMSConduit(EndpointInfo endpointInfo, - EndpointReferenceType target, + public JMSConduit(EndpointReferenceType target, JMSConfiguration jmsConfig, Bus b) { super(target); bus = b; this.jmsConfig = jmsConfig; - this.endpointInfo = endpointInfo; conduitId = UUID.randomUUID().toString().replaceAll("-", ""); messageCount = new AtomicLong(0); } @@ -97,59 +89,24 @@ public class JMSConduit extends Abstract * JMSOutputStream will then call back the sendExchange method of this class. {@inheritDoc} */ public void prepare(final Message message) throws IOException { - String name = endpointInfo.getName().toString() + ".jms-conduit"; - org.apache.cxf.common.i18n.Message msg = - new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_CONDUIT", LOG, name); - jmsConfig.ensureProperlyConfigured(msg); boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType()); - if (isTextPayload) { - message.setContent(Writer.class, new StringWriter() { - @Override - public void close() throws IOException { - super.close(); - sendExchange(message.getExchange(), toString()); - } - }); - } else { - JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload); - message.setContent(OutputStream.class, out); - } + MessageStreamUtil.prepareStream(message, isTextPayload, this); } + @Override public void close(Message msg) throws IOException { - Writer writer = msg.getContent(Writer.class); - if (writer != null) { - writer.close(); - } - Reader reader = msg.getContent(Reader.class); - if (reader != null) { - reader.close(); - } + MessageStreamUtil.closeStreams(msg); super.close(msg); } - private synchronized AbstractMessageListenerContainer getJMSListener() { + private synchronized void getJMSListener(Destination replyTo) { if (jmsListener == null) { jmsListener = JMSFactory.createJmsListener(jmsConfig, this, - jmsConfig.getReplyDestination(), - conduitId, - false); - addBusListener(); - } - return jmsListener; - } - private synchronized AbstractMessageListenerContainer getAllListener() { - if (allListener == null) { - allListener = JMSFactory.createJmsListener(jmsConfig, - this, - jmsConfig.getReplyDestination(), - null, - true); + replyTo, + conduitId); addBusListener(); } - return allListener; } - /** * Send the JMS message and if the MEP is not oneway receive the response. * @@ -165,117 +122,103 @@ public class JMSConduit extends Abstract if (outMessage == null) { throw new RuntimeException("Exchange to be sent has no outMessage"); } - - boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType()); - if (isTextPayload && MessageUtils.isTrue(outMessage.getContextualProperty( - org.apache.cxf.message.Message.MTOM_ENABLED)) - && outMessage.getAttachments() != null && outMessage.getAttachments().size() > 0) { - org.apache.cxf.common.i18n.Message msg = - new org.apache.cxf.common.i18n.Message("INVALID_MESSAGE_TYPE", LOG); - throw new ConfigurationException(msg); - } + + jmsConfig.ensureProperlyConfigured(); + assertIsNotTextMessageAndMtom(outMessage); + //assertIsNotSyncAndTopicReply(exchange); JMSMessageHeadersType headers = getOrCreateJmsHeaders(outMessage); - String replyTo = headers.getJMSReplyTo(); - if (replyTo == null) { - replyTo = jmsConfig.getReplyDestination(); - } - final JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers); - String userCID = headers.getJMSCorrelationID(); + assertIsNotAsyncSyncAndUserCID(exchange, userCID); - String correlationId = createCorrelationId(exchange, userCID); - - Destination replyToDestination = null; - if (!exchange.isOneWay() || !jmsConfig.isEnforceSpec() && isSetReplyTo(outMessage) - && replyTo != null) { - if (!jmsConfig.isReplyPubSubDomain() - && (exchange.isSynchronous() - || exchange.isOneWay())) { - replyToDestination = JMSFactory.resolveOrCreateDestination(jmsTemplate, replyTo, - jmsConfig.isReplyPubSubDomain()); - } else { - if (userCID == null || !jmsConfig.isUseConduitIdSelector()) { - replyToDestination = getJMSListener().getDestination(); - } else { - replyToDestination = getAllListener().getDestination(); + ResourceCloser closer = new ResourceCloser(); + try { + Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession(); + DestinationResolver resolver = jmsConfig.getDestinationResolver(); + Destination targetDest = resolver.resolveDestinationName(session, + jmsConfig.getTargetDestination(), + jmsConfig.isPubSubDomain()); + + Destination replyToDestination = null; + if (!exchange.isOneWay()) { + if (!exchange.isSynchronous() && staticReplyDestination == null) { + staticReplyDestination = jmsConfig.getReplyDestination(session); + getJMSListener(staticReplyDestination); } + replyToDestination = jmsConfig.getReplyToDestination(session, headers.getJMSReplyTo()); } - } - final String cid = correlationId; - final Destination rtd = replyToDestination; - class JMSConduitMessageCreator implements MessageCreator { - private javax.jms.Message jmsMessage; - - public javax.jms.Message createMessage(Session session) throws JMSException { - String messageType = jmsConfig.getMessageType(); - Destination destination = rtd; - String replyToAddress = jmsConfig.getReplyToDestination(); - if (rtd == null && replyToAddress != null) { - destination = JMSFactory.resolveOrCreateDestination(jmsTemplate, replyToAddress, - jmsConfig.isPubSubDomain()); - } - jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(jmsConfig, outMessage, request, - messageType, session, destination, - cid); - if ((jmsConfig.isReplyPubSubDomain() || !exchange.isSynchronous()) && !exchange.isOneWay()) { - correlationMap.put(cid, exchange); - } - LOG.log(Level.FINE, "client sending request: ", jmsMessage); - return jmsMessage; + String messageType = jmsConfig.getMessageType(); + String correlationId = createCorrelationId(exchange, userCID); + if (correlationId != null) { + correlationMap.put(correlationId, exchange); } - - public String getMessageID() { - if (jmsMessage != null) { - try { - return jmsMessage.getJMSMessageID(); - } catch (JMSException e) { - return null; - } - } - return null; + + javax.jms.Message message = JMSMessageUtils.asJMSMessage(jmsConfig, + outMessage, + request, + messageType, + session, + correlationId, + JMSConstants.JMS_CLIENT_REQUEST_HEADERS); + if (replyToDestination != null) { + message.setJMSReplyTo(replyToDestination); } - } - JMSConduitMessageCreator messageCreator = new JMSConduitMessageCreator(); - /** - * If the message is not oneWay we will expect to receive a reply on the listener. - * - */ - if (!exchange.isOneWay()) { + + JMSSender sender = JMSFactory.createJmsSender(jmsConfig, headers); + synchronized (exchange) { - jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator); + sender.sendMessage(closer, session, targetDest, message); + LOG.log(Level.INFO, "client sending request message " + + message.getJMSMessageID() + " to " + targetDest); + headers.setJMSMessageID(message.getJMSMessageID()); if (correlationId == null) { - correlationId = messageCreator.getMessageID(); - } - headers.setJMSMessageID(messageCreator.getMessageID()); - - final String messageSelector = "JMSCorrelationID = '" + correlationId + "'"; - if (exchange.isSynchronous() && !jmsConfig.isReplyPubSubDomain()) { - javax.jms.Message replyMessage = jmsTemplate.receiveSelected(replyToDestination, - messageSelector); - if (replyMessage == null) { - throw new RuntimeException("Timeout receiving message with correlationId " - + correlationId); - } else { - doReplyMessage(exchange, replyMessage); - } - - // TODO How do we delete the temp queue in case of an async request - // or is async with a temp queue not possible ? - if (replyToDestination instanceof TemporaryQueue) { - try { - ((TemporaryQueue)replyToDestination).delete(); - } catch (JMSException e) { - // Only log the exception as the exchange should be able to proceed - LOG.log(Level.WARNING, "Unable to remove temporary queue: " + e.getMessage(), e); - } - } + // Warning: We might loose the reply if it already arrived at this point + correlationId = message.getJMSMessageID(); + correlationMap.put(correlationId, exchange); } } - } else { - jmsTemplate.send(jmsConfig.getTargetDestination(), messageCreator); - headers.setJMSMessageID(messageCreator.getMessageID()); + + /** + * If the message is not oneWay we will expect to receive a reply on the listener. + */ + if (!exchange.isOneWay() && (exchange.isSynchronous())) { + Destination replyDestination = staticReplyDestination != null + ? staticReplyDestination : replyToDestination; + javax.jms.Message replyMessage = JMSUtil.receive(session, replyDestination, correlationId, + jmsConfig.getReceiveTimeout(), + jmsConfig.isPubSubNoLocal()); + correlationMap.remove(correlationId); + doReplyMessage(exchange, replyMessage); + } + } catch (JMSException e) { + throw new RuntimeException(e.getMessage(), e); + } finally { + closer.close(); + } + } + + private void assertIsNotAsyncSyncAndUserCID(Exchange exchange, String userCID) { + if (!exchange.isSynchronous() && userCID != null) { + throw new IllegalArgumentException("User CID can not be used for asynchronous exchanges"); + } + } + + private void assertIsNotTextMessageAndMtom(final Message outMessage) { + boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType()); + if (isTextPayload && MessageUtils.isTrue(outMessage.getContextualProperty( + org.apache.cxf.message.Message.MTOM_ENABLED)) + && outMessage.getAttachments() != null && outMessage.getAttachments().size() > 0) { + org.apache.cxf.common.i18n.Message msg = + new org.apache.cxf.common.i18n.Message("INVALID_MESSAGE_TYPE", LOG); + throw new ConfigurationException(msg); + } + } + + @SuppressWarnings("unused") + private void assertIsNotSyncAndTopicReply(Exchange exchange) { + if (exchange.isSynchronous() && jmsConfig.isReplyPubSubDomain()) { + throw new IllegalArgumentException("Synchronous calls can not be combined with a response on a Topic"); } } @@ -296,7 +239,7 @@ public class JMSConduit extends Abstract String prefix = (jmsConfig.isUseConduitIdSelector()) ? jmsConfig.getConduitSelectorPrefix() + conduitId : jmsConfig.getConduitSelectorPrefix(); - correlationId = JMSUtils.createCorrelationId(prefix, messageCount.incrementAndGet()); + correlationId = JMSUtil.createCorrelationId(prefix, messageCount.incrementAndGet()); } } return correlationId; @@ -358,33 +301,42 @@ public class JMSConduit extends Abstract * request is notified {@inheritDoc} */ public void onMessage(javax.jms.Message jmsMessage) { - String correlationId; try { - correlationId = jmsMessage.getJMSCorrelationID(); + String correlationId = jmsMessage.getJMSCorrelationID(); + LOG.log(Level.INFO, "Received reply message with correlation id " + correlationId); + + int count = 0; + Exchange exchange = null; + while (exchange == null && count < 100) { + exchange = correlationMap.remove(correlationId); + Thread.sleep(100); + count++; + } + if (exchange == null) { + LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId); + return; + } + doReplyMessage(exchange, jmsMessage); } catch (JMSException e) { - throw JmsUtils.convertJmsAccessException(e); + throw JMSUtil.convertJmsException(e); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while correlating", e); } - Exchange exchange = correlationMap.remove(correlationId); - if (exchange == null) { - LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId); - return; - } - doReplyMessage(exchange, jmsMessage); } /** * Process the reply message + * @throws JMSException */ - public void doReplyMessage(Exchange exchange, javax.jms.Message jmsMessage) { - Message inMessage = new MessageImpl(); - exchange.setInMessage(inMessage); + public void doReplyMessage(Exchange exchange, javax.jms.Message jmsMessage) throws JMSException { + LOG.log(Level.FINE, "client received reply: ", jmsMessage); try { - JMSUtils.populateIncomingContext(jmsMessage, inMessage, - JMSConstants.JMS_CLIENT_RESPONSE_HEADERS, jmsConfig); - - JMSUtils.retrieveAndSetPayload(inMessage, jmsMessage, (String)inMessage.get(Message.ENCODING)); + Message inMessage = JMSMessageUtils.asCXFMessage(jmsMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); + SecurityContext securityContext = JMSMessageUtils.buildSecurityContext(jmsMessage, jmsConfig); + inMessage.put(SecurityContext.class, securityContext); + exchange.setInMessage(inMessage); if (exchange.isSynchronous()) { synchronized (exchange) { @@ -407,17 +359,18 @@ public class JMSConduit extends Abstract listener = null; } if (jmsListener != null) { + jmsListener.stop(); jmsListener.shutdown(); jmsListener = null; } - if (allListener != null) { - allListener.shutdown(); - allListener = null; - } } public synchronized void close() { + try { + ((SingleConnectionFactory)jmsConfig.getConnectionFactory()).resetConnection(); + } catch (Exception e) { + // Ignore + } shutdownListeners(); - jmsConfig.destroyWrappedConnectionFactory(); LOG.log(Level.FINE, "JMSConduit closed "); } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Wed Feb 5 21:58:23 2014 @@ -18,19 +18,22 @@ */ package org.apache.cxf.transport.jms; +import java.util.concurrent.Executor; + import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.XAConnectionFactory; +import javax.jms.Session; import org.apache.cxf.common.injection.NoJSR250Annotations; -import org.apache.cxf.configuration.ConfigurationException; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Required; import org.springframework.core.task.TaskExecutor; import org.springframework.jms.connection.SingleConnectionFactory; -import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.support.destination.DestinationResolver; +import org.springframework.jms.support.destination.DynamicDestinationResolver; import org.springframework.jndi.JndiTemplate; import org.springframework.transaction.PlatformTransactionManager; @@ -44,20 +47,18 @@ public class JMSConfiguration implements private boolean usingEndpointInfo = true; - private JmsTemplate jmsTemplate; private AbstractMessageListenerContainer messageListenerContainer; private JndiTemplate jndiTemplate; private ConnectionFactory connectionFactory; - private DestinationResolver destinationResolver; + private DestinationResolver destinationResolver = new DynamicDestinationResolver(); private PlatformTransactionManager transactionManager; - private boolean wrapInSingleConnectionFactory = true; private TaskExecutor taskExecutor; private boolean reconnectOnException = true; private boolean messageIdEnabled = true; private boolean messageTimestampEnabled = true; private boolean pubSubNoLocal; - private Long clientReceiveTimeout; + private Long clientReceiveTimeout = 0L; private Long serverReceiveTimeout; private boolean explicitQosEnabled; private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; @@ -81,6 +82,7 @@ public class JMSConfiguration implements * Destination name to listen on for reply messages */ private String replyDestination; + private Destination replyDestinationDest; /** * Destination name to send out as replyTo address in the message @@ -91,7 +93,6 @@ public class JMSConfiguration implements private boolean replyPubSubDomain; private Boolean useConduitIdSelector; private String conduitSelectorPrefix; - private boolean autoResolveDestination; private long recoveryInterval = DEFAULT_VALUE; private int cacheLevel = DEFAULT_VALUE; private String cacheLevelName; @@ -103,13 +104,19 @@ public class JMSConfiguration implements private String targetService; private String requestURI; - private ConnectionFactory wrappedConnectionFactory; - private boolean autoWrappedConnectionFactory; private JNDIConfiguration jndiConfig; - public void ensureProperlyConfigured(org.apache.cxf.common.i18n.Message msg) { - if (targetDestination == null || getOrCreateWrappedConnectionFactory() == null) { - throw new ConfigurationException(msg); + private SingleConnectionFactory singleConnectionFactory; + + public void ensureProperlyConfigured() { + if (connectionFactory == null) { + connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this); + } + if (connectionFactory == null) { + throw new IllegalArgumentException("JMSConfiguration.connectionFactory may not be null"); + } + if (targetDestination == null) { + throw new IllegalArgumentException("JMSConfigruation.targetDestination may not be null"); } } @@ -137,14 +144,6 @@ public class JMSConfiguration implements this.recoveryInterval = recoveryInterval; } - public boolean isAutoResolveDestination() { - return autoResolveDestination; - } - - public void setAutoResolveDestination(boolean autoResolveDestination) { - this.autoResolveDestination = autoResolveDestination; - } - public boolean isUsingEndpointInfo() { return this.usingEndpointInfo; } @@ -290,7 +289,7 @@ public class JMSConfiguration implements } public String getReplyToDestination() { - return replyToDestination; + return replyToDestination != null ? replyToDestination : replyDestination; } public void setReplyToDestination(String replyToDestination) { @@ -377,7 +376,7 @@ public class JMSConfiguration implements this.reconnectPercentOfMax = reconnectPercentOfMax; } - public TaskExecutor getTaskExecutor() { + public Executor getTaskExecutor() { return taskExecutor; } @@ -432,70 +431,21 @@ public class JMSConfiguration implements this.acceptMessagesWhileStopping = acceptMessagesWhileStopping; } - /** - * Tries to creates a ConnectionFactory from jndi if none was set as a property - * by using the jndConfig. Then it determines if the connectionFactory should be wrapped - * into a SingleConnectionFactory and wraps it if necessary. After the first call the - * same connectionFactory will be returned for all subsequent calls - * - * @return usable connectionFactory - */ - public synchronized ConnectionFactory getOrCreateWrappedConnectionFactory() { - if (wrappedConnectionFactory == null) { - if (connectionFactory == null) { - connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this); - } - if (wrapInSingleConnectionFactory && !(connectionFactory instanceof SingleConnectionFactory)) { - SingleConnectionFactory scf; - if (connectionFactory instanceof XAConnectionFactory) { - scf = new XASingleConnectionFactory(connectionFactory); - } else { - scf = new SingleConnectionFactory(connectionFactory); - } - autoWrappedConnectionFactory = true; - if (getDurableSubscriptionClientId() != null) { - scf.setClientId(getDurableSubscriptionClientId()); - } - scf.setReconnectOnException(isReconnectOnException()); - wrappedConnectionFactory = scf; - } else { - wrappedConnectionFactory = connectionFactory; - } - } - return wrappedConnectionFactory; - } - - public ConnectionFactory getWrappedConnectionFactory() { - return wrappedConnectionFactory; - } - - public synchronized void destroyWrappedConnectionFactory() { - if (autoWrappedConnectionFactory - && - wrappedConnectionFactory instanceof SingleConnectionFactory) { - ((SingleConnectionFactory) wrappedConnectionFactory).destroy(); - if (connectionFactory == wrappedConnectionFactory) { - connectionFactory = null; - } - wrappedConnectionFactory = null; - autoWrappedConnectionFactory = false; + public ConnectionFactory getPlainConnectionFactory() { + if (connectionFactory == null) { + connectionFactory = JMSFactory.getConnectionFactoryFromJndi(this); } - } - - /** - * Only for tests - * @return - */ - protected ConnectionFactory getConnectionFactory() { return connectionFactory; } - - public boolean isWrapInSingleConnectionFactory() { - return wrapInSingleConnectionFactory; - } - - public void setWrapInSingleConnectionFactory(boolean wrapInSingleConnectionFactory) { - this.wrapInSingleConnectionFactory = wrapInSingleConnectionFactory; + + public ConnectionFactory getConnectionFactory() { + if (singleConnectionFactory == null) { + ConnectionFactory cf = getPlainConnectionFactory(); + singleConnectionFactory = cf instanceof SingleConnectionFactory + ? (SingleConnectionFactory)cf : new SingleConnectionFactory(cf); + singleConnectionFactory.setClientId(durableSubscriptionClientId); + } + return singleConnectionFactory; } public String getDurableSubscriptionClientId() { @@ -537,14 +487,6 @@ public class JMSConfiguration implements return this.enforceSpec != null; } - public void setJmsTemplate(JmsTemplate jmsTemplate) { - this.jmsTemplate = jmsTemplate; - } - - public JmsTemplate getJmsTemplate() { - return jmsTemplate; - } - public AbstractMessageListenerContainer getMessageListenerContainer() { return messageListenerContainer; } @@ -565,4 +507,35 @@ public class JMSConfiguration implements public void setJmsProviderTibcoEms(boolean jmsProviderTibcoEms) { this.jmsProviderTibcoEms = jmsProviderTibcoEms; } + + public static Destination resolveOrCreateDestination(final Session session, + final DestinationResolver resolver, + final String replyToDestinationName, + final boolean pubSubDomain) throws JMSException { + if (replyToDestinationName == null) { + return session.createTemporaryQueue(); + } + return resolver.resolveDestinationName(session, replyToDestinationName, pubSubDomain); + } + + public Destination getReplyToDestination(Session session, String userDestination) throws JMSException { + String replyTo = userDestination; + if (replyTo == null) { + return getReplyDestination(session); + } + return getDestinationResolver().resolveDestinationName(session, replyTo, replyPubSubDomain); + } + + public Destination getReplyDestination(Session session) throws JMSException { + if (replyDestinationDest == null) { + replyDestinationDest = replyDestination == null + ? session.createTemporaryQueue() + : getDestinationResolver().resolveDestinationName(session, replyDestination, replyPubSubDomain); + } + return replyDestinationDest; + } + + public Destination getTargetDestination(Session session) throws JMSException { + return destinationResolver.resolveDestinationName(session, targetDestination, pubSubDomain); + } } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConstants.java Wed Feb 5 21:58:23 2014 @@ -54,6 +54,7 @@ public final class JMSConstants { public static final String JMS_CLIENT_CONFIG_ID = "jms-client"; public static final String JMS_SERVER_CONFIG_ID = "jms-server"; + // Is used by WS-Addressing public static final String JMS_REBASED_REPLY_TO = "org.apache.cxf.jms.server.replyto"; public static final String JMS_SET_REPLY_TO = "org.apache.cxf.jms.client.set.replyto"; Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Wed Feb 5 21:58:23 2014 @@ -19,12 +19,7 @@ package org.apache.cxf.transport.jms; -import java.io.IOException; -import java.io.OutputStream; -import java.io.Reader; -import java.io.StringWriter; import java.io.UnsupportedEncodingException; -import java.io.Writer; import java.util.Calendar; import java.util.Collection; import java.util.GregorianCalendar; @@ -35,12 +30,10 @@ import java.util.concurrent.ConcurrentLi import java.util.logging.Level; import java.util.logging.Logger; -import javax.jms.BytesMessage; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageListener; import javax.jms.Session; -import javax.jms.TextMessage; import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; @@ -54,31 +47,23 @@ import org.apache.cxf.interceptor.OneWay import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; -import org.apache.cxf.message.MessageUtils; +import org.apache.cxf.security.SecurityContext; import org.apache.cxf.service.model.EndpointInfo; -import org.apache.cxf.transport.AbstractConduit; import org.apache.cxf.transport.AbstractMultiplexDestination; import org.apache.cxf.transport.Conduit; -import org.apache.cxf.transport.MessageObserver; import org.apache.cxf.transport.jms.continuations.JMSContinuation; import org.apache.cxf.transport.jms.continuations.JMSContinuationProvider; +import org.apache.cxf.transport.jms.util.JMSSender; +import org.apache.cxf.transport.jms.util.JMSUtil; +import org.apache.cxf.transport.jms.util.ResourceCloser; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.ws.addressing.EndpointReferenceUtils; -import org.springframework.jms.connection.JmsResourceHolder; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.MessageCreator; -import org.springframework.jms.core.SessionCallback; import org.springframework.jms.listener.AbstractMessageListenerContainer; -import org.springframework.jms.listener.SessionAwareMessageListener; import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.destination.DestinationResolver; -import org.springframework.transaction.PlatformTransactionManager; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionSynchronizationManager; public class JMSDestination extends AbstractMultiplexDestination - implements SessionAwareMessageListener, - MessageListener, JMSExchangeSender { + implements MessageListener, JMSExchangeSender { private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class); @@ -113,26 +98,33 @@ public class JMSDestination extends Abst */ public void activate() { getLogger().log(Level.FINE, "JMSDestination activate().... "); - String name = endpointInfo.getName().toString() + ".jms-destination"; - org.apache.cxf.common.i18n.Message msg = - new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name); - jmsConfig.ensureProperlyConfigured(msg); + jmsConfig.ensureProperlyConfigured(); Object o = ei.getProperty(AbstractMessageListenerContainer.class.getName()); if (o instanceof AbstractMessageListenerContainer && jmsConfig.getMessageListenerContainer() == null) { jmsConfig.setMessageListenerContainer((AbstractMessageListenerContainer)o); } + + Destination targetDestination = resolveTargetDestination(); jmsListener = JMSFactory.createJmsListener(ei, jmsConfig, this, - jmsConfig.getTargetDestination()); + targetDestination); + } + + private Destination resolveTargetDestination() { + ResourceCloser closer = new ResourceCloser(); + try { + Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession(); + return jmsConfig.getTargetDestination(session); + } catch (JMSException e) { + throw JMSUtil.convertJmsException(e); + } finally { + closer.close(); + } } public void deactivate() { if (jmsListener != null) { jmsListener.shutdown(); - // CXF-2788: SingleConnectionFactory ignores the call to - // javax.jms.Connection#close(), - // use this to really close the target connection. - jmsConfig.destroyWrappedConnectionFactory(); } } @@ -141,45 +133,32 @@ public class JMSDestination extends Abst this.deactivate(); } - private Destination resolveDestinationName(final JmsTemplate jmsTemplate, final String name) { - SessionCallback sc = new SessionCallback() { - public Destination doInJms(Session session) throws JMSException { - DestinationResolver resolv = jmsTemplate.getDestinationResolver(); - return resolv.resolveDestinationName(session, name, jmsConfig.isPubSubDomain()); - } - }; - return jmsTemplate.execute(sc); - } - - public Destination getReplyToDestination(JmsTemplate jmsTemplate, Message inMessage) throws JMSException { + public Destination getReplyToDestination(Session session, + Message inMessage) throws JMSException { javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); // If WS-Addressing had set the replyTo header. final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO); if (replyToName != null) { - return resolveDestinationName(jmsTemplate, replyToName); + DestinationResolver resolver = jmsConfig.getDestinationResolver(); + return resolver.resolveDestinationName(session, replyToName, jmsConfig.isReplyPubSubDomain()); } else if (message.getJMSReplyTo() != null) { return message.getJMSReplyTo(); - } else if (!StringUtils.isEmpty(jmsConfig.getReplyDestination())) { - return resolveDestinationName(jmsTemplate, jmsConfig.getReplyDestination()); } else { - throw new RuntimeException("No replyTo destination set on request message or cxf message"); + return jmsConfig.getReplyDestination(session); } } /** - * Decides what correlationId to use for the reply by looking at the request headers. If the request has a - * correlationId set this is taken. Else the messageId from the request message is used as correlation Id + * Decides what correlationId to use for the reply by looking at the request headers * - * @param request - * @return + * @param request jms request message + * @return correlation id of request if set else message id from request * @throws JMSException */ public String determineCorrelationID(javax.jms.Message request) throws JMSException { - String correlationID = request.getJMSCorrelationID(); - if (correlationID == null || "".equals(correlationID)) { - correlationID = request.getJMSMessageID(); - } - return correlationID; + return StringUtils.isEmpty(request.getJMSCorrelationID()) + ? request.getJMSMessageID() + : request.getJMSCorrelationID(); } /** @@ -191,22 +170,17 @@ public class JMSDestination extends Abst * @throws IOException */ public void onMessage(javax.jms.Message message) { - onMessage(message, null); - } - public void onMessage(javax.jms.Message message, Session session) { ClassLoaderHolder origLoader = null; Bus origBus = null; try { if (loader != null) { origLoader = ClassLoaderUtils.setThreadContextClassloader(loader); } - getLogger().log(Level.FINE, "server received request: ", message); - // Build CXF message from JMS message - Message inMessage = new MessageImpl(); - JMSUtils.populateIncomingContext(message, inMessage, - JMSConstants.JMS_SERVER_REQUEST_HEADERS, jmsConfig); - - JMSUtils.retrieveAndSetPayload(inMessage, message, (String)inMessage.get(Message.ENCODING)); + getLogger().log(Level.INFO, "JMS destination received message " + message + " on " + + jmsConfig.getTargetDestination()); + Message inMessage = JMSMessageUtils.asCXFMessage(message, JMSConstants.JMS_SERVER_REQUEST_HEADERS); + SecurityContext securityContext = JMSMessageUtils.buildSecurityContext(message, jmsConfig); + inMessage.put(SecurityContext.class, securityContext); inMessage.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, new JMSMessageHeadersType()); inMessage.put(JMSConstants.JMS_REQUEST_MESSAGE, message); ((MessageImpl)inMessage).setDestination(this); @@ -238,25 +212,14 @@ public class JMSDestination extends Abst && inMessage.getExchange().getInMessage() != null) { inMessage = inMessage.getExchange().getInMessage(); } - //need to propagate any exceptions back to Spring container - //so transactions can occur - if (inMessage.getContent(Exception.class) != null && session != null) { - PlatformTransactionManager m = jmsConfig.getTransactionManager(); - if (m != null) { - TransactionStatus status = m.getTransaction(null); - JmsResourceHolder resourceHolder = - (JmsResourceHolder) TransactionSynchronizationManager - .getResource(jmsConfig.getConnectionFactory()); - boolean trans = resourceHolder == null - || !resourceHolder.containsSession(session); - if (status != null && !status.isCompleted() && trans) { - Exception ex = inMessage.getContent(Exception.class); - if (ex.getCause() instanceof RuntimeException) { - throw (RuntimeException)ex.getCause(); - } else { - throw new RuntimeException(ex); - } - } + + // need to propagate any exceptions back so transactions can occur + if (inMessage.getContent(Exception.class) != null) { + Exception ex = inMessage.getContent(Exception.class); + if (ex.getCause() instanceof RuntimeException) { + throw (RuntimeException)ex.getCause(); + } else { + throw new RuntimeException(ex); } } @@ -264,6 +227,8 @@ public class JMSDestination extends Abst getLogger().log(Level.FINE, "Request message has been suspended"); } catch (UnsupportedEncodingException ex) { getLogger().log(Level.WARNING, "can't get the right encoding information. " + ex); + } catch (JMSException e) { + JMSUtil.convertJmsException(e); } finally { if (origBus != bus) { BusFactory.setThreadDefaultBus(origBus); @@ -279,62 +244,77 @@ public class JMSDestination extends Abst //Don't need to send anything return; } - Message inMessage = exchange.getInMessage(); + final Message inMessage = exchange.getInMessage(); final Message outMessage = exchange.getOutMessage(); + ResourceCloser closer = new ResourceCloser(); try { + Session session = JMSFactory.createJmsSessionFactory(jmsConfig, closer).createSession(); + final JMSMessageHeadersType messageProperties = (JMSMessageHeadersType)outMessage .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); JMSMessageHeadersType inMessageProperties = (JMSMessageHeadersType)inMessage .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - JMSUtils.initResponseMessageProperties(messageProperties, inMessageProperties); - JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, messageProperties); + initResponseMessageProperties(messageProperties, inMessageProperties); // setup the reply message final javax.jms.Message request = (javax.jms.Message)inMessage .get(JMSConstants.JMS_REQUEST_MESSAGE); - final String msgType; - if (isMtomEnabled(outMessage)) { - msgType = JMSConstants.BINARY_MESSAGE_TYPE; - } else if (request instanceof TextMessage) { - msgType = JMSConstants.TEXT_MESSAGE_TYPE; - } else if (request instanceof BytesMessage) { - msgType = JMSConstants.BYTE_MESSAGE_TYPE; - } else { - msgType = JMSConstants.BINARY_MESSAGE_TYPE; + final String msgType = JMSMessageUtils.isMtomEnabled(outMessage) + ? JMSConstants.BINARY_MESSAGE_TYPE : JMSMessageUtils.getMessageType(request); + if (isTimedOut(request)) { + return; } - Destination replyTo = getReplyToDestination(jmsTemplate, inMessage); - - if (request.getJMSExpiration() > 0) { - TimeZone tz = new SimpleTimeZone(0, "GMT"); - Calendar cal = new GregorianCalendar(tz); - long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); - if (timeToLive < 0) { - getLogger() - .log(Level.INFO, "Message time to live is already expired skipping response."); - return; - } + Destination replyTo = getReplyToDestination(session, inMessage); + if (replyTo == null) { + throw new RuntimeException("No replyTo destination set"); } getLogger().log(Level.FINE, "send out the message!"); - jmsTemplate.send(replyTo, new MessageCreator() { - public javax.jms.Message createMessage(Session session) throws JMSException { - javax.jms.Message reply = JMSUtils.createAndSetPayload(replyObj, session, msgType); - - reply.setJMSCorrelationID(determineCorrelationID(request)); - - JMSUtils.prepareJMSProperties(messageProperties, outMessage, jmsConfig); - JMSUtils.setJMSProperties(reply, messageProperties); - - LOG.log(Level.FINE, "server sending reply: ", reply); - return reply; - } - }); + String correlationId = determineCorrelationID(request); + javax.jms.Message reply = JMSMessageUtils.asJMSMessage(jmsConfig, + outMessage, + replyObj, + msgType, + session, + correlationId, JMSConstants.JMS_SERVER_RESPONSE_HEADERS); + JMSSender sender = JMSFactory.createJmsSender(jmsConfig, messageProperties); + LOG.log(Level.FINE, "server sending reply: ", reply); + sender.sendMessage(closer, session, replyTo, reply); } catch (JMSException ex) { throw JmsUtils.convertJmsAccessException(ex); + } finally { + closer.close(); + } + } + + /** + * @param messageProperties + * @param inMessageProperties + */ + public static void initResponseMessageProperties(JMSMessageHeadersType messageProperties, + JMSMessageHeadersType inMessageProperties) { + messageProperties.setJMSDeliveryMode(inMessageProperties.getJMSDeliveryMode()); + messageProperties.setJMSPriority(inMessageProperties.getJMSPriority()); + messageProperties.setSOAPJMSRequestURI(inMessageProperties.getSOAPJMSRequestURI()); + messageProperties.setSOAPJMSBindingVersion("1.0"); + } + + + private boolean isTimedOut(final javax.jms.Message request) throws JMSException { + if (request.getJMSExpiration() > 0) { + TimeZone tz = new SimpleTimeZone(0, "GMT"); + Calendar cal = new GregorianCalendar(tz); + long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); + if (timeToLive < 0) { + getLogger() + .log(Level.INFO, "Message time to live is already expired skipping response."); + return true; + } } + return false; } protected Logger getLogger() { @@ -349,83 +329,4 @@ public class JMSDestination extends Abst this.jmsConfig = jmsConfig; } - /** - * Conduit for sending the reply back to the client - */ - protected class BackChannelConduit extends AbstractConduit { - - protected Message inMessage; - private JMSExchangeSender sender; - - BackChannelConduit(JMSExchangeSender sender, EndpointReferenceType ref, Message message) { - super(ref); - inMessage = message; - this.sender = sender; - } - @Override - public void close(Message msg) throws IOException { - Writer writer = msg.getContent(Writer.class); - if (writer != null) { - writer.close(); - } - Reader reader = msg.getContent(Reader.class); - if (reader != null) { - reader.close(); - } - super.close(msg); - } - /** - * Register a message observer for incoming messages. - * - * @param observer the observer to notify on receipt of incoming - */ - public void setMessageObserver(MessageObserver observer) { - // shouldn't be called for a back channel conduit - } - - /** - * Send an outbound message, assumed to contain all the name-value mappings of the corresponding input - * message (if any). - * - * @param message the message to be sent. - */ - public void prepare(final Message message) throws IOException { - // setup the message to be send back - javax.jms.Message jmsMessage = (javax.jms.Message)inMessage - .get(JMSConstants.JMS_REQUEST_MESSAGE); - message.put(JMSConstants.JMS_REQUEST_MESSAGE, jmsMessage); - - if (!message.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS) - && inMessage.containsKey(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)) { - message.put(JMSConstants.JMS_SERVER_RESPONSE_HEADERS, inMessage - .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS)); - } - - Exchange exchange = inMessage.getExchange(); - exchange.setOutMessage(message); - - if ((jmsMessage instanceof TextMessage) && !isMtomEnabled(message)) { - message.setContent(Writer.class, new StringWriter() { - @Override - public void close() throws IOException { - super.close(); - sender.sendExchange(message.getExchange(), toString()); - } - }); - - } else { - message.setContent(OutputStream.class, new JMSOutputStream(sender, exchange, false)); - } - } - - protected Logger getLogger() { - return LOG; - } - } - - private boolean isMtomEnabled(final Message message) { - return MessageUtils.isTrue(message.getContextualProperty( - org.apache.cxf.message.Message.MTOM_ENABLED)); - } - } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java Wed Feb 5 21:58:23 2014 @@ -21,14 +21,13 @@ package org.apache.cxf.transport.jms; import org.apache.cxf.message.Exchange; /** - * Callback interface for JMSOutputStream + * Callback interface for SendingOutputStream and SendingWriter */ interface JMSExchangeSender { /** - * Is called from JMSOutputStream.doClose() when the stream is fully - * written. Sends the outMessage of the given exchange with the given payload - * from the JMSOutputStream. If the exchange is not oneway a reply should be recieved + * Sends the outMessage of the given exchange with the given payload. + * If the exchange is not oneway a reply should be recieved * and set as inMessage * * @param exchange Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=1564952&r1=1564951&r2=1564952&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Wed Feb 5 21:58:23 2014 @@ -19,33 +19,32 @@ package org.apache.cxf.transport.jms; import java.lang.reflect.Method; -import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.ConnectionFactory; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.MessageListener; -import javax.jms.QueueSession; import javax.jms.Session; import javax.jms.XAConnectionFactory; import javax.naming.NamingException; +import javax.resource.spi.endpoint.MessageEndpointFactory; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.service.model.EndpointInfo; -import org.springframework.core.task.TaskExecutor; +import org.apache.cxf.transport.jms.util.JMSSender; +import org.apache.cxf.transport.jms.util.ResourceCloser; +import org.apache.cxf.transport.jms.util.SessionFactory; import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; -import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.SessionCallback; import org.springframework.jms.listener.AbstractMessageListenerContainer; import org.springframework.jms.listener.DefaultMessageListenerContainer; -import org.springframework.jms.support.destination.DestinationResolver; /** * Factory to create JmsTemplates and JmsListeners from configuration and context information */ public final class JMSFactory { + static final String MESSAGE_ENDPOINT_FACTORY = "MessageEndpointFactory"; + static final String MDB_TRANSACTED_METHOD = "MDBTransactedMethod"; private static final Logger LOG = LogUtils.getL7dLogger(JMSFactory.class); @@ -53,7 +52,7 @@ public final class JMSFactory { } /** - * Retreive connection factory from jndi, wrap it in a UserCredentialsConnectionFactoryAdapter, + * Retrieve connection factory from jndi, wrap it in a UserCredentialsConnectionFactoryAdapter, * set username and password and return the ConnectionFactory * * @param jmsConfig @@ -87,7 +86,7 @@ public final class JMSFactory { throw new RuntimeException(e); } } - + /** * Create JmsTemplate from configuration information. Most settings are taken from jmsConfig. The QoS * settings in headers override the settings from jmsConfig @@ -96,33 +95,22 @@ public final class JMSFactory { * @param messageProperties context headers * @return */ - public static JmsTemplate createJmsTemplate(JMSConfiguration jmsConfig, - JMSMessageHeadersType messageProperties) { - if (jmsConfig.getJmsTemplate() != null) { - return jmsConfig.getJmsTemplate(); - } - JmsTemplate jmsTemplate = new JmsTemplate(); - jmsTemplate.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory()); - jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain()); - if (jmsConfig.getReceiveTimeout() != null) { - jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout()); - } + public static JMSSender createJmsSender(JMSConfiguration jmsConfig, + JMSMessageHeadersType messageProperties) { + JMSSender sender = new JMSSender(); long timeToLive = (messageProperties != null && messageProperties.isSetTimeToLive()) ? messageProperties.getTimeToLive() : jmsConfig.getTimeToLive(); - jmsTemplate.setTimeToLive(timeToLive); + sender.setTimeToLive(timeToLive); int priority = (messageProperties != null && messageProperties.isSetJMSPriority()) ? messageProperties.getJMSPriority() : jmsConfig.getPriority(); - jmsTemplate.setPriority(priority); + sender.setPriority(priority); int deliveryMode = (messageProperties != null && messageProperties.isSetJMSDeliveryMode()) ? messageProperties.getJMSDeliveryMode() : jmsConfig.getDeliveryMode(); - jmsTemplate.setDeliveryMode(deliveryMode); - jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled()); - jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted()); - if (jmsConfig.getDestinationResolver() != null) { - jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver()); - } - return jmsTemplate; + sender.setDeliveryMode(deliveryMode); + sender.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled()); + return sender; } + /** * Create and start listener using configuration information from jmsConfig. Uses * resolveOrCreateDestination to determine the destination for the listener. @@ -130,13 +118,13 @@ public final class JMSFactory { * @param ei the EndpointInfo for the listener * @param jmsConfig configuration information * @param listenerHandler object to be called when a message arrives - * @param destinationName null for temp dest or a destination name + * @param destination to listen on * @return */ public static AbstractMessageListenerContainer createJmsListener(EndpointInfo ei, JMSConfiguration jmsConfig, MessageListener listenerHandler, - String destinationName) { + Destination destination) { if (jmsConfig.getMessageListenerContainer() != null) { AbstractMessageListenerContainer jmsListener = jmsConfig.getMessageListenerContainer(); @@ -148,18 +136,16 @@ public final class JMSFactory { return jmsListener; } - if (jmsConfig.getMessageListenerContainer() != null) { - return jmsConfig.getMessageListenerContainer(); - } DefaultMessageListenerContainer jmsListener = null; //Check to see if transport is being used in JCA RA with XA - Method method = ei.getProperty(JCATransactionalMessageListenerContainer.MDB_TRANSACTED_METHOD, + Method method = ei.getProperty(MDB_TRANSACTED_METHOD, java.lang.reflect.Method.class); + MessageEndpointFactory factory = ei.getProperty(MESSAGE_ENDPOINT_FACTORY, + MessageEndpointFactory.class); if (method != null - && - jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) { - jmsListener = new JCATransactionalMessageListenerContainer(ei); + && jmsConfig.getConnectionFactory() instanceof XAConnectionFactory) { + jmsListener = new JCATransactionalMessageListenerContainer(factory, method); } else { jmsListener = new DefaultMessageListenerContainer(); } @@ -167,9 +153,10 @@ public final class JMSFactory { return createJmsListener(jmsListener, jmsConfig, listenerHandler, - destinationName, - null, null, false); + destination, + null); } + /** * Create and start listener using configuration information from jmsConfig. Uses * resolveOrCreateDestination to determine the destination for the listener. @@ -177,56 +164,36 @@ public final class JMSFactory { * @param jmsConfig configuration information * @param listenerHandler object to be called when a message arrives * @param destinationName null for temp dest or a destination name - * @param messageSelectorPrefix prefix for the messageselector + * @param conduitId id for message selector * @return */ public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig, MessageListener listenerHandler, Destination destination, - String messageSelectorPrefix, - boolean userCID) { + String conduitId) { DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer(); - - return createJmsListener(jmsListener, - jmsConfig, - listenerHandler, - null, - destination, - messageSelectorPrefix, - userCID); - } - public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig, - MessageListener listenerHandler, - String destination, - String messageSelectorPrefix, - boolean userCID) { - DefaultMessageListenerContainer jmsListener = new DefaultMessageListenerContainer(); - return createJmsListener(jmsListener, jmsConfig, listenerHandler, destination, - null, - messageSelectorPrefix, - userCID); + conduitId); } - public static DefaultMessageListenerContainer createJmsListener( + + private static DefaultMessageListenerContainer createJmsListener( DefaultMessageListenerContainer jmsListener, JMSConfiguration jmsConfig, MessageListener listenerHandler, - String destinationName, Destination destination, - String messageSelectorPrefix, - boolean userCID) { + String conduitId) { jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers()); jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers()); - jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsListener.setPubSubNoLocal(jmsConfig.isPubSubNoLocal()); - jmsListener.setConnectionFactory(jmsConfig.getOrCreateWrappedConnectionFactory()); - jmsListener.setMessageSelector(jmsConfig.getMessageSelector()); + jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory()); jmsListener.setSubscriptionDurable(jmsConfig.isSubscriptionDurable()); + jmsListener.setClientId(jmsConfig.getDurableSubscriptionClientId()); jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted()); jmsListener.setTransactionManager(jmsConfig.getTransactionManager()); @@ -236,8 +203,12 @@ public final class JMSFactory { if (jmsConfig.getServerReceiveTimeout() != null) { jmsListener.setReceiveTimeout(jmsConfig.getServerReceiveTimeout()); } - } else if (jmsConfig.getReceiveTimeout() != null) { - jmsListener.setReceiveTimeout(jmsConfig.getReceiveTimeout()); + jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); + } else { + if (jmsConfig.getReceiveTimeout() != null) { + jmsListener.setReceiveTimeout(jmsConfig.getReceiveTimeout()); + } + jmsListener.setPubSubDomain(jmsConfig.isReplyPubSubDomain()); } if (jmsConfig.getRecoveryInterval() != JMSConfiguration.DEFAULT_VALUE) { jmsListener.setRecoveryInterval(jmsConfig.getRecoveryInterval()); @@ -257,88 +228,28 @@ public final class JMSFactory { jmsListener.setAcceptMessagesWhileStopping(jmsConfig.isAcceptMessagesWhileStopping()); } String staticSelectorPrefix = jmsConfig.getConduitSelectorPrefix(); - if (!userCID && messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) { - jmsListener.setMessageSelector("JMSCorrelationID LIKE '" - + staticSelectorPrefix - + messageSelectorPrefix + "%'"); - } else if (staticSelectorPrefix.length() > 0) { - jmsListener.setMessageSelector("JMSCorrelationID LIKE '" - + staticSelectorPrefix + "%'"); - } + String conduitIdSt = jmsConfig.isUseConduitIdSelector() && conduitId != null ? conduitId : ""; + String correlationIdPrefix = staticSelectorPrefix + conduitIdSt; - if (jmsConfig.getDestinationResolver() != null) { - jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver()); - } - if (jmsConfig.getTaskExecutor() != null) { - setTaskExecutor(jmsListener, jmsConfig.getTaskExecutor()); - } - if (destination != null) { - jmsListener.setDestination(destination); - } else if (jmsConfig.isAutoResolveDestination()) { - jmsListener.setDestinationName(destinationName); - } else { - JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null); - Destination dest = JMSFactory.resolveOrCreateDestination(jmsTemplate, destinationName, jmsConfig - .isPubSubDomain()); - jmsListener.setDestination(dest); + if (!correlationIdPrefix.isEmpty()) { + String messageSelector = "JMSCorrelationID LIKE '" + correlationIdPrefix + "%'"; + jmsListener.setMessageSelector(messageSelector); } + + jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor()); + + jmsListener.setDestination(destination); jmsListener.initialize(); jmsListener.start(); return jmsListener; } - private static void setTaskExecutor(DefaultMessageListenerContainer jmsListener, TaskExecutor exec) { - //CXF-2630 - The method sig for DefaultMessageListenerContainer.setTaskExecutor changed between - //Spring 2.5 and 3.0 and code compiled for one won't run on the other. Thus, we need - //to revert to using some reflection to make this call - Exception ex = null; - for (Method m : jmsListener.getClass().getMethods()) { - if ("setTaskExecutor".equals(m.getName()) - && m.getParameterTypes().length == 1 - && m.getParameterTypes()[0].isInstance(exec)) { - try { - m.invoke(jmsListener, exec); - return; - } catch (Exception e) { - ex = e; - } - } - } - //if we get here, we couldn't find a valid method or something else went wrong - if (ex != null) { - LOG.log(Level.WARNING, "ERROR_SETTING_TASKEXECUTOR", ex); - } else { - LOG.log(Level.WARNING, "NO_SETTASKEXECUTOR_METHOD", jmsListener.getClass().getName()); - } - } - - /** - * If the destinationName given is null then a temporary destination is created else the destination name - * is resolved using the resolver from the jmsConfig - * - * @param jmsTemplate template to use for session and resolver - * @param replyToDestinationName null for temporary destination or a destination name - * @param pubSubDomain true=pubSub, false=Queues - * @return resolved destination - */ - public static Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate, - final String replyToDestinationName, - final boolean pubSubDomain) { - return jmsTemplate.execute(new SessionCallback() { - public Destination doInJms(Session session) throws JMSException { - if (replyToDestinationName == null) { - if (session instanceof QueueSession) { - // For JMS 1.0.2 - return ((QueueSession)session).createTemporaryQueue(); - } else { - // For JMS 1.1 - return session.createTemporaryQueue(); - } - } - DestinationResolver resolv = jmsTemplate.getDestinationResolver(); - return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain); - } - }); + public static SessionFactory createJmsSessionFactory(JMSConfiguration jmsConfig, ResourceCloser closer) { + SessionFactory sf = new SessionFactory(jmsConfig.getConnectionFactory(), closer); + sf.setAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); + sf.setSessionTransacted(jmsConfig.isSessionTransacted()); + sf.setDurableSubscriptionClientId(jmsConfig.getDurableSubscriptionClientId()); + return sf; } - + }