cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r737102 - in /cxf/branches/2.0.x-fixes: ./ rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/ rt/transports/jms/src/main/resources/schemas/configuration/ rt/transports/jms/src/main/resources/schemas/wsdl/ rt/transports/jms/src/te...
Date Fri, 23 Jan 2009 17:24:34 GMT
Author: dkulp
Date: Fri Jan 23 09:24:34 2009
New Revision: 737102

URL: http://svn.apache.org/viewvc?rev=737102&view=rev
Log:
Merged revisions 736453 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/branches/2.1.x-fixes

................
  r736453 | dkulp | 2009-01-21 17:30:05 -0500 (Wed, 21 Jan 2009) | 9 lines
  
  Merged revisions 736408 via svnmerge from 
  https://svn.apache.org/repos/asf/cxf/trunk
  
  ........
    r736408 | cschneider | 2009-01-21 15:36:50 -0500 (Wed, 21 Jan 2009) | 1 line
    
    CXF-1978 Add configurable message selector that selects all messages with the conduit id
to allow permanent queues to be used by several instances concurrently
  ........
................

Modified:
    cxf/branches/2.0.x-fixes/   (props changed)
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jan 23 09:24:34 2009
@@ -1,3 +1,3 @@
-/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715,691745,692162-692163,692468,692500,694466-694469,694472,694717,694748-694749,694870,695503,695509,695553,695555,695563,695875-695877,695940,695980,696436,696455,696721,697086,698129,701526,701634,702275,702443,702527,702582,702604,702610,702642-702643,702649,702760,702870,702873,702959,703193,703242,703523,704303,704587,704738,704998,705153,705280-705449,705455,709357,709641,709644,710177,710184,711736,712199,712225,712275,712600,712896,713083,713410,713413,713594,713599,713808,713901,714169-714171,718622,718929,719211,719221-719223,7192
 96,719300-719301,719303,719308,719332,719356,719363,719369-719383,719650,719695,720124,723545,724403-724404,724421,724448,724451,724486-724487,724714,725367,725371,725763,725774,726045,726048,726106,726123,726745-726746,726749,726754,726756-726758,726995,727794,727797-727798,727800,731676,731684,731686-731688,731690,733587,733873,733876,733884,733891,733893,733915,735132,735136,735789,736451
-/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355,691488,691602,691706,691728,692116,692157,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694869,695396,695484,695537,695552,695561,695619,695684,695835,695935,695977,696094,696433,696720,697085,698128,700261,700602,701783,701830,701862,702267,702580,702602,702609,702616,702656,702957,703191,703239,703501,704584,704997,705150,705274,705340,705446,708550,708554,709353-709354,709425,710150,712194,712198,712272,712312,712670,712893,713082,713584,713597,713804,713899,714167-714168,718281,718565,718620,718640,718665,719017,719210,719215-719218,719222,719273,719327-719354,719362,719368,719382,719649,719680,720119-720217,723338,723717-723791,724334-724371,724433-724438,724449,724481,724485,724668,724782,724795,725754,725773,725799,725839,726342,726524,726631,726637,726639,726692,726724,726992,727445,727692,727754,727792,730139,731598,731604,731615,73163
 1,731635,732320,732363,732411,732710,732827,733582,734666,734836,735734,736352
+/cxf/branches/2.1.x-fixes:673548,674485,674547,674551,674562,674601,674649,674764,674887,675644,675653,677048,677385,678004,678009,678559,678629,678808,678852,678891,678893,679248,679597,680435,681060,681165,681813,681816,682902,682951,683089,683290,683318,684099,684790-684793,684842,684862,684895-684918,685205,685253,686237,686283,686299,686333-686364,686765,686827,687097,687464-687465,689109,689112,689122,691316,691357,691491,691711,691715,691745,692162-692163,692468,692500,694466-694469,694472,694717,694748-694749,694870,695503,695509,695553,695555,695563,695875-695877,695940,695980,696436,696455,696721,697086,698129,701526,701634,702275,702443,702527,702582,702604,702610,702642-702643,702649,702760,702870,702873,702959,703193,703242,703523,704303,704587,704738,704998,705153,705280-705449,705455,709357,709641,709644,710177,710184,711736,712199,712225,712275,712600,712896,713083,713410,713413,713594,713599,713808,713901,714169-714171,718622,718929,719211,719221-719223,7192
 96,719300-719301,719303,719308,719332,719356,719363,719369-719383,719650,719695,720124,723545,724403-724404,724421,724448,724451,724486-724487,724714,725367,725371,725763,725774,726045,726048,726106,726123,726745-726746,726749,726754,726756-726758,726995,727794,727797-727798,727800,731676,731684,731686-731688,731690,733587,733873,733876,733884,733891,733893,733915,735132,735136,735789,736451,736453
