cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cschnei...@apache.org
Subject svn commit: r736408 - in /cxf/trunk/rt/transports/jms/src: main/java/org/apache/cxf/transport/jms/ main/resources/schemas/configuration/ main/resources/schemas/wsdl/ test/java/org/apache/cxf/transport/jms/
Date Wed, 21 Jan 2009 20:36:51 GMT
Author: cschneider
Date: Wed Jan 21 12:36:50 2009
New Revision: 736408

URL: http://svn.apache.org/viewvc?rev=736408&view=rev
Log:
CXF-1978 Add configurable message selector that selects all messages with the conduit id to
allow permanent queues to be used by several instances concurrently

Modified:
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
    cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
    cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
    cxf/trunk/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
    cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConduit.java
Wed Jan 21 12:36:50 2009
@@ -25,6 +25,7 @@
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -59,12 +60,21 @@
     private JMSConfiguration jmsConfig;
     private Map<String, Exchange> correlationMap;
     private DefaultMessageListenerContainer jmsListener;
+    private String conduitId;
+    private int messageCount;
 
     public JMSConduit(EndpointInfo endpointInfo, EndpointReferenceType target, JMSConfiguration
jmsConfig) {
         super(target);
         this.jmsConfig = jmsConfig;
         this.endpointInfo = endpointInfo;
         correlationMap = new ConcurrentHashMap<String, Exchange>();
+        conduitId = UUID.randomUUID().toString();
+        messageCount = 0;
+    }
+    
+    private synchronized String createCorrelationId() {
+        messageCount++;
+        return conduitId + "_" + messageCount;
     }
 
     /**
@@ -103,13 +113,14 @@
 
         JmsTemplate jmsTemplate = JMSFactory.createJmsTemplate(jmsConfig, headers);
         if (!exchange.isOneWay() && jmsListener == null) {
-            jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getReplyDestination());
+            jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getReplyDestination(),

+                                                       conduitId);
         }
         
         final javax.jms.Destination replyTo = exchange.isOneWay() ? null : jmsListener.getDestination();
 
         final String correlationId = (headers != null && headers.isSetJMSCorrelationID())
? headers
-            .getJMSCorrelationID() : JMSUtils.generateCorrelationId();
+            .getJMSCorrelationID() : createCorrelationId();
             
         MessageCreator messageCreator = new MessageCreator() {
             public javax.jms.Message createMessage(Session session) throws JMSException {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSConfiguration.java
Wed Jan 21 12:36:50 2009
@@ -59,6 +59,7 @@
     private String replyDestination;
     private String messageType = JMSConstants.TEXT_MESSAGE_TYPE;
     private boolean pubSubDomain;
+    private boolean useConduitIdSelector = true;
 
     public boolean isUsingEndpointInfo() {
         return this.usingEndpointInfo;
@@ -266,4 +267,12 @@
         this.taskExecutor = taskExecutor;
     }
 
+    public void setUseConduitIdSelector(boolean useConduitIdSelector) {
+        this.useConduitIdSelector = useConduitIdSelector;
+    }
+
+    public boolean isUseConduitIdSelector() {
+        return useConduitIdSelector;
+    }
+
 }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSDestination.java
Wed Jan 21 12:36:50 2009
@@ -104,7 +104,7 @@
             throw new ConfigurationException(
                 new org.apache.cxf.common.i18n.Message("INSUFFICIENT_CONFIGURATION_DESTINATION",
LOG, name));
         }
-        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getTargetDestination());
+        jmsListener = JMSFactory.createJmsListener(jmsConfig, this, jmsConfig.getTargetDestination(),
null);
     }
 
     public void deactivate() {

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSFactory.java
Wed Jan 21 12:36:50 2009
@@ -74,11 +74,13 @@
      * @param jmsConfig configuration information
      * @param listenerHandler object to be called when a message arrives
      * @param destinationName null for temp dest or a destination name
+     * @param messageSelectorPrefix prefix for the messageselector
      * @return
      */
     public static DefaultMessageListenerContainer createJmsListener(JMSConfiguration jmsConfig,
                                                                     MessageListener listenerHandler,
-                                                                    String destinationName)
{
+                                                                    String destinationName,

+                                                                    String messageSelectorPrefix)
{
         DefaultMessageListenerContainer jmsListener = jmsConfig.isUseJms11()
             ? new DefaultMessageListenerContainer() : new DefaultMessageListenerContainer102();
         jmsListener.setConcurrentConsumers(jmsConfig.getConcurrentConsumers());
@@ -91,6 +93,9 @@
         jmsListener.setSessionTransacted(jmsConfig.isSessionTransacted());
         jmsListener.setTransactionManager(jmsConfig.getTransactionManager());
         jmsListener.setMessageListener(listenerHandler);
+        if (messageSelectorPrefix != null && jmsConfig.isUseConduitIdSelector())
{
+            jmsListener.setMessageSelector("JMSCorrelationID LIKE '" + messageSelectorPrefix
+ "%'");
+        }
         if (jmsConfig.getDestinationResolver() != null) {
             jmsListener.setDestinationResolver(jmsConfig.getDestinationResolver());
         }

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
(original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSOldConfigHolder.java
Wed Jan 21 12:36:50 2009
@@ -135,7 +135,8 @@
             jmsConfig.setPubSubNoLocal(true);
             //if (clientConfig.isSetClientReceiveTimeout()) {
             jmsConfig.setReceiveTimeout(clientConfig.getClientReceiveTimeout());
-            //}            
+            //}
+            jmsConfig.setUseConduitIdSelector(clientConfig.isUseConduitIdSelector());
             jmsConfig.setSubscriptionDurable(serverBehavior.isSetDurableSubscriberName());
      
             jmsConfig.setDurableSubscriptionName(serverBehavior.getDurableSubscriberName());
       
         

Modified: cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java (original)
+++ cxf/trunk/rt/transports/jms/src/main/java/org/apache/cxf/transport/jms/JMSUtils.java Wed
Jan 21 12:36:50 2009
@@ -20,10 +20,7 @@
 package org.apache.cxf.transport.jms;
 
 import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
-import java.util.Calendar;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
@@ -285,20 +282,4 @@
         return jmsMessage;
     }
 
-    /**
-     * Create a unique correlation Id from
-     * <host>_<user.name>_<currentThread><time>
-     * @return correlationId
-     */
-    public static String generateCorrelationId() {
-        String host = "localhost";
-        try {
-            InetAddress addr = InetAddress.getLocalHost();
-            host = addr.getHostName();
-        } catch (UnknownHostException ukex) {
-            // Default to localhost
-        }
-        long time = Calendar.getInstance().getTimeInMillis();
-        return host + "_" + System.getProperty("user.name") + "_" + Thread.currentThread()
+ time;
-    }
 }

