Return-Path: Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: (qmail 81735 invoked from network); 4 Oct 2008 02:36:20 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 4 Oct 2008 02:36:20 -0000 Received: (qmail 66638 invoked by uid 500); 4 Oct 2008 02:36:19 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 66570 invoked by uid 500); 4 Oct 2008 02:36:19 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 66560 invoked by uid 99); 4 Oct 2008 02:36:19 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 03 Oct 2008 19:36:19 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 04 Oct 2008 02:35:14 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2E68F23888A0; Fri, 3 Oct 2008 19:35:19 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r701585 [1/2] - in /cxf/branches/2.1.x-fixes: ./ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms... Date: Sat, 04 Oct 2008 02:35:18 -0000 To: commits@cxf.apache.org From: ningjiang@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081004023519.2E68F23888A0@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ningjiang Date: Fri Oct 3 19:35:17 2008 New Revision: 701585 URL: http://svn.apache.org/viewvc?rev=701585&view=rev Log: Merged revisions 700236 via svnmerge from https://svn.apache.org/repos/asf/cxf/trunk ........ r700236 | cschneider | 2008-09-30 04:11:34 +0800 (Tue, 30 Sep 2008) | 1 line CXF-1832 ........ Added: cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/wsdl/ - copied from r700236, cxf/trunk/rt/transports/jms/src/test/resources/wsdl/ cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/wsdl/jms_test.wsdl - copied unchanged from r700236, cxf/trunk/rt/transports/jms/src/test/resources/wsdl/jms_test.wsdl Removed: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSListenerThread.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSSessionFactory.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/PooledSession.java Modified: cxf/branches/2.1.x-fixes/ (props changed) cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSConduitBeanDefinitionParser.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/spring/JMSDestinationBeanDefinitionParser.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/wsdl11/JmsTransportPlugin.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSConduitTest.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSUtilsTest.java cxf/branches/2.1.x-fixes/rt/transports/jms/src/test/resources/jms_test_config.xml Propchange: cxf/branches/2.1.x-fixes/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java Fri Oct 3 19:35:17 2008 @@ -23,70 +23,58 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.MessageListener; import javax.jms.QueueSession; import javax.jms.Session; -import org.apache.cxf.Bus; import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.configuration.Configurable; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; import org.apache.cxf.message.MessageImpl; -import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.AbstractConduit; import org.apache.cxf.ws.addressing.EndpointReferenceType; +import org.springframework.beans.factory.InitializingBean; import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.JmsTemplate102; import org.springframework.jms.core.MessageCreator; import org.springframework.jms.core.SessionCallback; +import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.jms.support.JmsUtils; import org.springframework.jms.support.destination.DestinationResolver; /** * JMSConduit is instantiated by the JMSTransportfactory which is selected by a client if the transport * protocol starts with jms:// JMSConduit converts CXF Messages to JMS Messages and sends the request by using - * JMS topics or queues. If the Exchange is not oneway it then recevies the response and converts it to a CXF + * a JMS destination. If the Exchange is not oneway it then recevies the response and converts it to a CXF * Message. This is then provided in the Exchange and also sent to the incomingObserver */ -public class JMSConduit extends AbstractConduit implements Configurable, JMSExchangeSender { - - protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-conduit-base"; - +public class JMSConduit extends AbstractConduit implements JMSExchangeSender, MessageListener, + InitializingBean { static final Logger LOG = LogUtils.getL7dLogger(JMSConduit.class); - protected Bus bus; - protected EndpointInfo endpointInfo; - protected JMSConfiguration jmsConfig; - protected String beanNameSuffix; + private JMSConfiguration jmsConfig; + private Map correlationMap; - public JMSConduit(Bus b, EndpointInfo endpointInfo) { - this(b, endpointInfo, null); - } + private DefaultMessageListenerContainer jmsListener; + private JmsTemplate jmsTemplate; - public JMSConduit(Bus b, EndpointInfo endpointInfo, EndpointReferenceType target) { + public JMSConduit(EndpointReferenceType target, JMSConfiguration jmsConfig) { super(target); - this.bus = b; - this.endpointInfo = endpointInfo; - this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX; - } - - // prepare the message for send out , not actually send out the message - public void prepare(Message message) throws IOException { - message.setContent(OutputStream.class, new JMSOutputStream(this, message.getExchange(), - isTextPayload())); - // After this step flow will continue in JMSOutputStream.doClose() + this.jmsConfig = jmsConfig; + correlationMap = new ConcurrentHashMap(); } - - public Destination determineReplyToDestination(final JmsTemplate jmsTemplate, + + private Destination determineReplyToDestination(final JmsTemplate jmsTemplate2, final String replyToDestinationName, - final boolean pubSubDomain, boolean isOneWay) { - if (isOneWay) { - return null; - } - return (Destination)jmsTemplate.execute(new SessionCallback() { + final boolean pubSubDomain) { + return (Destination)jmsTemplate2.execute(new SessionCallback() { public Object doInJms(Session session) throws JMSException { if (replyToDestinationName == null) { if (session instanceof QueueSession) { @@ -97,13 +85,56 @@ return session.createTemporaryQueue(); } } - DestinationResolver resolv = jmsTemplate.getDestinationResolver(); + DestinationResolver resolv = jmsTemplate2.getDestinationResolver(); return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain); } }); } /** + * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc} + */ + public void afterPropertiesSet() { + jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102(); + jmsTemplate.setDefaultDestinationName(jmsConfig.getTargetDestination()); + jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory()); + jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout()); + jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive()); + jmsTemplate.setPriority(jmsConfig.getPriority()); + jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode()); + jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled()); + jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted()); + + jmsListener = new DefaultMessageListenerContainer(); + jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsListener.setAutoStartup(false); + jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory()); + jmsListener.setMessageSelector(jmsConfig.getMessageSelector()); + jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); + jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted()); + jmsListener.setTransactionManager(jmsConfig.getTransactionManager()); + + jmsListener.setMessageListener(this); + + if (jmsConfig.getDestinationResolver() != null) { + jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver()); + jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver()); + } + } + + /** + * Prepare the message for send out. The message will be sent after the caller has written the payload to + * the OutputStream of the message and calls the close method of the stream. In the JMS case the + * JMSOutputStream will then call back the sendExchange method of this class. {@inheritDoc} + */ + public void prepare(Message message) throws IOException { + boolean isTextPayload = JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType()); + JMSOutputStream out = new JMSOutputStream(this, message.getExchange(), isTextPayload); + message.setContent(OutputStream.class, out); + } + + /** * Send the JMS Request out and if not oneWay receive the response * * @param outMessage @@ -112,58 +143,93 @@ */ public void sendExchange(final Exchange exchange, final Object request) { LOG.log(Level.FINE, "JMSConduit send message"); - final JmsTemplate jmsTemplate = jmsConfig.getJmsTemplate(); - final Destination replyTo = determineReplyToDestination(jmsTemplate, - jmsConfig.getReplyDestination(), jmsConfig - .isPubSubDomain(), exchange.isOneWay()); final Message outMessage = exchange.getOutMessage(); if (outMessage == null) { throw new RuntimeException("Exchange to be sent has no outMessage"); } + + if (!exchange.isOneWay() && !jmsListener.isRunning()) { + Destination replyTo = determineReplyToDestination(jmsTemplate, + jmsConfig.getReplyDestination(), + jmsConfig.isPubSubDomain()); + jmsListener.setDestination(replyTo); + jmsListener.start(); + jmsListener.initialize(); + } JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage .get(JMSConstants.JMS_CLIENT_REQUEST_HEADERS); final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) ? headers - .getJMSCorrelationID() : JMSUtils.generateUniqueSelector(); - String selector = "JMSCorrelationID = '" + correlationId + "'"; - - // TODO This is not thread safe - jmsTemplate.setPriority(JMSUtils.getJMSPriority(headers)); - jmsTemplate.send(jmsConfig.getTargetDestination(), new MessageCreator() { + .getJMSCorrelationID() : JMSUtils.generateCorrelationId(); + // String selector = "JMSCorrelationID = '" + correlationId + "'"; + + jmsTemplate.send(new MessageCreator() { public javax.jms.Message createMessage(Session session) throws JMSException { String messageType = jmsConfig.getMessageType(); final javax.jms.Message jmsMessage; jmsMessage = JMSUtils.buildJMSMessageFromCXFMessage(outMessage, request, messageType, - session, replyTo, correlationId); + session, jmsListener.getDestination(), + correlationId); LOG.log(Level.FINE, "client sending request: ", jmsMessage); return jmsMessage; } }); + /** + * If the message is not oneWay we will expect to receive a reply on the listener. + * To receive this reply we add the correlationId and an empty CXF Message to the + * correlationMap. The listener will fill to Message and notify this thread + */ if (!exchange.isOneWay()) { - javax.jms.Message jmsMessage = jmsTemplate.receiveSelected(replyTo, selector); - if (jmsMessage == null) { - throw new RuntimeException("JMS receive timed out"); - } Message inMessage = new MessageImpl(); - LOG.log(Level.FINE, "client received reply: ", jmsMessage); - JMSUtils - .populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); - byte[] response = JMSUtils.retrievePayload(jmsMessage); - LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]"); - inMessage.setContent(InputStream.class, new ByteArrayInputStream(response)); - exchange.setInMessage(inMessage); + synchronized (inMessage) { + correlationMap.put(correlationId, inMessage); + try { + inMessage.wait(jmsTemplate.getReceiveTimeout()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + correlationMap.remove(correlationId); + } if (incomingObserver != null) { incomingObserver.onMessage(inMessage); } + exchange.setInMessage(inMessage); } } - private boolean isTextPayload() { - return JMSConstants.TEXT_MESSAGE_TYPE.equals(jmsConfig.getMessageType()); + /** + * When a message is received on the reply destination the correlation map is searched + * for the correlationId. If it is found the message is converted to a CXF message and the + * thread sending the request is notified + * + * {@inheritDoc} + */ + public void onMessage(javax.jms.Message jmsMessage) { + String correlationId; + try { + correlationId = jmsMessage.getJMSCorrelationID(); + } catch (JMSException e) { + throw JmsUtils.convertJmsAccessException(e); + } + Message inMessage = correlationMap.get(correlationId); + if (inMessage == null) { + LOG.log(Level.WARNING, "Could not correlate message with correlationId " + correlationId); + } + LOG.log(Level.FINE, "client received reply: ", jmsMessage); + JMSUtils.populateIncomingContext(jmsMessage, inMessage, JMSConstants.JMS_CLIENT_RESPONSE_HEADERS); + byte[] response = JMSUtils.retrievePayload(jmsMessage); + LOG.log(Level.FINE, "The Response Message payload is : [" + response + "]"); + inMessage.setContent(InputStream.class, new ByteArrayInputStream(response)); + + synchronized (inMessage) { + inMessage.notifyAll(); + } + } public void close() { + jmsListener.shutdown(); LOG.log(Level.FINE, "JMSConduit closed "); } @@ -171,10 +237,6 @@ return LOG; } - public String getBeanName() { - return endpointInfo.getName().toString() + ".jms-conduit"; - } - public JMSConfiguration getJmsConfig() { return jmsConfig; } @@ -183,4 +245,14 @@ this.jmsConfig = jmsConfig; } + @Override + protected void finalize() throws Throwable { + if (jmsListener.isRunning()) { + jmsListener.shutdown(); + } + super.finalize(); + } + + + } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java Fri Oct 3 19:35:17 2008 @@ -19,52 +19,139 @@ package org.apache.cxf.transport.jms; import javax.jms.ConnectionFactory; +import javax.jms.Message; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Required; import org.springframework.jms.core.JmsTemplate; -import org.springframework.jms.listener.AbstractJmsListeningContainer; +import org.springframework.jms.support.destination.DestinationResolver; +import org.springframework.transaction.PlatformTransactionManager; public class JMSConfiguration implements InitializingBean { private ConnectionFactory connectionFactory; - private JmsTemplate jmsTemplate; - private AbstractJmsListeningContainer jmsListener; + private DestinationResolver destinationResolver; + private PlatformTransactionManager transactionManager; + private boolean useJms11 = true; + private boolean useJndi; + private boolean messageIdEnabled = true; + private boolean messageTimestampEnabled = true; + private boolean pubSubNoLocal; + private long receiveTimeout = JmsTemplate.RECEIVE_TIMEOUT_INDEFINITE_WAIT; + private boolean explicitQosEnabled; + private int deliveryMode = Message.DEFAULT_DELIVERY_MODE; + private int priority = Message.DEFAULT_PRIORITY; + private long timeToLive = Message.DEFAULT_TIME_TO_LIVE; + private boolean sessionTransacted; + + private volatile String messageSelector; + private boolean subscriptionDurable; + private String durableSubscriptionName; + private String targetDestination; private String replyDestination; - private String messageType; + private String messageType = JMSConstants.TEXT_MESSAGE_TYPE; private boolean pubSubDomain; - public JMSConfiguration() { - targetDestination = null; - replyDestination = null; - messageType = JMSConstants.TEXT_MESSAGE_TYPE; - pubSubDomain = false; + public boolean isUseJndi() { + return useJndi; } - public void afterPropertiesSet() throws Exception { - /* - * if (connectionFactory == null) { throw new RuntimeException("Required property connectionfactory - * was not set"); } jmsTemplate.setConnectionFactory(connectionFactory); - * jmsListener.setConnectionFactory(connectionFactory); - */ + public void setUseJndi(boolean useJndi) { + this.useJndi = useJndi; } - public JmsTemplate getJmsTemplate() { - return jmsTemplate; + public boolean isMessageIdEnabled() { + return messageIdEnabled; } - @Required - public void setJmsTemplate(JmsTemplate jmsTemplate) { - this.jmsTemplate = jmsTemplate; + public void setMessageIdEnabled(boolean messageIdEnabled) { + this.messageIdEnabled = messageIdEnabled; } - public AbstractJmsListeningContainer getJmsListener() { - return jmsListener; + public boolean isMessageTimestampEnabled() { + return messageTimestampEnabled; } - @Required - public void setJmsListener(AbstractJmsListeningContainer jmsListener) { - this.jmsListener = jmsListener; + public void setMessageTimestampEnabled(boolean messageTimestampEnabled) { + this.messageTimestampEnabled = messageTimestampEnabled; + } + + public boolean isPubSubNoLocal() { + return pubSubNoLocal; + } + + public void setPubSubNoLocal(boolean pubSubNoLocal) { + this.pubSubNoLocal = pubSubNoLocal; + } + + public long getReceiveTimeout() { + return receiveTimeout; + } + + public void setReceiveTimeout(long receiveTimeout) { + this.receiveTimeout = receiveTimeout; + } + + public boolean isExplicitQosEnabled() { + return explicitQosEnabled; + } + + public void setExplicitQosEnabled(boolean explicitQosEnabled) { + this.explicitQosEnabled = explicitQosEnabled; + } + + public int getDeliveryMode() { + return deliveryMode; + } + + public void setDeliveryMode(int deliveryMode) { + this.deliveryMode = deliveryMode; + } + + public int getPriority() { + return priority; + } + + public void setPriority(int priority) { + this.priority = priority; + } + + public long getTimeToLive() { + return timeToLive; + } + + public void setTimeToLive(long timeToLive) { + this.timeToLive = timeToLive; + } + + public String getMessageSelector() { + return messageSelector; + } + + public void setMessageSelector(String messageSelector) { + this.messageSelector = messageSelector; + } + + public boolean isSubscriptionDurable() { + return subscriptionDurable; + } + + public void setSubscriptionDurable(boolean subscriptionDurable) { + this.subscriptionDurable = subscriptionDurable; + } + + public String getDurableSubscriptionName() { + return durableSubscriptionName; + } + + public void setDurableSubscriptionName(String durableSubscriptionName) { + this.durableSubscriptionName = durableSubscriptionName; + } + + public void afterPropertiesSet() throws Exception { + if (connectionFactory == null) { + throw new RuntimeException("Required property connectionfactory was not set"); + } } public ConnectionFactory getConnectionFactory() { @@ -108,4 +195,36 @@ this.pubSubDomain = pubSubDomain; } + public boolean isUseJms11() { + return useJms11; + } + + public void setUseJms11(boolean useJms11) { + this.useJms11 = useJms11; + } + + public DestinationResolver getDestinationResolver() { + return destinationResolver; + } + + public void setDestinationResolver(DestinationResolver destinationResolver) { + this.destinationResolver = destinationResolver; + } + + public boolean isSessionTransacted() { + return sessionTransacted; + } + + public void setSessionTransacted(boolean sessionTransacted) { + this.sessionTransacted = sessionTransacted; + } + + public PlatformTransactionManager getTransactionManager() { + return transactionManager; + } + + public void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java Fri Oct 3 19:35:17 2008 @@ -29,27 +29,19 @@ import java.util.Map; import java.util.SimpleTimeZone; import java.util.TimeZone; -import java.util.concurrent.Executor; import java.util.logging.Level; import java.util.logging.Logger; import javax.jms.BytesMessage; -import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageListener; -import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import javax.naming.Context; -import javax.naming.NamingException; -import javax.xml.namespace.QName; import org.apache.cxf.Bus; import org.apache.cxf.BusFactory; import org.apache.cxf.common.logging.LogUtils; -import org.apache.cxf.configuration.Configurable; -import org.apache.cxf.configuration.Configurer; import org.apache.cxf.helpers.CastUtils; import org.apache.cxf.message.Exchange; import org.apache.cxf.message.Message; @@ -58,56 +50,31 @@ import org.apache.cxf.transport.AbstractConduit; import org.apache.cxf.transport.AbstractMultiplexDestination; import org.apache.cxf.transport.Conduit; -import org.apache.cxf.transport.ConduitInitiator; import org.apache.cxf.transport.MessageObserver; -import org.apache.cxf.workqueue.SynchronousExecutor; -import org.apache.cxf.workqueue.WorkQueueManager; import org.apache.cxf.ws.addressing.EndpointReferenceType; import org.apache.cxf.wsdl.EndpointReferenceUtils; +import org.springframework.jms.core.JmsTemplate; +import org.springframework.jms.core.JmsTemplate102; +import org.springframework.jms.core.MessageCreator; +import org.springframework.jms.core.SessionCallback; +import org.springframework.jms.listener.DefaultMessageListenerContainer; +import org.springframework.jms.support.JmsUtils; +import org.springframework.jms.support.destination.DestinationResolver; -public class JMSDestination extends AbstractMultiplexDestination implements Configurable, MessageListener, +public class JMSDestination extends AbstractMultiplexDestination implements MessageListener, JMSExchangeSender { - protected static final String BASE_BEAN_NAME_SUFFIX = ".jms-destination-base"; - private static final Logger LOG = LogUtils.getL7dLogger(JMSDestination.class); - protected ServerConfig serverConfig; - protected ServerBehaviorPolicyType runtimePolicy; - protected AddressType address; - protected SessionPoolType sessionPool; - protected Destination targetDestination; - protected Destination replyToDestination; - protected JMSSessionFactory sessionFactory; - protected Bus bus; - protected EndpointInfo endpointInfo; - protected String beanNameSuffix; - - final ConduitInitiator conduitInitiator; - Session listenerSession; - JMSListenerThread listenerThread; + private JMSConfiguration jmsConfig; + private Bus bus; + private DefaultMessageListenerContainer jmsListener; + private JmsTemplate jmsTemplate; - public JMSDestination(Bus b, ConduitInitiator ci, EndpointInfo info) throws IOException { + public JMSDestination(Bus b, EndpointInfo info, JMSConfiguration jmsConfig) { super(b, getTargetReference(info, b), info); - this.bus = b; - this.endpointInfo = info; - this.beanNameSuffix = BASE_BEAN_NAME_SUFFIX; - conduitInitiator = ci; - - initConfig(); - } - - private void initConfig() { - this.runtimePolicy = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(), - ServerBehaviorPolicyType.class); - this.serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class); - this.address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class); - this.sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class); - Configurer configurer = bus.getExtension(Configurer.class); - if (null != configurer) { - configurer.configureBean(this); - } + this.jmsConfig = jmsConfig; } /** @@ -119,64 +86,46 @@ return new BackChannelConduit(this, anon, inMessage); } - private Executor getExecutor(WorkQueueManager wqm, QName name) { - // Fallback if no Workqueuemanager - Executor executor = SynchronousExecutor.getInstance(); - if (wqm != null) { - if (name != null) { - executor = wqm.getNamedWorkQueue("{" + name.getNamespaceURI() + "}" + name.getLocalPart()); - } - if (executor == null) { - executor = wqm.getNamedWorkQueue("jms"); - } - if (executor == null) { - executor = wqm.getAutomaticWorkQueue(); - } - } - return executor; - } - /** - * Initialize Sessionfactory, Initialize and start ListenerThread {@inheritDoc} + * Initialize jmsTemplate and jmsListener from jms configuration data in jmsConfig {@inheritDoc} */ public void activate() { getLogger().log(Level.INFO, "JMSDestination activate().... "); - if (this.address == null || this.address.getJndiConnectionFactoryName() == null) { - throw new RuntimeException("Insufficient configuration for Destination. " - + "Did you configure a and set the jndiConnectionFactoryName ?"); + jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102(); + jmsTemplate.setDefaultDestinationName(jmsConfig.getReplyDestination()); + jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory()); + jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout()); + jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive()); + jmsTemplate.setPriority(jmsConfig.getPriority()); + jmsTemplate.setDeliveryMode(jmsConfig.getDeliveryMode()); + jmsTemplate.setExplicitQosEnabled(true); + jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted()); + + jmsListener = new DefaultMessageListenerContainer(); + jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain()); + jmsListener.setAutoStartup(true); + jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory()); + jmsListener.setMessageSelector(jmsConfig.getMessageSelector()); + jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName()); + jmsListener.setDestinationName(jmsConfig.getTargetDestination()); + jmsListener.setMessageListener(this); + jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted()); + jmsListener.setTransactionManager(jmsConfig.getTransactionManager()); + + if (jmsConfig.getDestinationResolver() != null) { + jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver()); + jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver()); } - try { - getLogger().log(Level.FINE, "establishing JMS connection"); - sessionFactory = JMSSessionFactory.connect(getJMSAddress(), getSessionPool(), serverConfig); - Connection connection = sessionFactory.getConnection(); - Context context = sessionFactory.getInitialContext(); - this.targetDestination = JMSUtils.resolveRequestDestination(context, connection, address); - this.replyToDestination = JMSUtils.resolveRequestDestination(context, connection, address); - WorkQueueManager wqm = bus.getExtension(WorkQueueManager.class); - QName name = null; - if (endpointInfo != null) { - name = endpointInfo.getName(); - } - Executor executor = getExecutor(wqm, name); - String messageSelector = runtimePolicy.getMessageSelector(); - String durableName = runtimePolicy.getDurableSubscriberName(); - listenerThread = new JMSListenerThread(executor, this); - listenerThread.start(connection, targetDestination, messageSelector, durableName); - } catch (JMSException ex) { - getLogger().log(Level.SEVERE, "JMS connect failed with JMSException : ", ex); - } catch (NamingException nex) { - getLogger().log(Level.SEVERE, "JMS connect failed with NamingException : ", nex); + if (!jmsListener.isRunning()) { + jmsListener.initialize(); } } public void deactivate() { - if (listenerThread != null) { - listenerThread.close(); - } - sessionFactory.shutdown(); + jmsListener.shutdown(); } public void shutdown() { @@ -184,24 +133,31 @@ this.deactivate(); } - public Queue getReplyToDestination(Message inMessage) throws JMSException, NamingException { + private Destination resolveDestinationName(final String name) { + return (Destination)jmsTemplate.execute(new SessionCallback() { + public Object doInJms(Session session) throws JMSException { + DestinationResolver resolv = jmsTemplate.getDestinationResolver(); + return resolv.resolveDestinationName(session, name, jmsConfig.isPubSubDomain()); + } + }); + } + + public Destination getReplyToDestination(Message inMessage) throws JMSException { javax.jms.Message message = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); // If WS-Addressing had set the replyTo header. - String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO); + final String replyToName = (String)inMessage.get(JMSConstants.JMS_REBASED_REPLY_TO); if (replyToName != null) { - Context context = sessionFactory.getInitialContext(); - return (Queue)context.lookup(replyToName); + return resolveDestinationName(replyToName); } else if (message.getJMSReplyTo() != null) { - return (Queue)message.getJMSReplyTo(); + return message.getJMSReplyTo(); } else { - return (Queue)replyToDestination; + throw new RuntimeException("No replyTo destination set on request message or cxf message"); } } /** * Decides what correlationId to use for the reply by looking at the request headers. If the request has a - * correlationId set this is taken. Else if the useMessageIDAsCorrelationID is true then the messageId - * from the request message is used as correlation Id + * correlationId set this is taken. Else the messageId from the request message is used as correlation Id * * @param request * @return @@ -209,10 +165,7 @@ */ public String determineCorrelationID(javax.jms.Message request) throws JMSException { String correlationID = request.getJMSCorrelationID(); - if ("".equals(correlationID)) { - correlationID = null; - } - if (correlationID == null && getRuntimePolicy().isUseMessageIDAsCorrelationID()) { + if (correlationID == null || "".equals(correlationID)) { correlationID = request.getJMSMessageID(); } return correlationID; @@ -250,10 +203,10 @@ } } - public void sendExchange(Exchange exchange, Object replyObj) { + public void sendExchange(Exchange exchange, final Object replyObj) { Message inMessage = exchange.getInMessage(); - Message outMessage = exchange.getOutMessage(); - if (!JMSUtils.isDestinationStyleQueue(address)) { + final Message outMessage = exchange.getOutMessage(); + if (jmsConfig.isPubSubDomain()) { // we will never receive a non-oneway invocation in pub-sub // domain from CXF client - however a mis-behaving pure JMS // client could conceivably make suce an invocation, in which @@ -262,12 +215,11 @@ "with 'topic' destinationStyle"); return; } - PooledSession replySession = null; try { // setup the reply message - replySession = sessionFactory.get(); - javax.jms.Message request = (javax.jms.Message)inMessage.get(JMSConstants.JMS_REQUEST_MESSAGE); - String msgType = null; + final javax.jms.Message request = (javax.jms.Message)inMessage + .get(JMSConstants.JMS_REQUEST_MESSAGE); + final String msgType; if (request instanceof TextMessage) { msgType = JMSConstants.TEXT_MESSAGE_TYPE; } else if (request instanceof BytesMessage) { @@ -275,57 +227,50 @@ } else { msgType = JMSConstants.BINARY_MESSAGE_TYPE; } - javax.jms.Message reply = JMSUtils - .createAndSetPayload(replyObj, replySession.session(), msgType); - reply.setJMSCorrelationID(determineCorrelationID(request)); - JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage - .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); - JMSUtils.setMessageProperties(headers, reply); - // ensure that the contentType is set to the out jms message header - JMSUtils.addContentTypeToProtocolHeader(outMessage); - Map> protHeaders = CastUtils.cast((Map)outMessage - .get(Message.PROTOCOL_HEADERS)); - JMSUtils.addProtocolHeaders(reply, protHeaders); Destination replyTo = getReplyToDestination(inMessage); - + final JMSMessageHeadersType headers = (JMSMessageHeadersType)outMessage + .get(JMSConstants.JMS_SERVER_RESPONSE_HEADERS); JMSMessageHeadersType inHeaders = (JMSMessageHeadersType)inMessage .get(JMSConstants.JMS_SERVER_REQUEST_HEADERS); - long timeToLive = 0; if (request.getJMSExpiration() > 0) { TimeZone tz = new SimpleTimeZone(0, "GMT"); Calendar cal = new GregorianCalendar(tz); - timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); - } - - if (timeToLive < 0) { - getLogger().log(Level.INFO, "Message time to live is already expired skipping response."); - return; + long timeToLive = request.getJMSExpiration() - cal.getTimeInMillis(); + if (timeToLive < 0) { + getLogger() + .log(Level.INFO, "Message time to live is already expired skipping response."); + return; + } } int deliveryMode = JMSUtils.getJMSDeliveryMode(inHeaders); int priority = JMSUtils.getJMSPriority(inHeaders); - long ttl = JMSUtils.getTimeToLive(headers); - if (ttl <= 0) { - ttl = getServerConfig().getMessageTimeToLive(); - } - if (ttl <= 0) { - ttl = timeToLive; - } + + jmsTemplate.setDeliveryMode(deliveryMode); + jmsTemplate.setPriority(priority); getLogger().log(Level.FINE, "send out the message!"); - replySession.producer().send(replyTo, reply, deliveryMode, priority, ttl); + jmsTemplate.send(replyTo, new MessageCreator() { + public javax.jms.Message createMessage(Session session) throws JMSException { + javax.jms.Message reply = JMSUtils.createAndSetPayload(replyObj, session, msgType); + + reply.setJMSCorrelationID(determineCorrelationID(request)); + + JMSUtils.setMessageProperties(headers, reply); + // ensure that the contentType is set to the out jms message header + JMSUtils.addContentTypeToProtocolHeader(outMessage); + Map> protHeaders = CastUtils.cast((Map)outMessage + .get(Message.PROTOCOL_HEADERS)); + JMSUtils.addProtocolHeaders(reply, protHeaders); + + LOG.log(Level.FINE, "server sending reply: ", reply); + return reply; + } + }); - getLogger().log(Level.FINE, "just server sending reply: ", reply); - // Check the reply time limit Stream close will call for this } catch (JMSException ex) { - getLogger().log(Level.WARNING, "Failed in post dispatch ...", ex); - throw new RuntimeException(ex.getMessage()); - } catch (NamingException nex) { - getLogger().log(Level.WARNING, "Failed in post dispatch ...", nex); - throw new RuntimeException(nex.getMessage()); - } finally { - sessionFactory.recycle(replySession); + JmsUtils.convertJmsAccessException(ex); } } @@ -333,43 +278,9 @@ return LOG; } - public String getBeanName() { - return endpointInfo.getName().toString() + ".jms-destination"; - } - - public AddressType getJMSAddress() { - return address; - } - - public void setJMSAddress(AddressType a) { - this.address = a; - } - - public ServerBehaviorPolicyType getRuntimePolicy() { - return runtimePolicy; - } - - public void setRuntimePolicy(ServerBehaviorPolicyType runtimePolicy) { - this.runtimePolicy = runtimePolicy; - } - - public ServerConfig getServerConfig() { - return serverConfig; - } - - public void setServerConfig(ServerConfig serverConfig) { - this.serverConfig = serverConfig; - } - - public SessionPoolType getSessionPool() { - return sessionPool; - } - - public void setSessionPool(SessionPoolType sessionPool) { - this.sessionPool = sessionPool; - } - - // this should deal with the cxf message + /** + * Conduit for sending the reply back to the client + */ protected class BackChannelConduit extends AbstractConduit { protected Message inMessage; @@ -419,4 +330,12 @@ } } + public JMSConfiguration getJmsConfig() { + return jmsConfig; + } + + public void setJmsConfig(JMSConfiguration jmsConfig) { + this.jmsConfig = jmsConfig; + } + } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSExchangeSender.java Fri Oct 3 19:35:17 2008 @@ -20,6 +20,19 @@ import org.apache.cxf.message.Exchange; +/** + * Callback interface for JMSOutputStream + */ interface JMSExchangeSender { + + /** + * Is called from JMSOutputStream.doClose() when the stream is fully + * written. Sends the outMessage of the given exchange with the given payload + * from the JMSOutputStream. If the exchange is not oneway a reply should be recieved + * and set as inMessage + * + * @param exchange + * @param payload + */ void sendExchange(Exchange exchange, Object payload); } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java Fri Oct 3 19:35:17 2008 @@ -19,7 +19,6 @@ package org.apache.cxf.transport.jms; import javax.jms.ConnectionFactory; -import javax.jms.Message; import javax.naming.NamingException; import org.apache.cxf.Bus; @@ -27,15 +26,18 @@ import org.apache.cxf.service.model.EndpointInfo; import org.springframework.jms.connection.SingleConnectionFactory; import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; -import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.support.destination.JndiDestinationResolver; import org.springframework.jndi.JndiTemplate; public class JMSOldConfigHolder { - protected ClientConfig clientConfig; - protected ClientBehaviorPolicyType runtimePolicy; - protected AddressType address; - protected SessionPoolType sessionPool; + private ClientConfig clientConfig; + private ClientBehaviorPolicyType runtimePolicy; + + private AddressType address; + private SessionPoolType sessionPool; + private JMSConfiguration jmsConfig; + private ServerConfig serverConfig; + private ServerBehaviorPolicyType serverBehavior; private ConnectionFactory getConnectionFactoryFromJndi(String connectionFactoryName, String userName, String password, JndiTemplate jt) { @@ -43,7 +45,6 @@ return null; } try { - ConnectionFactory connectionFactory = (ConnectionFactory)jt.lookup(connectionFactoryName); UserCredentialsConnectionFactoryAdapter uccf = new UserCredentialsConnectionFactoryAdapter(); uccf.setUsername(userName); @@ -58,20 +59,26 @@ } } - public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo) { - JMSConfiguration jmsConf = new JMSConfiguration(); + public JMSConfiguration createJMSConfigurationFromEndpointInfo(Bus bus, EndpointInfo endpointInfo, + boolean isConduit) { + jmsConfig = new JMSConfiguration(); - // Retrieve configuration information that was extracted from the wsdl + // Retrieve configuration information that was extracted from the WSDL address = endpointInfo.getTraversedExtensor(new AddressType(), AddressType.class); clientConfig = endpointInfo.getTraversedExtensor(new ClientConfig(), ClientConfig.class); runtimePolicy = endpointInfo.getTraversedExtensor(new ClientBehaviorPolicyType(), ClientBehaviorPolicyType.class); + serverConfig = endpointInfo.getTraversedExtensor(new ServerConfig(), ServerConfig.class); + sessionPool = endpointInfo.getTraversedExtensor(new SessionPoolType(), SessionPoolType.class); + serverBehavior = endpointInfo.getTraversedExtensor(new ServerBehaviorPolicyType(), + ServerBehaviorPolicyType.class); + String name = endpointInfo.getName().toString() + (isConduit ? ".jms-conduit" : ".jms-destination"); // Try to retrieve configuration information from the spring - // config. Search for a tag with name=endpoint name + ".jms-conduit" + // config. Search for a conduit or destination with name=endpoint name + ".jms-conduit" + // or ".jms-destination" Configurer configurer = bus.getExtension(Configurer.class); if (null != configurer) { - String name = endpointInfo.getName().toString() + ".jms-conduit"; configurer.configureBean(name, this); } @@ -80,38 +87,57 @@ ConnectionFactory cf = getConnectionFactoryFromJndi(address.getJndiConnectionFactoryName(), address .getConnectionUserName(), address.getConnectionPassword(), jt); - // TODO Use JmsTemplate102 in case JMS 1.1 is not available - JmsTemplate jmsTemplate = new JmsTemplate(); - jmsTemplate.setConnectionFactory(cf); boolean pubSubDomain = false; if (address.isSetDestinationStyle()) { pubSubDomain = DestinationStyleType.TOPIC == address.getDestinationStyle(); } - jmsTemplate.setPubSubDomain(pubSubDomain); - jmsTemplate.setReceiveTimeout(clientConfig.getClientReceiveTimeout()); - jmsTemplate.setTimeToLive(clientConfig.getMessageTimeToLive()); - jmsTemplate.setPriority(Message.DEFAULT_PRIORITY); - jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE); - jmsTemplate.setExplicitQosEnabled(true); + jmsConfig.setConnectionFactory(cf); + jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName()); + jmsConfig.setExplicitQosEnabled(true); + // jmsConfig.setMessageIdEnabled(messageIdEnabled); + jmsConfig.setMessageSelector(serverBehavior.getMessageSelector()); + // jmsConfig.setMessageTimestampEnabled(messageTimestampEnabled); + if (runtimePolicy.isSetMessageType()) { + jmsConfig.setMessageType(runtimePolicy.getMessageType().value()); + } + // jmsConfig.setOneWay(oneWay); + // jmsConfig.setPriority(priority); + jmsConfig.setPubSubDomain(pubSubDomain); + jmsConfig.setPubSubNoLocal(true); + jmsConfig.setReceiveTimeout(clientConfig.getClientReceiveTimeout()); + jmsConfig.setSubscriptionDurable(serverBehavior.isSetDurableSubscriberName()); + long timeToLive = isConduit ? clientConfig.getMessageTimeToLive() : serverConfig + .getMessageTimeToLive(); + jmsConfig.setTimeToLive(timeToLive); + jmsConfig.setUseJms11(true); + boolean useJndi = address.isSetJndiDestinationName(); + jmsConfig.setUseJndi(useJndi); + jmsConfig.setSessionTransacted(serverBehavior.isSetTransactional()); - if (address.isSetJndiDestinationName()) { + if (useJndi) { // Setup Destination jndi destination resolver final JndiDestinationResolver jndiDestinationResolver = new JndiDestinationResolver(); jndiDestinationResolver.setJndiTemplate(jt); - jmsTemplate.setDestinationResolver(jndiDestinationResolver); - jmsConf.setTargetDestination(address.getJndiDestinationName()); - jmsConf.setReplyDestination(address.getJndiReplyDestinationName()); + jmsConfig.setDestinationResolver(jndiDestinationResolver); + jmsConfig.setTargetDestination(address.getJndiDestinationName()); + jmsConfig.setReplyDestination(address.getJndiReplyDestinationName()); } else { // Use the default dynamic destination resolver - jmsConf.setTargetDestination(address.getJmsDestinationName()); - jmsConf.setReplyDestination(address.getJmsReplyDestinationName()); + jmsConfig.setTargetDestination(address.getJmsDestinationName()); + jmsConfig.setReplyDestination(address.getJmsReplyDestinationName()); } - if (runtimePolicy.isSetMessageType()) { - jmsConf.setMessageType(runtimePolicy.getMessageType().value()); + + jmsConfig.setConnectionFactory(cf); + + if (jmsConfig.getTargetDestination() == null || jmsConfig.getConnectionFactory() == null) { + throw new RuntimeException("Insufficient configuration for " + + (isConduit ? "Conduit" : "Destination") + ". " + + "Did you configure a and set the jndiConnectionFactoryName ?"); } - jmsConf.setJmsTemplate(jmsTemplate); - return jmsConf; + return jmsConfig; } public ClientConfig getClientConfig() { @@ -145,4 +171,28 @@ public void setSessionPool(SessionPoolType sessionPool) { this.sessionPool = sessionPool; } + + public JMSConfiguration getJmsConfig() { + return jmsConfig; + } + + public void setJmsConfig(JMSConfiguration jmsConfig) { + this.jmsConfig = jmsConfig; + } + + public ServerConfig getServerConfig() { + return serverConfig; + } + + public void setServerConfig(ServerConfig serverConfig) { + this.serverConfig = serverConfig; + } + + public ServerBehaviorPolicyType getServerBehavior() { + return serverBehavior; + } + + public void setServerBehavior(ServerBehaviorPolicyType serverBehavior) { + this.serverBehavior = serverBehavior; + } } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOutputStream.java Fri Oct 3 19:35:17 2008 @@ -25,7 +25,7 @@ import org.apache.cxf.message.Exchange; /** - * + * Outputstream that sends a message when the exchange is closed */ class JMSOutputStream extends CachedOutputStream { private final JMSExchangeSender sender; @@ -44,6 +44,9 @@ } @Override + /** + * Close the stream and send the message out + */ protected void doClose() throws IOException { Object payload = retrieveRequestFromStream(isTextPayload); this.sender.sendExchange(exchange, payload); Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSTransportFactory.java Fri Oct 3 19:35:17 2008 @@ -26,7 +26,6 @@ import javax.annotation.Resource; import org.apache.cxf.Bus; -import org.apache.cxf.configuration.Configurer; import org.apache.cxf.service.model.EndpointInfo; import org.apache.cxf.transport.AbstractTransportFactory; import org.apache.cxf.transport.Conduit; @@ -44,7 +43,6 @@ } private Bus bus; - private JMSConfiguration jmsConfig; @Resource(name = "cxf") public void setBus(Bus b) { @@ -55,37 +53,32 @@ return bus; } - public Conduit getConduit(EndpointInfo targetInfo) throws IOException { - return getConduit(targetInfo, targetInfo.getTarget()); + public Conduit getConduit(EndpointInfo endpointInfo) throws IOException { + return getConduit(endpointInfo, endpointInfo.getTarget()); } + /** + * {@inheritDoc} + */ public Conduit getConduit(EndpointInfo endpointInfo, EndpointReferenceType target) throws IOException { - JMSConduit conduit = target == null - ? new JMSConduit(bus, endpointInfo) : new JMSConduit(bus, endpointInfo, target); JMSOldConfigHolder old = new JMSOldConfigHolder(); - JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo); - conduit.setJmsConfig(jmsConf); - return conduit; + JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, true); + JMSConduit jmsConduit = new JMSConduit(target, jmsConf); + jmsConduit.afterPropertiesSet(); + return jmsConduit; } + /** + * {@inheritDoc} + */ public Destination getDestination(EndpointInfo endpointInfo) throws IOException { - JMSDestination destination = new JMSDestination(bus, this, endpointInfo); - Configurer configurer = bus.getExtension(Configurer.class); - if (null != configurer) { - configurer.configureBean(destination); - } - return destination; + JMSOldConfigHolder old = new JMSOldConfigHolder(); + JMSConfiguration jmsConf = old.createJMSConfigurationFromEndpointInfo(bus, endpointInfo, false); + return new JMSDestination(bus, endpointInfo, jmsConf); } public Set getUriPrefixes() { return URI_PREFIXES; } - public JMSConfiguration getJmsConfig() { - return jmsConfig; - } - - public void setJmsConfig(JMSConfiguration jmsConfig) { - this.jmsConfig = jmsConfig; - } } Modified: cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java URL: http://svn.apache.org/viewvc/cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=701585&r1=701584&r2=701585&view=diff ============================================================================== --- cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original) +++ cxf/branches/2.1.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Fri Oct 3 19:35:17 2008 @@ -33,19 +33,11 @@ import java.util.logging.Logger; import javax.jms.BytesMessage; -import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; -import javax.jms.MessageProducer; import javax.jms.ObjectMessage; -import javax.jms.Queue; -import javax.jms.QueueSender; import javax.jms.Session; -import javax.jms.Topic; -import javax.jms.TopicPublisher; -import javax.naming.Context; -import javax.naming.NamingException; import org.apache.cxf.common.logging.LogUtils; import org.apache.cxf.helpers.CastUtils; @@ -63,37 +55,26 @@ public static Properties getInitialContextEnv(AddressType addrType) { Properties env = new Properties(); - populateContextEnvironment(addrType, env); - + java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator(); + while (listIter.hasNext()) { + JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next(); + if (null != propertyPair.getValue()) { + env.setProperty(propertyPair.getName(), propertyPair.getValue()); + } + } if (LOG.isLoggable(Level.FINE)) { Enumeration props = env.propertyNames(); - while (props.hasMoreElements()) { String name = (String)props.nextElement(); String value = env.getProperty(name); LOG.log(Level.FINE, "Context property: " + name + " | " + value); } } - return env; } - protected static void populateContextEnvironment(AddressType addrType, Properties env) { - - java.util.ListIterator listIter = addrType.getJMSNamingProperty().listIterator(); - - while (listIter.hasNext()) { - JMSNamingPropertyType propertyPair = (JMSNamingPropertyType)listIter.next(); - - if (null != propertyPair.getValue()) { - env.setProperty(propertyPair.getName(), propertyPair.getValue()); - } - } - } - public static int getJMSDeliveryMode(JMSMessageHeadersType headers) { int deliveryMode = Message.DEFAULT_DELIVERY_MODE; - if (headers != null && headers.isSetJMSDeliveryMode()) { deliveryMode = headers.getJMSDeliveryMode(); } @@ -101,11 +82,8 @@ } public static int getJMSPriority(JMSMessageHeadersType headers) { - int priority = Message.DEFAULT_PRIORITY; - if (headers != null && headers.isSetJMSPriority()) { - priority = headers.getJMSPriority(); - } - return priority; + return (headers != null && headers.isSetJMSPriority()) + ? headers.getJMSPriority() : Message.DEFAULT_PRIORITY; } public static long getTimeToLive(JMSMessageHeadersType headers) { @@ -118,7 +96,6 @@ public static void setMessageProperties(JMSMessageHeadersType headers, Message message) throws JMSException { - if (headers != null && headers.isSetProperty()) { List props = headers.getProperty(); for (int x = 0; x < props.size(); x++) { @@ -139,7 +116,6 @@ public static Message createAndSetPayload(Object payload, Session session, String messageType) throws JMSException { Message message = null; - if (JMSConstants.TEXT_MESSAGE_TYPE.equals(messageType)) { message = session.createTextMessage((String)payload); } else if (JMSConstants.BYTE_MESSAGE_TYPE.equals(messageType)) { @@ -149,7 +125,6 @@ message = session.createObjectMessage(); ((ObjectMessage)message).setObject((byte[])payload); } - return message; } @@ -177,9 +152,8 @@ } } - public static JMSMessageHeadersType populateIncomingContext(javax.jms.Message message, - org.apache.cxf.message.Message inMessage, - String headerType) { + public static void populateIncomingContext(javax.jms.Message message, + org.apache.cxf.message.Message inMessage, String headerType) { try { JMSMessageHeadersType headers = null; headers = (JMSMessageHeadersType)inMessage.get(headerType); @@ -220,7 +194,6 @@ } } inMessage.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, protHeaders); - return headers; } catch (JMSException ex) { throw JmsUtils.convertJmsAccessException(ex); } @@ -242,7 +215,7 @@ value.append(s); first = false; } - // Incase if the Content-Type header key is Content-Type replace with JMS_Content_Type + // If the Content-Type header key is Content-Type replace with JMS_Content_Type if (entry.getKey().equals(org.apache.cxf.message.Message.CONTENT_TYPE)) { message.setStringProperty(JMSConstants.JMS_CONTENT_TYPE, value.toString()); } else { @@ -252,20 +225,18 @@ } } - public static Map> getSetProtocolHeaders(org.apache.cxf.message.Message message) { + public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) { + String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE); + + // Retrieve or create protocol headers Map> headers = CastUtils.cast((Map)message .get(org.apache.cxf.message.Message.PROTOCOL_HEADERS)); if (null == headers) { headers = new HashMap>(); message.put(org.apache.cxf.message.Message.PROTOCOL_HEADERS, headers); } - return headers; - } - - public static void addContentTypeToProtocolHeader(org.apache.cxf.message.Message message) { - String contentType = (String)message.get(org.apache.cxf.message.Message.CONTENT_TYPE); - - Map> headers = JMSUtils.getSetProtocolHeaders(message); + + // Add content type to the protocol headers List ct; if (headers.get(JMSConstants.JMS_CONTENT_TYPE) != null) { ct = headers.get(JMSConstants.JMS_CONTENT_TYPE); @@ -275,14 +246,9 @@ ct = new ArrayList(); headers.put(JMSConstants.JMS_CONTENT_TYPE, ct); } - ct.add(contentType); } - public static boolean isDestinationStyleQueue(AddressType address) { - return JMSConstants.JMS_QUEUE.equals(address.getDestinationStyle().value()); - } - public static Message buildJMSMessageFromCXFMessage(org.apache.cxf.message.Message outMessage, Object payload, String messageType, Session session, Destination replyTo, String correlationId) @@ -312,76 +278,19 @@ return jmsMessage; } - public static void sendMessage(MessageProducer producer, Destination destination, Message jmsMessage, - long timeToLive, int deliveryMode, int priority) throws JMSException { - /* - * Can this be changed to producer.send(destination, jmsMessage, deliveryMode, priority, timeToLive); - */ - - if (destination instanceof Queue) { - QueueSender sender = (QueueSender)producer; - sender.setTimeToLive(timeToLive); - sender.send((Queue)destination, jmsMessage, deliveryMode, priority, timeToLive); - } else { - TopicPublisher publisher = (TopicPublisher)producer; - publisher.setTimeToLive(timeToLive); - publisher.publish((Topic)destination, jmsMessage, deliveryMode, priority, timeToLive); - } - } - - public static Destination resolveRequestDestination(Context context, Connection connection, - AddressType addrDetails) throws JMSException, - NamingException { - Destination requestDestination = null; - // see if jndiDestination is set - if (addrDetails.getJndiDestinationName() != null) { - requestDestination = (Destination)context.lookup(addrDetails.getJndiDestinationName()); - } - - // if no jndiDestination or it fails see if jmsDestination is set - // and try to create it. - if (requestDestination == null && addrDetails.getJmsDestinationName() != null) { - if (JMSUtils.isDestinationStyleQueue(addrDetails)) { - requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) - .createQueue(addrDetails.getJmsDestinationName()); - } else { - requestDestination = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) - .createTopic(addrDetails.getJmsDestinationName()); - } - } - return requestDestination; - } - - public static Queue resolveReplyDestination(Context context, Connection connection, - AddressType addrDetails) throws NamingException, - JMSException { - Queue replyDestination = null; - - // Reply Destination is used (if present) only if the session is - // point-to-point session - if (JMSUtils.isDestinationStyleQueue(addrDetails)) { - if (addrDetails.getJndiReplyDestinationName() != null) { - replyDestination = (Queue)context.lookup(addrDetails.getJndiReplyDestinationName()); - } - if (replyDestination == null && addrDetails.getJmsReplyDestinationName() != null) { - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - replyDestination = session.createQueue(addrDetails.getJmsReplyDestinationName()); - session.close(); - } - } - return replyDestination; - } - - public static String generateUniqueSelector() { + /** + * Create a unique correlation Id from + * __