+/cxf/trunk:651669-686342,686344-686363,686764,686820,687096,687387,687463,688086,688102,688735,691271,691355,691488,691602,691706,691728,692116,692157,692466,692499,693653,693819,694179,694263,694417,694716,694744,694747,694869,695396,695484,695537,695552,695561,695619,695684,695835,695935,695977,696094,696433,696720,697085,698128,700261,700602,701783,701830,701862,702267,702580,702602,702609,702616,702656,702957,703191,703239,703501,704584,704997,705150,705274,705340,705446,708550,708554,709353-709354,709425,710150,712194,712198,712272,712312,712670,712893,713082,713584,713597,713804,713899,714167-714168,718281,718565,718620,718640,718665,719017,719210,719215-719218,719222,719273,719327-719354,719362,719368,719382,719649,719680,720119-720217,723338,723717-723791,724334-724371,724433-724438,724449,724481,724485,724668,724782,724795,725754,725773,725799,725839,726342,726524,726631,726637,726639,726692,726724,726992,727445,727692,727754,727792,730139,731598,731604,731615,73163
 1,731635,732320,732363,732411,732710,732827,733582,734666,734836,735734,736352,736408
 /incubator/cxf/trunk:434594-651668

Propchange: cxf/branches/2.0.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Fri Jan 23 09:24:34 2009
@@ -25,6 +25,7 @@
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -59,12 +60,21 @@
     private JMSConfiguration jmsConfig;
     private Map<String, Exchange> correlationMap;
     private DefaultMessageListenerContainer jmsListener;