Modified: cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd (original)
+++ cxf/trunk/rt/transports/jms/src/main/resources/schemas/configuration/jms.xsd Wed Jan 21
12:36:50 2009
@@ -1,71 +1,74 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements. See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership. The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License. You may obtain a copy of the License at
- 
-  http://www.apache.org/licenses/LICENSE-2.0
- 
-  Unless required by applicable law or agreed to in writing,
-  software distributed under the License is distributed on an
-  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-  KIND, either express or implied. See the License for the
-  specific language governing permissions and limitations
-  under the License.
--->
-<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" 
-  xmlns:jms="http://cxf.apache.org/transports/jms" 
-  xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/" 
-  xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" 
-  xmlns:beans="http://www.springframework.org/schema/beans"
-  xmlns:cxf-beans="http://cxf.apache.org/configuration/beans"
-  targetNamespace="http://cxf.apache.org/transports/jms" 
-  elementFormDefault="qualified" jaxb:version="2.0">
-  
-    <xs:include schemaLocation="http://cxf.apache.org/schemas/wsdl/jms.xsd"/> 
-    <xs:import namespace="http://www.springframework.org/schema/beans" schemaLocation="http://www.springframework.org/schema/beans/spring-beans.xsd"/>
-    <xs:import namespace="http://cxf.apache.org/configuration/beans" schemaLocation="http://cxf.apache.org/schemas/configuration/cxf-beans.xsd"/>
-    
-    <xs:element name="destination">
-        <xs:complexType>
-            <xs:complexContent>
-                <xs:extension base="beans:identifiedType">
-                    <xs:sequence>
-                        <xs:element name="serverConfig" type="jms:ServerConfig" minOccurs="0"/>
-                        <xs:element name="runtimePolicy" type="jms:ServerBehaviorPolicyType"
minOccurs="0"/>
-                        <xs:element name="sessionPool" type="jms:SessionPoolType" minOccurs="0"/>
-                        <xs:element name="address" type="jms:AddressType" minOccurs="0"/>
-                        <xs:element name="jmsConfig-ref" type="xs:string" maxOccurs="1"
minOccurs="0"/>                    	
-                    </xs:sequence>
-                    <xs:attributeGroup ref="cxf-beans:beanAttributes"/>
-                </xs:extension>
-            </xs:complexContent>
-        </xs:complexType>
-    </xs:element>
-    
-    <xs:element name="conduit">
-        <xs:complexType>
-            <xs:complexContent>
-                <xs:extension base="beans:identifiedType">
-                    <xs:sequence>
-                    	<xs:element name="clientConfig"
-                    		type="jms:ClientConfig" minOccurs="0" />
-                    	<xs:element name="runtimePolicy"
-                    		type="jms:ClientBehaviorPolicyType" minOccurs="0" />
-                    	<xs:element name="sessionPool"
-                    		type="jms:SessionPoolType" minOccurs="0" />
-                    	<xs:element name="address"
-                    		type="jms:AddressType" minOccurs="0" />
-                    	<xs:element name="jmsConfig-ref" type="xs:string" maxOccurs="1" minOccurs="0"/>
                   	
