Return-Path: Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: (qmail 37255 invoked from network); 3 Oct 2008 11:43:11 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 3 Oct 2008 11:43:11 -0000 Received: (qmail 27122 invoked by uid 500); 3 Oct 2008 11:43:10 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 27094 invoked by uid 500); 3 Oct 2008 11:43:09 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 27085 invoked by uid 99); 3 Oct 2008 11:43:09 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2008 04:43:09 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2008 11:42:03 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id A5BDC2388855; Fri, 3 Oct 2008 04:42:08 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r701354 - in /cxf/trunk/rt/transports/jms: ./ src/main/java/org/apache/cxf/transport/jms/ src/test/java/org/apache/cxf/transport/jms/ src/test/resources/wsdl/ Date: Fri, 03 Oct 2008 11:42:08 -0000 To: commits@cxf.apache.org From: cschneider@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081003114208.A5BDC2388855@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: cschneider Date: Fri Oct 3 04:42:07 2008 New Revision: 701354 URL: http://svn.apache.org/viewvc?rev=701354&view=rev Log: CXF-1832 QoS support Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Removed: cxf/trunk/rt/transports/jms/src/test/resources/wsdl/ Modified: cxf/trunk/rt/transports/jms/ (props changed) cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java Propchange: cxf/trunk/rt/transports/jms/ ------------------------------------------------------------------------------ --- svn:ignore (original) +++ svn:ignore Fri Oct 3 04:42:07 2008 @@ -1,9 +1,10 @@ -.pmd -.checkstyle -.ruleset -target -eclipse-classes -.settings -.classpath -.project -.wtpmodules +.pmd +.checkstyle +.ruleset +target +eclipse-classes +.settings +.classpath +.project +.wtpmodules +.springBeans Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Fri Oct 3 04:42:07 2008 @@ -28,10 +28,8 @@ import java.util.logging.Level; import java.util.logging.Logger; -import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageListener; -import javax.jms.QueueSession; import javax.jms.Session; import org.apache.cxf.common.logging.LogUtils; @@ -40,14 +38,10 @@ import org.apache.cxf.message.MessageImpl; import org.apache.cxf.transport.AbstractConduit; import org.apache.cxf.ws.addressing.EndpointReferenceType; -import org.springframework.beans.factory.InitializingBean; import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.JmsTemplate102; import org.springframework.jms.core.MessageCreator; -import org.springframework.jms.core.SessionCallback; import org.springframework.jms.listener.DefaultMessageListenerContainer; import org.springframework.jms.support.JmsUtils; -import org.springframework.jms.support.destination.DestinationResolver; /** * JMSConduit is instantiated by the JMSTransportfactory which is selected by a client if the transport @@ -55,73 +49,19 @@ * a JMS destination. If the Exchange is not oneway it then recevies the response and converts it to a CXF * Message. This is then provided in the Exchange and also sent to the incomingObserver */ -public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener, - InitializingBean { +public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener { static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class); private JMSConfiguration jmsConfig; private Map correlationMap; private DefaultMessageListenerContainer jmsListener; - private JmsTemplate jmsTemplate; public JMSConduit(EndpointReferenceType target, JMSConfiguration jmsConfig) { super(target); this.jmsConfig = jmsConfig; correlationMap = new ConcurrentHashMap(); } - - private Destination determineReplyToDestination(final JmsTemplate jmsTemplate2, - final String replyToDestinationName, - final boolean pubSubDomain) { - return (Destination)jmsTemplate2.execute(new SessionCallback() { - public Object doInJms(Session session) throws JMSException { - if (replyToDestinationName == null) { - if (session instanceof QueueSession) { - // For JMS 1.0.2 - return ((QueueSession)session).createTemporaryQueue(); - } else { - // For JMS 1.1 - return session.createTemporaryQueue(); - } - } - DestinationResolver resolv = jmsTemplate2.getDestinationResolver(); - return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain); - } - }); - } - - /** - * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc} - */ - public void afterPropertiesSet() { - jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102(); - jmsTemplate.setDefaultDestinationName(jmsConfig.getTargetDestination()); - jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory()); - jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain()); - jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout()); - jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive()); - jmsTemplate.setPriority(jmsConfig.getPriority()); - jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode()); - jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled()); - jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted()); - - jmsListener = new DefaultMessageListenerContainer(); - jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); - jmsListener.setAutoStartup(false); - jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory()); - jmsListener.setMessageSelector(jmsConfig.getMessageSelector()); - jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); - jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted()); - jmsListener.setTransactionManager(jmsConfig.getTransactionManager()); - - jmsListener.setMessageListener(this); - - if (jmsConfig.getDestinationResolver() != null) { - jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver()); - jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver()); - } - } /** * Prepare the message for send out. The message will be sent after the caller has written the payload to @@ -143,32 +83,36 @@ */ public void sendExchange(final Exchange exchange, final Object request) { LOG.log(Level.FINE, "JMSConduit send message"); + final Message outMessage = exchange.getOutMessage(); if (outMessage == null) { throw new RuntimeException("Exchange to be sent has no outMessage"); } - - if (!exchange.isOneWay() && !jmsListener.isRunning()) { - Destination replyTo = determineReplyToDestination(jmsTemplate, - jmsConfig.getReplyDestination(), - jmsConfig.isPubSubDomain()); - jmsListener.setDestination(replyTo); - jmsListener.start(); - jmsListener.initialize(); - } JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); + + JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers); + if (!exchange.isOneWay() && jmsListener == null) { + jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getReplyDestination()); + } + + final javax.jms.Destination replyTo = exchange.isOneWay() ? null : jmsListener.getDestination(); + final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) ? headers .getJMSCorrelationID() : JMSUtils.generateCorrelationId(); // String selector = "JMSCorrelationID = '" + correlationId + "'"; - - jmsTemplate.send(new MessageCreator() { + Message inMessage = null; + if (!exchange.isOneWay()) { + inMessage = new MessageImpl(); + correlationMap.put(correlationId, inMessage); + } + jmsTemplate.send(jmsConfig.getTargetDestination(), new MessageCreator() { public javax.jms.Message createMessage(Session session) throws JMSException { String messageType = jmsConfig.getMessageType(); final javax.jms.Message jmsMessage; jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType, - session, jmsListener.getDestination(), + session, replyTo, correlationId); LOG.log(Level.FINE, "client sending request: ", jmsMessage); return jmsMessage; @@ -176,34 +120,34 @@ }); /** - * If the message is not oneWay we will expect to receive a reply on the listener. - * To receive this reply we add the correlationId and an empty CXF Message to the - * correlationMap. The listener will fill to Message and notify this thread + * If the message is not oneWay we will expect to receive a reply on the listener. To receive this + * reply we add the correlationId and an empty CXF Message to the correlationMap. The listener will + * fill to Message and notify this thread */ if (!exchange.isOneWay()) { - Message inMessage = new MessageImpl(); synchronized (inMessage) { - correlationMap.put(correlationId, inMessage); try { inMessage.wait(jmsTemplate.getReceiveTimeout()); } catch (InterruptedException e) { throw new RuntimeException(e); } correlationMap.remove(correlationId); + if (inMessage.getContent(InputStream.class) == null) { + throw new RuntimeException("Timeout receiving message with correlationId " + + correlationId); + } } exchange.setInMessage(inMessage); if (incomingObserver != null) { incomingObserver.onMessage(inMessage); - } + } } } /** - * When a message is received on the reply destination the correlation map is searched - * for the correlationId. If it is found the message is converted to a CXF message and the - * thread sending the request is notified - * - * {@inheritDoc} + * When a message is received on the reply destination the correlation map is searched for the + * correlationId. If it is found the message is converted to a CXF message and the thread sending the + * request is notified {@inheritDoc} */ public void onMessage(javax.jms.Message jmsMessage) { String correlationId; @@ -221,7 +165,7 @@ byte[] response = JMSUtils.retrievePayload(jmsMessage); LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]"); inMessage.setContent(InputStream.class, new ByteArrayInputStream(response)); - + synchronized (inMessage) { inMessage.notifyAll(); } @@ -229,7 +173,9 @@ } public void close() { - jmsListener.shutdown(); + if (jmsListener != null) { + jmsListener.shutdown(); + } LOG.log(Level.FINE, "JMSConduit closed "); } @@ -247,12 +193,10 @@ @Override protected void finalize() throws Throwable { - if (jmsListener.isRunning()) { + if (jmsListener != null) { jmsListener.shutdown(); } super.finalize(); } - - } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Oct 3 04:42:07 2008 @@ -54,7 +54,6 @@ import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.wsdl.EndpointReferenceUtils; import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.core.JmsTemplate102; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.SessionCallback; import org.springframework.jms.listener.DefaultMessageListenerContainer; @@ -69,7 +68,6 @@ private JMSConfiguration jmsConfig; private Bus bus; private DefaultMessageListenerContainer jmsListener; - private JmsTemplate jmsTemplate; public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) { super(b, getTargetReference(info, b), info); @@ -91,41 +89,14 @@ */ public void activate() { getLogger().log(Level.INFO, "JMSDestination activate().... "); - - jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102(); - jmsTemplate.setDefaultDestinationName(jmsConfig.getReplyDestination()); - jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory()); - jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain()); - jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout()); - jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive()); - jmsTemplate.setPriority(jmsConfig.getPriority()); - jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode()); - jmsTemplate.setExplicitQosEnabled(true); - jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted()); - - jmsListener = new DefaultMessageListenerContainer(); - jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); - jmsListener.setAutoStartup(true); - jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory()); - jmsListener.setMessageSelector(jmsConfig.getMessageSelector()); - jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); - jmsListener.setDestinationName(jmsConfig.getTargetDestination()); - jmsListener.setMessageListener(this); - jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted()); - jmsListener.setTransactionManager(jmsConfig.getTransactionManager()); - - if (jmsConfig.getDestinationResolver() != null) { - jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver()); - jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver()); - } - - if (!jmsListener.isRunning()) { - jmsListener.initialize(); - } + jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getTargetDestination()); + jmsConfig.getTargetDestination(); } public void deactivate() { - jmsListener.shutdown(); + if (jmsListener != null) { + jmsListener.shutdown(); + } } public void shutdown() { @@ -133,7 +104,7 @@ this.deactivate(); } - private Destination resolveDestinationName(final String name) { + private Destination resolveDestinationName(final JmsTemplate jmsTemplate, final String name) { return (Destination)jmsTemplate.execute(new SessionCallback() { public Object doInJms(Session session) throws JMSException { DestinationResolver resolv = jmsTemplate.getDestinationResolver(); @@ -142,12 +113,12 @@ }); } - public Destination getReplyToDestination(Message inMessage) throws JMSException { + public Destination getReplyToDestination(JmsTemplate jmsTemplate, Message inMessage) throws JMSException { javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); // If WS-Addressing had set the replyTo header. final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO); if (replyToName != null) { - return resolveDestinationName(replyToName); + return resolveDestinationName(jmsTemplate, replyToName); } else if (message.getJMSReplyTo() != null) { return message.getJMSReplyTo(); } else { @@ -216,6 +187,12 @@ return; } try { + final JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage + .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); + JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage + .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); + JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, inHeaders); + // setup the reply message final javax.jms.Message request = (javax.jms.Message)inMessage .get(JMSConstants.JMS_REQUEST_MESSAGE); @@ -228,11 +205,7 @@ msgType = JMSConstants.BINARY_MESSAGE_TYPE; } - Destination replyTo = getReplyToDestination(inMessage); - final JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage - .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); - JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage - .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); + Destination replyTo = getReplyToDestination(jmsTemplate, inMessage); if (request.getJMSExpiration() > 0) { TimeZone tz = new SimpleTimeZone(0, "GMT"); @@ -245,11 +218,6 @@ } } - int deliveryMode = JMSUtils.getJMSDeliveryMode(inHeaders); - int priority = JMSUtils.getJMSPriority(inHeaders); - - jmsTemplate.setDeliveryMode(deliveryMode); - jmsTemplate.setPriority(priority); getLogger().log(Level.FINE, "send out the message!"); jmsTemplate.send(replyTo, new MessageCreator() { public javax.jms.Message createMessage(Session session) throws JMSException { @@ -278,6 +246,14 @@ return LOG; } + public JMSConfiguration getJmsConfig() { + return jmsConfig; + } + + public void setJmsConfig(JMSConfiguration jmsConfig) { + this.jmsConfig = jmsConfig; + } + /** * Conduit for sending the reply back to the client */ @@ -330,12 +306,4 @@ } } - public JMSConfiguration getJmsConfig() { - return jmsConfig; - } - - public void setJmsConfig(JMSConfiguration jmsConfig) { - this.jmsConfig = jmsConfig; - } - } Added: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=701354&view=auto ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java (added) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java Fri Oct 3 04:42:07 2008 @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cxf.transport.jms; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageListener; +import javax.jms.QueueSession; +import javax.jms.Session; + +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.JmsTemplate102; +import org.springframework.jms.core.SessionCallback; +import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.jms.listener.DefaultMessageListenerContainer102; +import org.springframework.jms.support.destination.DestinationResolver; + +/** + * Factory to create JmsTemplates and JmsListeners from configuration and context information + */ +public final class JMSFactory { + + private JMSFactory() { + } + + /** + * Create JmsTemplate from configuration information. Most settings are taken from jmsConfig. The QoS + * settings in headers override the settings from jmsConfig + * + * @param jmsConfig configuration information + * @param headers context headers + * @return + */ + public static JmsTemplate createJmsTemplate(JMSConfiguration jmsConfig, JMSMessageHeadersType headers) { + JmsTemplate jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102(); + jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory()); + jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout()); + jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive()); + int priority = (headers != null && headers.isSetJMSPriority()) + ? headers.getJMSPriority() : jmsConfig.getPriority(); + jmsTemplate.setPriority(priority); + int deliveryMode = (headers != null && headers.isSetJMSDeliveryMode()) ? headers + .getJMSDeliveryMode() : jmsConfig.getDeliveryMode(); + jmsTemplate.setDeliveryMode(deliveryMode); + jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled()); + jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted()); + if (jmsConfig.getDestinationResolver() != null) { + jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver()); + } + return jmsTemplate; + } + + /** + * Create and start listener using configuration information from jmsConfig. Uses + * resolveOrCreateDestination to determine the destination for the listener. + * + * @param jmsConfig configuration information + * @param listenerHandler object to be called when a message arrives + * @param destinationName null for temp dest or a destination name + * @return + */ + public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig, + MessageListener listenerHandler, + String destinationName) { + DefaultMessageListenerContainer jmsListener2 = jmsConfig.isUseJms11() + ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102(); + jmsListener2.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsListener2.setAutoStartup(true); + jmsListener2.setConnectionFactory(jmsConfig.getConnectionFactory()); + jmsListener2.setMessageSelector(jmsConfig.getMessageSelector()); + jmsListener2.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); + jmsListener2.setSessionTransacted(jmsConfig.isSessionTransacted()); + jmsListener2.setTransactionManager(jmsConfig.getTransactionManager()); + jmsListener2.setMessageListener(listenerHandler); + if (jmsConfig.getDestinationResolver() != null) { + jmsListener2.setDestinationResolver(jmsConfig.getDestinationResolver()); + } + JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null); + jmsListener2.setDestination(JMSFactory.resolveOrCreateDestination(jmsTemplate, destinationName, + jmsConfig.isPubSubDomain())); + jmsListener2.initialize(); + return jmsListener2; + } + + /** + * If the destinationName given is null then a temporary destination is created else the destination name + * is resolved using the resolver from the jmsConfig + * + * @param jmsTemplate template to use for session and resolver + * @param replyToDestinationName null for temporary destination or a destination name + * @param pubSubDomain true=pubSub, false=Queues + * @return resolved destination + */ + private static Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate, + final String replyToDestinationName, + final boolean pubSubDomain) { + return (Destination)jmsTemplate.execute(new SessionCallback() { + public Object doInJms(Session session) throws JMSException { + if (replyToDestinationName == null) { + if (session instanceof QueueSession) { + // For JMS 1.0.2 + return ((QueueSession)session).createTemporaryQueue(); + } else { + // For JMS 1.1 + return session.createTemporaryQueue(); + } + } + DestinationResolver resolv = jmsTemplate.getDestinationResolver(); + return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain); + } + }); + } + +} Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Fri Oct 3 04:42:07 2008 @@ -18,10 +18,16 @@ */ package org.apache.cxf.transport.jms; +import java.util.Enumeration; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; + import javax.jms.ConnectionFactory; import javax.naming.NamingException; import org.apache.cxf.Bus; +import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.configuration.Configurer; import org.apache.cxf.service.model.EndpointInfo; import org.springframework.jms.connection.SingleConnectionFactory; @@ -30,6 +36,7 @@ import org.springframework.jndi.JndiTemplate; public class JMSOldConfigHolder { + private static final Logger LOG = LogUtils.getL7dLogger(JMSOldConfigHolder.class); private ClientConfig clientConfig; private ClientBehaviorPolicyType runtimePolicy; @@ -83,7 +90,7 @@ } JndiTemplate jt = new JndiTemplate(); - jt.setEnvironment(JMSUtils.getInitialContextEnv(address)); + jt.setEnvironment(JMSOldConfigHolder.getInitialContextEnv(address)); ConnectionFactory cf = getConnectionFactoryFromJndi(address.getJndiConnectionFactoryName(), address .getConnectionUserName(), address.getConnectionPassword(), jt); @@ -195,4 +202,24 @@ public void setServerBehavior(ServerBehaviorPolicyType serverBehavior) { this.serverBehavior = serverBehavior; } + + public static Properties getInitialContextEnv(AddressType addrType) { + Properties env = new Properties(); + java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator(); + while (listIter.hasNext()) { + JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next(); + if (null != propertyPair.getValue()) { + env.setProperty(propertyPair.getName(), propertyPair.getValue()); + } + } + if (LOG.isLoggable(Level.FINE)) { + Enumeration props = env.propertyNames(); + while (props.hasMoreElements()) { + String name = (String)props.nextElement(); + String value = env.getProperty(name); + LOG.log(Level.FINE, "Context property: " + name + " | " + value); + } + } + return env; + } } Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Fri Oct 3 04:42:07 2008 @@ -63,9 +63,7 @@ public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException { JMSOldConfigHolder old = new JMSOldConfigHolder(); JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true); - JMSConduit jmsConduit = new JMSConduit(target, jmsConf); - jmsConduit.afterPropertiesSet(); - return jmsConduit; + return new JMSConduit(target, jmsConf); } /** Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original) +++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Fri Oct 3 04:42:07 2008 @@ -28,8 +28,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.BytesMessage; @@ -47,45 +45,12 @@ public final class JMSUtils { - private static final Logger LOG = LogUtils.getL7dLogger(JMSUtils.class); + static final Logger LOG = LogUtils.getL7dLogger(JMSUtils.class); private JMSUtils() { } - public static Properties getInitialContextEnv(AddressType addrType) { - Properties env = new Properties(); - java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator(); - while (listIter.hasNext()) { - JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next(); - if (null != propertyPair.getValue()) { - env.setProperty(propertyPair.getName(), propertyPair.getValue()); - } - } - if (LOG.isLoggable(Level.FINE)) { - Enumeration props = env.propertyNames(); - while (props.hasMoreElements()) { - String name = (String)props.nextElement(); - String value = env.getProperty(name); - LOG.log(Level.FINE, "Context property: " + name + " | " + value); - } - } - return env; - } - - public static int getJMSDeliveryMode(JMSMessageHeadersType headers) { - int deliveryMode = Message.DEFAULT_DELIVERY_MODE; - if (headers != null && headers.isSetJMSDeliveryMode()) { - deliveryMode = headers.getJMSDeliveryMode(); - } - return deliveryMode; - } - - public static int getJMSPriority(JMSMessageHeadersType headers) { - return (headers != null && headers.isSetJMSPriority()) - ? headers.getJMSPriority() : Message.DEFAULT_PRIORITY; - } - public static long getTimeToLive(JMSMessageHeadersType headers) { long ttl = -1; if (headers != null && headers.isSetTimeToLive()) { @@ -264,7 +229,6 @@ if (headers == null) { headers = new JMSMessageHeadersType(); - // throw new RuntimeException("No JMS_CLIENT_REQUEST_HEADERS set in message"); } JMSUtils.setMessageProperties(headers, jmsMessage); @@ -274,7 +238,6 @@ .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS)); JMSUtils.addProtocolHeaders(jmsMessage, protHeaders); jmsMessage.setJMSCorrelationID(correlationId); - jmsMessage.setJMSPriority(JMSUtils.getJMSPriority(headers)); return jmsMessage; } Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java Fri Oct 3 04:42:07 2008 @@ -114,11 +114,7 @@ JMSConfiguration jmsConfig = new JMSOldConfigHolder() .createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true); - jmsConfig.setDeliveryMode(3); - jmsConfig.setPriority(1); - jmsConfig.setTimeToLive(1000); JMSConduit jmsConduit = new JMSConduit(target, jmsConfig); - jmsConduit.afterPropertiesSet(); if (send) { // setMessageObserver observer = new MessageObserver() { Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java Fri Oct 3 04:42:07 2008 @@ -24,6 +24,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.nio.charset.Charset; +import java.util.logging.Logger; import javax.jms.BytesMessage; import javax.jms.JMSException; @@ -31,9 +32,11 @@ import org.apache.cxf.BusFactory; import org.apache.cxf.bus.spring.SpringBusFactory; +import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.helpers.IOUtils; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.springframework.jms.core.JmsTemplate; @@ -41,6 +44,8 @@ public class JMSConduitTest extends AbstractJMSTester { + static final Logger LOG = LogUtils.getL7dLogger(JMSConduitTest.class); + @BeforeClass public static void createAndStartBroker() throws Exception { startBroker(new JMSBrokerSetup("tcp://localhost:61500")); @@ -60,7 +65,7 @@ .getReceiveTimeout()); bus.shutdown(false); BusFactory.setDefaultBus(null); - + conduit.close(); } @Test @@ -78,7 +83,7 @@ verifySentMessage(false, message); } - public void verifySentMessage(boolean send, Message message) { + private void verifySentMessage(boolean send, Message message) { OutputStream os = message.getContent(OutputStream.class); assertTrue("OutputStream should not be null", os != null); } @@ -87,16 +92,50 @@ public void testSendOut() throws Exception { setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl", "HelloWorldServiceLoop", "HelloWorldPortLoop"); + JMSConduit conduit = setupJMSConduit(true, false); + conduit.getJmsConfig().setReceiveTimeout(1000); + + try { + for (int c = 0; c < 100; c++) { + LOG.info("Sending message " + c); + Message message = new MessageImpl(); + sendoutMessage(conduit, message, false); + verifyReceivedMessage(message); + } + } finally { + conduit.close(); + } + } + + /** + * Sends several messages and verfies the results. The service sends the message to itself. So it should + * always receive the result + * + * @throws Exception + */ + @Test + public void testTimeoutOnReceive() throws Exception { + setupServiceInfo("http://cxf.apache.org/hello_world_jms", "/wsdl/jms_test.wsdl", + "HelloWorldServiceLoop", "HelloWorldPortLoop"); JMSConduit conduit = setupJMSConduit(true, false); + // TODO IF the system is extremely fast. The message could still get through + conduit.getJmsConfig().setReceiveTimeout(1); Message message = new MessageImpl(); - // set the isOneWay to false - sendoutMessage(conduit, message, false); - verifyReceivedMessage(message); + try { + sendoutMessage(conduit, message, false); + verifyReceivedMessage(message); + throw new RuntimeException("Expected a timeout here"); + } catch (RuntimeException e) { + LOG.info("Received exception. This is expected"); + } finally { + conduit.close(); + } } - public void verifyReceivedMessage(Message message) { + private void verifyReceivedMessage(Message message) { ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class); + Assert.assertNotNull("The received message input stream should not be null", bis); byte bytes[] = new byte[bis.available()]; try { bis.read(bytes); Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java Fri Oct 3 04:42:07 2008 @@ -23,6 +23,8 @@ import java.io.IOException; import java.io.InputStream; +import javax.jms.DeliveryMode; + import org.apache.cxf.BusFactory; import org.apache.cxf.bus.spring.SpringBusFactory; import org.apache.cxf.helpers.IOUtils; @@ -192,7 +194,7 @@ private void setupMessageHeader(Message outMessage) { JMSMessageHeadersType header = new JMSMessageHeadersType(); header.setJMSCorrelationID("Destination test"); - header.setJMSDeliveryMode(3); + header.setJMSDeliveryMode(DeliveryMode.PERSISTENT); header.setJMSPriority(1); header.setTimeToLive(1000); outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header); @@ -238,6 +240,8 @@ .getJMSCorrelationID(), inHeader.getJMSCorrelationID()); assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader .getJMSPriority(), inHeader.getJMSPriority()); + assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader + .getJMSDeliveryMode(), inHeader.getJMSDeliveryMode()); assertEquals("The inMessage and outMessage JMS Header's JMSType should be equals", outHeader .getJMSType(), inHeader.getJMSType()); } Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java?rev=701354&r1=701353&r2=701354&view=diff ============================================================================== --- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java (original) +++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java Fri Oct 3 04:42:07 2008 @@ -42,7 +42,7 @@ prop2.setValue("12"); addrType.getJMSNamingProperty().add(prop2); - Properties env = JMSUtils.getInitialContextEnv(addrType); + Properties env = JMSOldConfigHolder.getInitialContextEnv(addrType); assertTrue("Environment should not be empty", env.size() > 0); assertTrue("Environemnt should contain NamingBatchSize property", env.get(Context.BATCHSIZE) != null); }