+    private String conduitId;
+    private int messageCount;
 
     public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration jmsConfig) {
         super(target);
         this.jmsConfig = jmsConfig;
         this.endpointInfo = endpointInfo;
         correlationMap = new ConcurrentHashMap<String, Exchange>();
+        conduitId = UUID.randomUUID().toString();
+        messageCount = 0;
+    }
+    
+    private synchronized String createCorrelationId() {
+        messageCount++;
+        return conduitId + "_" + messageCount;
     }
 
     /**
@@ -103,13 +113,14 @@
 
         JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers);
         if (!exchange.isOneWay() && jmsListener == null) {
-            jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getReplyDestination());
+            jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getReplyDestination(),
+                                                       conduitId);
         }
         
         final javax.jms.Destination replyTo = exchange.isOneWay() ? null : jmsListener.getDestination();
 
         final String correlationId = (headers != null && headers.isSetJMSCorrelationID()) ? headers
-            .getJMSCorrelationID() : JMSUtils.generateCorrelationId();
+            .getJMSCorrelationID() : createCorrelationId();
             
         MessageCreator messageCreator = new MessageCreator() {
             public javax.jms.Message createMessage(Session session) throws JMSException {

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
Fri Jan 23 09:24:34 2009
@@ -60,6 +60,7 @@
     private String replyDestination;
     private String messageType = JMSConstants.TEXT_MESSAGE_TYPE;
     private boolean pubSubDomain;
+    private boolean useConduitIdSelector = true;
 
     public boolean isUsingEndpointInfo() {
         return this.usingEndpointInfo;
@@ -267,4 +268,12 @@
         this.taskExecutor = taskExecutor;
     }
 
+    public void setUseConduitIdSelector(boolean useConduitIdSelector) {
+        this.useConduitIdSelector = useConduitIdSelector;
+    }
+
+    public boolean isUseConduitIdSelector() {
+        return useConduitIdSelector;
+    }
+
 }

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
Fri Jan 23 09:24:34 2009
@@ -104,7 +104,7 @@
             throw new ConfigurationException(
                 new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION", LOG, name));
         }
-        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getTargetDestination());
+        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getTargetDestination(), null);
     }
 
     public void deactivate() {

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
Fri Jan 23 09:24:34 2009
@@ -1,137 +1,143 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.cxf.transport.jms;
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.cxf.transport.jms;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
+import org.springframework.jms.core.SessionCallback;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+import org.springframework.jms.listener.DefaultMessageListenerContainer102;
+import org.springframework.jms.support.destination.DestinationResolver;
+
+/**
+ * Factory to create JmsTemplates and JmsListeners from configuration and context information
+ */
+public final class JMSFactory {
+    
+    private JMSFactory() {
+    }
+
+    /**
+     * Create JmsTemplate from configuration information. Most settings are taken from jmsConfig.
The QoS
+     * settings in headers override the settings from jmsConfig
+     * 
+     * @param jmsConfig configuration information
+     * @param headers context headers
+     * @return
+     */
+    public static JmsTemplate createJmsTemplate(JMSConfiguration jmsConfig, JMSMessageHeadersType
headers) {
+        JmsTemplate jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102();
+        jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
+        jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
+        jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
+        jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive());
+        int priority = (headers != null && headers.isSetJMSPriority())
+            ? headers.getJMSPriority() : jmsConfig.getPriority();
+        jmsTemplate.setPriority(priority);
+        int deliveryMode = (headers != null && headers.isSetJMSDeliveryMode()) ?
headers
+            .getJMSDeliveryMode() : jmsConfig.getDeliveryMode();
+        jmsTemplate.setDeliveryMode(deliveryMode);
+        jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled());
+        jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
+        if (jmsConfig.getDestinationResolver() != null) {
+            jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
+        }
+        return jmsTemplate;
+    }
+
+    /**
+     * Create and start listener using configuration information from jmsConfig. Uses
+     * resolveOrCreateDestination to determine the destination for the listener.
+     * 
+     * @param jmsConfig configuration information
+     * @param listenerHandler object to be called when a message arrives
+     * @param destinationName null for temp dest or a destination name
+     * @param messageSelectorPrefix prefix for the messageselector
+     * @return
+     */
+    public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
+                                                                    MessageListener listenerHandler,
+                                                                    String destinationName,