-                    </xs:sequence>
-                    <xs:attributeGroup ref="cxf-beans:beanAttributes"/>
-                </xs:extension>
-            </xs:complexContent>
-        </xs:complexType>
-    </xs:element>
-    
+	<!--
+		Licensed to the Apache Software Foundation (ASF) under one or more
+		contributor license agreements. See the NOTICE file distributed with
+		this work for additional information regarding copyright ownership.
+		The ASF licenses this file to you under the Apache License, Version
+		2.0 (the "License"); you may not use this file except in compliance
+		with the License. You may obtain a copy of the License at
+
+		http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+		applicable law or agreed to in writing, software distributed under the
+		License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+		CONDITIONS OF ANY KIND, either express or implied. See the License for
+		the specific language governing permissions and limitations under the
+		License.
+	-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+	xmlns:jms="http://cxf.apache.org/transports/jms" xmlns:wsdl="http://schemas.xmlsoap.org/wsdl/"
+	xmlns:jaxb="http://java.sun.com/xml/ns/jaxb" xmlns:beans="http://www.springframework.org/schema/beans"
+	xmlns:cxf-beans="http://cxf.apache.org/configuration/beans"
+	targetNamespace="http://cxf.apache.org/transports/jms"
+	elementFormDefault="qualified" jaxb:version="2.0">
+
+	<xs:include schemaLocation="http://cxf.apache.org/schemas/wsdl/jms.xsd" />
+	<xs:import namespace="http://www.springframework.org/schema/beans"
+		schemaLocation="http://www.springframework.org/schema/beans/spring-beans.xsd" />
+	<xs:import namespace="http://cxf.apache.org/configuration/beans"
+		schemaLocation="http://cxf.apache.org/schemas/configuration/cxf-beans.xsd" />
+
+	<xs:element name="destination">
+		<xs:complexType>
+			<xs:complexContent>
+				<xs:extension base="beans:identifiedType">
+					<xs:sequence>
+						<xs:element name="serverConfig" type="jms:ServerConfig"
+							minOccurs="0" />
+						<xs:element name="runtimePolicy" type="jms:ServerBehaviorPolicyType"
+							minOccurs="0" />
+						<xs:element name="sessionPool" type="jms:SessionPoolType"
+							minOccurs="0" />
+						<xs:element name="address" type="jms:AddressType"
+							minOccurs="0" />
+						<xs:element name="jmsConfig-ref" type="xs:string"
+							maxOccurs="1" minOccurs="0" />
+					</xs:sequence>
+					<xs:attributeGroup ref="cxf-beans:beanAttributes" />
+				</xs:extension>
+			</xs:complexContent>
+		</xs:complexType>
+	</xs:element>
+
+	<xs:element name="conduit">
+		<xs:complexType>
+			<xs:complexContent>
+				<xs:extension base="beans:identifiedType">
+					<xs:sequence>
+						<xs:element name="clientConfig" type="jms:ClientConfig"
+							minOccurs="0" />
+						<xs:element name="runtimePolicy" type="jms:ClientBehaviorPolicyType"
+							minOccurs="0" />
+						<xs:element name="sessionPool" type="jms:SessionPoolType"
+							minOccurs="0" />
+						<xs:element name="address" type="jms:AddressType"
+							minOccurs="0" />
+						<xs:element name="jmsConfig-ref" type="xs:string"
+							maxOccurs="1" minOccurs="0" />
+					</xs:sequence>
+					<xs:attributeGroup ref="cxf-beans:beanAttributes" />
+				</xs:extension>
+			</xs:complexContent>
+		</xs:complexType>
+	</xs:element>
+
 </xs:schema>

Modified: cxf/trunk/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd (original)
+++ cxf/trunk/rt/transports/jms/src/main/resources/schemas/wsdl/jms.xsd Wed Jan 21 12:36:50
2009
@@ -48,14 +48,17 @@
     </xs:complexType>
     
     <xs:complexType name="ClientConfig">