+                                                                    String messageSelectorPrefix)
{
+        DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
+            ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
+        jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
+        jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
+        jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
+        jmsListener.setAutoStartup(true);
+        jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
+        jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
+        jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
+        jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
+        jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
+        jmsListener.setMessageListener(listenerHandler);
+        if (messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector()) {
+            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" + messageSelectorPrefix + "%'");
+        }
+        if (jmsConfig.getDestinationResolver() != null) {
+            jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
+        }
+        if (jmsConfig.getTaskExecutor() != null) {
+            jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor());
+        }
+        JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null);
+        Destination dest = JMSFactory.resolveOrCreateDestination(jmsTemplate, destinationName,
jmsConfig
+            .isPubSubDomain());
+        jmsListener.setDestination(dest);
+        jmsListener.initialize();
+        return jmsListener;
+    }
+
+    /**
+     * If the destinationName given is null then a temporary destination is created else
the destination name
+     * is resolved using the resolver from the jmsConfig
+     * 
+     * @param jmsTemplate template to use for session and resolver
+     * @param replyToDestinationName null for temporary destination or a destination name
+     * @param pubSubDomain true=pubSub, false=Queues
+     * @return resolved destination
+     */
+    private static Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
+                                                          final String replyToDestinationName,
+                                                          final boolean pubSubDomain) {
+        return (Destination)jmsTemplate.execute(new SessionCallback() {
+            public Object doInJms(Session session) throws JMSException {
+                if (replyToDestinationName == null) {
+                    if (session instanceof QueueSession) {
+                        // For JMS 1.0.2
+                        return ((QueueSession)session).createTemporaryQueue();
+                    } else {
+                        // For JMS 1.1
+                        return session.createTemporaryQueue();
+                    }
+                }
+                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
+                return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
+            }
+        });
+    }
+
+}
 
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.QueueSession;
-import javax.jms.Session;
-
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.JmsTemplate102;
-import org.springframework.jms.core.SessionCallback;
-import org.springframework.jms.listener.DefaultMessageListenerContainer;
-import org.springframework.jms.listener.DefaultMessageListenerContainer102;
-import org.springframework.jms.support.destination.DestinationResolver;
-
-/**
- * Factory to create JmsTemplates and JmsListeners from configuration and context information
- */
-public final class JMSFactory {
-    
-    private JMSFactory() {
-    }
-
-    /**
-     * Create JmsTemplate from configuration information. Most settings are taken from jmsConfig.
The QoS
-     * settings in headers override the settings from jmsConfig
-     * 
-     * @param jmsConfig configuration information
-     * @param headers context headers
-     * @return
-     */
-    public static JmsTemplate createJmsTemplate(JMSConfiguration jmsConfig, JMSMessageHeadersType
headers) {
-        JmsTemplate jmsTemplate = jmsConfig.isUseJms11() ? new JmsTemplate() : new JmsTemplate102();
-        jmsTemplate.setConnectionFactory(jmsConfig.getConnectionFactory());
-        jmsTemplate.setPubSubDomain(jmsConfig.isPubSubDomain());
-        jmsTemplate.setReceiveTimeout(jmsConfig.getReceiveTimeout());
-        jmsTemplate.setTimeToLive(jmsConfig.getTimeToLive());
-        int priority = (headers != null && headers.isSetJMSPriority())
-            ? headers.getJMSPriority() : jmsConfig.getPriority();
-        jmsTemplate.setPriority(priority);
-        int deliveryMode = (headers != null && headers.isSetJMSDeliveryMode()) ?
headers
-            .getJMSDeliveryMode() : jmsConfig.getDeliveryMode();
-        jmsTemplate.setDeliveryMode(deliveryMode);
-        jmsTemplate.setExplicitQosEnabled(jmsConfig.isExplicitQosEnabled());
-        jmsTemplate.setSessionTransacted(jmsConfig.isSessionTransacted());
-        if (jmsConfig.getDestinationResolver() != null) {
-            jmsTemplate.setDestinationResolver(jmsConfig.getDestinationResolver());
-        }
-        return jmsTemplate;
-    }
-
-    /**
-     * Create and start listener using configuration information from jmsConfig. Uses
-     * resolveOrCreateDestination to determine the destination for the listener.
-     * 
-     * @param jmsConfig configuration information
-     * @param listenerHandler object to be called when a message arrives
-     * @param destinationName null for temp dest or a destination name
-     * @return
-     */
-    public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
-                                                                    MessageListener listenerHandler,
-                                                                    String destinationName)
{
-        DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
-            ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
-        jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
-        jmsListener.setMaxConcurrentConsumers(jmsConfig.getMaxConcurrentConsumers());
-        jmsListener.setPubSubDomain(jmsConfig.isPubSubDomain());
-        jmsListener.setAutoStartup(true);
-        jmsListener.setConnectionFactory(jmsConfig.getConnectionFactory());
-        jmsListener.setMessageSelector(jmsConfig.getMessageSelector());
-        jmsListener.setDurableSubscriptionName(jmsConfig.getDurableSubscriptionName());
-        jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
-        jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
-        jmsListener.setMessageListener(listenerHandler);
-        if (jmsConfig.getDestinationResolver() != null) {
-            jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
-        }
-        if (jmsConfig.getTaskExecutor() != null) {
-            jmsListener.setTaskExecutor(jmsConfig.getTaskExecutor());
-        }
-        JmsTemplate jmsTemplate = createJmsTemplate(jmsConfig, null);
-        Destination dest = JMSFactory.resolveOrCreateDestination(jmsTemplate, destinationName,
jmsConfig
-            .isPubSubDomain());
-        jmsListener.setDestination(dest);
-        jmsListener.initialize();
-        return jmsListener;
-    }
-
-    /**
-     * If the destinationName given is null then a temporary destination is created else
the destination name
-     * is resolved using the resolver from the jmsConfig
-     * 
-     * @param jmsTemplate template to use for session and resolver
-     * @param replyToDestinationName null for temporary destination or a destination name
-     * @param pubSubDomain true=pubSub, false=Queues
-     * @return resolved destination
-     */
-    private static Destination resolveOrCreateDestination(final JmsTemplate jmsTemplate,
-                                                          final String replyToDestinationName,
-                                                          final boolean pubSubDomain) {
-        return (Destination)jmsTemplate.execute(new SessionCallback() {
-            public Object doInJms(Session session) throws JMSException {
-                if (replyToDestinationName == null) {
-                    if (session instanceof QueueSession) {
-                        // For JMS 1.0.2
-                        return ((QueueSession)session).createTemporaryQueue();
-                    } else {
-                        // For JMS 1.1
-                        return session.createTemporaryQueue();
-                    }
-                }
-                DestinationResolver resolv = jmsTemplate.getDestinationResolver();
-                return resolv.resolveDestinationName(session, replyToDestinationName, pubSubDomain);
-            }
-        });
-    }
-
-}

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
Fri Jan 23 09:24:34 2009
@@ -135,7 +135,8 @@
             jmsConfig.setPubSubNoLocal(true);
             //if (clientConfig.isSetClientReceiveTimeout()) {
             jmsConfig.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
-            //}            
+            //}
+            jmsConfig.setUseConduitIdSelector(clientConfig.isUseConduitIdSelector());
             jmsConfig.setSubscriptionDurable(serverBehavior.isSetDurableSubscriberName());
      
             jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName());
       
         

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
Fri Jan 23 09:24:34 2009
@@ -20,10 +20,7 @@
 package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -285,20 +282,4 @@
         return jmsMessage;
     }
 
-    /**
-     * Create a unique correlation Id from
-     * <host>_<user.name>_<currentThread><time>
-     * @return correlationId
-     */
-    public static String generateCorrelationId() {
-        String host = "localhost";
-        try {
-            InetAddress addr = InetAddress.getLocalHost();
-            host = addr.getHostName();
-        } catch (UnknownHostException ukex) {
-            // Default to localhost
-        }
-        long time = Calendar.getInstance().getTimeInMillis();
-        return host + "_" + System.getProperty("user.name") + "_" + Thread.currentThread() + time;
-    }
 }

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
Fri Jan 23 09:24:34 2009
@@ -1,71 +1,74 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
- 
-  http://www.apache.org/licenses/LICENSE-2.0
- 
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 
-  xmlns:jms="http://cxf.apache.org/transports/jms" 
-  xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" 
-  xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" 
-  xmlns:beans="http://www.springframework.org/schema/beans"
-  xmlns:cxf-beans="http://cxf.apache.org/configuration/beans"
-  targetNamespace="http://cxf.apache.org/transports/jms" 
-  elementFormDefault="qualified" jaxb:version="2.0">
-  
-    <xs:include schemaLocation="http://cxf.apache.org/schemas/wsdl/jms.xsd"/> 
-    <xs:import namespace="http://www.springframework.org/schema/beans" schemaLocation="http://www.springframework.org/schema/beans/spring-beans.xsd"/>
-    <xs:import namespace="http://cxf.apache.org/configuration/beans" schemaLocation="http://cxf.apache.org/schemas/configuration/cxf-beans.xsd"/>
-    
-    <xs:element name="destination">
-        <xs:complexType>
-            <xs:complexContent>
-                <xs:extension base="beans:identifiedType">
-                    <xs:sequence>
-                        <xs:element name="serverConfig" type="jms:ServerConfig" minOccurs="0"/>
-                        <xs:element name="runtimePolicy" type="jms:ServerBehaviorPolicyType" minOccurs="0"/>
-                        <xs:element name="sessionPool" type="jms:SessionPoolType" minOccurs="0"/>
-                        <xs:element name="address" type="jms:AddressType" minOccurs="0"/>
-                        <xs:element name="jmsConfig-ref" type="xs:string" maxOccurs="1"
minOccurs="0"/>                    	
-                    </xs:sequence>
-                    <xs:attributeGroup ref="cxf-beans:beanAttributes"/>
-                </xs:extension>
-            </xs:complexContent>
-        </xs:complexType>
-    </xs:element>
-    
-    <xs:element name="conduit">
-        <xs:complexType>
-            <xs:complexContent>
-                <xs:extension base="beans:identifiedType">
-                    <xs:sequence>
-                    	<xs:element name="clientConfig"
-                    		type="jms:ClientConfig" minOccurs="0" />
-                    	<xs:element name="runtimePolicy"
-                    		type="jms:ClientBehaviorPolicyType" minOccurs="0" />
-                    	<xs:element name="sessionPool"
-                    		type="jms:SessionPoolType" minOccurs="0" />
-                    	<xs:element name="address"
-                    		type="jms:AddressType" minOccurs="0" />
-                    	<xs:element name="jmsConfig-ref" type="xs:string" maxOccurs="1" minOccurs="0"/>
                   	