-        <xs:complexContent>
-            <xs:extension base="wsdl:tExtensibilityElement">  
-				<xs:attribute name="clientReceiveTimeout" type="xs:long" default="60000"/>
-        		<xs:attribute name="messageTimeToLive" type="xs:long" default="0" />
-        	</xs:extension>
-		</xs:complexContent>        	
+    	<xs:complexContent>
+    		<xs:extension base="wsdl:tExtensibilityElement">
+    			<xs:attribute name="clientReceiveTimeout" type="xs:long"
+    				default="60000" />
+    			<xs:attribute name="messageTimeToLive" type="xs:long"
+    				default="0" />
+    			<xs:attribute name="useConduitIdSelector" type="xs:boolean" use="optional" default="true"></xs:attribute>
+    		</xs:extension>
+    	</xs:complexContent>
     </xs:complexType>
-    
+
     <xs:complexType name="ServerConfig">
 		<xs:complexContent>
             <xs:extension base="wsdl:tExtensibilityElement">

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
(original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/AbstractJMSTester.java
Wed Jan 21 12:36:50 2009
@@ -42,6 +42,8 @@
 import org.junit.Before;
 
 public abstract class AbstractJMSTester extends Assert {
+    protected static final String MESSAGE_CONTENT = "HelloWorld";
+
     private static JMSBrokerSetup broker;
 
     protected Bus bus;
@@ -102,7 +104,7 @@
         }
         OutputStream os = message.getContent(OutputStream.class);
         assertTrue("The OutputStream should not be null ", os != null);
-        os.write("HelloWorld".getBytes()); // TODO encoding
+        os.write(MESSAGE_CONTENT.getBytes()); // TODO encoding
         os.close();
     }
 

Modified: cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java?rev=736408&r1=736407&r2=736408&view=diff
==============================================================================
--- cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
(original)
+++ cxf/trunk/rt/transports/jms/src/test/java/org/apache/cxf/transport/jms/JMSDestinationTest.java
Wed Jan 21 12:36:50 2009
@@ -207,15 +207,19 @@
         destination.shutdown();
     }
 
-    private void setupMessageHeader(Message outMessage) {
+    private void setupMessageHeader(Message outMessage, String correlationId) {
         JMSMessageHeadersType header = new JMSMessageHeadersType();
-        header.setJMSCorrelationID("Destination test");
+        header.setJMSCorrelationID(correlationId);
         header.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
         header.setJMSPriority(1);
         header.setTimeToLive(1000);
         outMessage.put(JMSConstants.JMS_CLIENT_REQUEST_HEADERS, header);
         outMessage.put(Message.ENCODING, "US-ASCII");
     }
+    
+    private void setupMessageHeader(Message outMessage) {
+        setupMessageHeader(outMessage, "Destination test");
+    }
 
     private void verifyReceivedMessage(Message inMessage) {
         ByteArrayInputStream bis = (ByteArrayInputStream)inMessage.getContent(InputStream.class);
@@ -226,8 +230,8 @@
             assertFalse("Read the Destination recieved Message error ", false);
             ex.printStackTrace();
         }
-        String reponse = IOUtils.newStringFromBytes(bytes);
-        assertEquals("The reponse date should be equal", reponse, "HelloWorld");
+        String response = IOUtils.newStringFromBytes(bytes);
+        assertEquals("The response content should be equal", AbstractJMSTester.MESSAGE_CONTENT,
response);
     }
 
     private void verifyRequestResponseHeaders(Message inMessage, Message outMessage) {
@@ -257,8 +261,13 @@
     }
 
     private void verifyJmsHeaderEquality(JMSMessageHeadersType outHeader, JMSMessageHeadersType
inHeader) {
-        assertEquals("The inMessage and outMessage JMS Header's CorrelationID should be equals",
outHeader
-            .getJMSCorrelationID(), inHeader.getJMSCorrelationID());
+        if (outHeader.getJMSCorrelationID() != null) {
+            // only check if the correlation id was explicitly set as
+            // otherwise the in header will contain an automatically
+            // generated correlation id
+            assertEquals("The inMessage and outMessage JMS Header's CorrelationID should
be equals", outHeader
+                         .getJMSCorrelationID(), inHeader.getJMSCorrelationID());
+        }
         assertEquals("The inMessage and outMessage JMS Header's JMSPriority should be equals",
outHeader
             .getJMSPriority(), inHeader.getJMSPriority());
         assertEquals("The inMessage and outMessage JMS Header's JMSDeliveryMode should be
equals", outHeader
@@ -276,7 +285,7 @@
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduit(true, false);
         final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null);
         final JMSDestination destination = setupJMSDestination(true);
 
         // set up MessageObserver for handling the conduit message
@@ -331,7 +340,7 @@
         // set up the conduit send to be true
         JMSConduit conduit = setupJMSConduit(true, false);
         final Message outMessage = new MessageImpl();
-        setupMessageHeader(outMessage);
+        setupMessageHeader(outMessage, null);
 
         JMSPropertyType excludeProp = new JMSPropertyType();
         excludeProp.setName(customPropertyName);



Mime
View raw message