-                    </xs:sequence>
-                    <xs:attributeGroup ref="cxf-beans:beanAttributes"/>
-                </xs:extension>
-            </xs:complexContent>
-        </xs:complexType>
-    </xs:element>
-    
+	<!--
+		Licensed to the Apache Software Foundation (ASF) under one or more
+		contributor license agreements. See the NOTICE file distributed with
+		this work for additional information regarding copyright ownership.
+		The ASF licenses this file to you under the Apache License, Version
+		2.0 (the "License"); you may not use this file except in compliance
+		with the License. You may obtain a copy of the License at
+
+		http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+		applicable law or agreed to in writing, software distributed under the
+		License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+		CONDITIONS OF ANY KIND, either express or implied. See the License for
+		the specific language governing permissions and limitations under the
+		License.
+	-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+	xmlns:jms="http://cxf.apache.org/transports/jms" xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
+	xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" xmlns:beans="http://www.springframework.org/schema/beans"
+	xmlns:cxf-beans="http://cxf.apache.org/configuration/beans"
+	targetNamespace="http://cxf.apache.org/transports/jms"
+	elementFormDefault="qualified" jaxb:version="2.0">
+
+	<xs:include schemaLocation="http://cxf.apache.org/schemas/wsdl/jms.xsd" />
+	<xs:import namespace="http://www.springframework.org/schema/beans"
+		schemaLocation="http://www.springframework.org/schema/beans/spring-beans.xsd" />
+	<xs:import namespace="http://cxf.apache.org/configuration/beans"
+		schemaLocation="http://cxf.apache.org/schemas/configuration/cxf-beans.xsd" />
+
+	<xs:element name="destination">
+		<xs:complexType>
+			<xs:complexContent>
+				<xs:extension base="beans:identifiedType">
+					<xs:sequence>
+						<xs:element name="serverConfig" type="jms:ServerConfig"
+							minOccurs="0" />
+						<xs:element name="runtimePolicy" type="jms:ServerBehaviorPolicyType"
+							minOccurs="0" />
+						<xs:element name="sessionPool" type="jms:SessionPoolType"
+							minOccurs="0" />
+						<xs:element name="address" type="jms:AddressType"
+							minOccurs="0" />
+						<xs:element name="jmsConfig-ref" type="xs:string"
+							maxOccurs="1" minOccurs="0" />
+					</xs:sequence>
+					<xs:attributeGroup ref="cxf-beans:beanAttributes" />
+				</xs:extension>
+			</xs:complexContent>
+		</xs:complexType>
+	</xs:element>
+
+	<xs:element name="conduit">
+		<xs:complexType>
+			<xs:complexContent>
+				<xs:extension base="beans:identifiedType">
+					<xs:sequence>
+						<xs:element name="clientConfig" type="jms:ClientConfig"
+							minOccurs="0" />
+						<xs:element name="runtimePolicy" type="jms:ClientBehaviorPolicyType"
+							minOccurs="0" />
+						<xs:element name="sessionPool" type="jms:SessionPoolType"
+							minOccurs="0" />
+						<xs:element name="address" type="jms:AddressType"
+							minOccurs="0" />
+						<xs:element name="jmsConfig-ref" type="xs:string"
+							maxOccurs="1" minOccurs="0" />
+					</xs:sequence>
+					<xs:attributeGroup ref="cxf-beans:beanAttributes" />
+				</xs:extension>
+			</xs:complexContent>
+		</xs:complexType>
+	</xs:element>
+
 </xs:schema>

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd (original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd Fri
Jan 23 09:24:34 2009
@@ -48,14 +48,17 @@
     </xs:complexType>
     
     <xs:complexType name="ClientConfig">
-        <xs:complexContent>
-            <xs:extension base="wsdl:tExtensibilityElement">  
-				<xs:attribute name="clientReceiveTimeout" type="xs:long" default="60000"/>
-        		<xs:attribute name="messageTimeToLive" type="xs:long" default="0" />
-        	</xs:extension>
-		</xs:complexContent>        	
+    	<xs:complexContent>
+    		<xs:extension base="wsdl:tExtensibilityElement">
+    			<xs:attribute name="clientReceiveTimeout" type="xs:long"
+    				default="60000" />
+    			<xs:attribute name="messageTimeToLive" type="xs:long"
+    				default="0" />
+    			<xs:attribute name="useConduitIdSelector" type="xs:boolean" use="optional" default="true"></xs:attribute>
+    		</xs:extension>
+    	</xs:complexContent>
     </xs:complexType>
-    
+
     <xs:complexType name="ServerConfig">
 		<xs:complexContent>
             <xs:extension base="wsdl:tExtensibilityElement">

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
Fri Jan 23 09:24:34 2009
@@ -42,6 +42,8 @@
 import org.junit.Before;
 
 public abstract class AbstractJMSTester extends Assert {
+    protected static final String MESSAGE_CONTENT = "HelloWorld";
+
     private static JMSBrokerSetup broker;
 
     protected Bus bus;
@@ -102,7 +104,7 @@
         }
         OutputStream os = message.getContent(OutputStream.class);
         assertTrue("The OutputStream should not be null ", os != null);
-        os.write("HelloWorld".getBytes()); // TODO encoding
+        os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
         os.close();
     }
 

Modified: cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=737102&r1=737101&r2=737102&view=diff
==============================================================================
--- cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
(original)
+++ cxf/branches/2.0.x-fixes/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
Fri Jan 23 09:24:34 2009
@@ -208,15 +208,19 @@
         destination.shutdown();
     }
 
-    private void setupMessageHeader(Message outMessage) {
+    private void setupMessageHeader(Message outMessage, String correlationId) {
         JMSMessageHeadersType header = new JMSMessageHeadersType();
-        header.setJMSCorrelationID("Destination test");
+        header.setJMSCorrelationID(correlationId);
         header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
         header.setJMSPriority(1);
         header.setTimeToLive(1000);
         outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
         outMessage.put(Message.ENCODING, "US-ASCII");
     }
+    
+    private void setupMessageHeader(Message outMessage) {
+        setupMessageHeader(outMessage, "Destination test");
+    }
 
     private void verifyReceivedMessage(Message inMessage) {
         ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
@@ -227,8 +231,8 @@
             assertFalse("Read the Destination recieved Message error ", false);
             ex.printStackTrace();
         }
-        String reponse = new String(bytes);
-        assertEquals("The reponse date should be equal", reponse, "HelloWorld");
+        String response = IOUtils.newStringFromBytes(bytes);
+        assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT, response);
     }
 
     private void verifyRequestResponseHeaders(Message inMessage, Message outMessage) {
@@ -258,8 +262,13 @@
     }
 
     private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType inHeader) {
-        assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader
-            .getJMSCorrelationID(), inHeader.getJMSCorrelationID());
+        if (outHeader.getJMSCorrelationID() != null) {
+            // only check if the correlation id was explicitly set as
+            // otherwise the in header will contain an automatically
+            // generated correlation id
+            assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals", outHeader
+                         .getJMSCorrelationID(), inHeader.getJMSCorrelationID());
+        }
         assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals", outHeader
             .getJMSPriority(), inHeader.getJMSPriority());
         assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be equals", outHeader
@@ -277,7 +286,7 @@
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduit(true, false);
         final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null);
         final JMSDestination destination = setupJMSDestination(true);
 
         // set up MessageObserver for handling the conduit message
@@ -332,7 +341,7 @@
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduit(true, false);
         final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null);
 
         JMSPropertyType excludeProp = new JMSPropertyType();
         excludeProp.setName(customPropertyName);



Mime
View raw message