qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rob...@apache.org
Subject [1/3] qpid-jms git commit: support adding prefixes to the to/reply-to fields on messages when setting JMSDestination/JMSReplyTo value if the connection has prefixes configured
Date Fri, 19 Dec 2014 19:59:03 GMT
Repository: qpid-jms
Updated Branches:
  refs/heads/master 0e590e7ce -> 3dbfdb7be


support adding prefixes to the to/reply-to fields on messages when setting JMSDestination/JMSReplyTo
value if the connection has prefixes configured


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9bd7e592
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9bd7e592
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9bd7e592

Branch: refs/heads/master
Commit: 9bd7e5920ea08c380efeb1672b1f94b67134ca8f
Parents: 0e590e7
Author: Robert Gemmell <robbie@apache.org>
Authored: Fri Dec 19 19:46:32 2014 +0000
Committer: Robert Gemmell <robbie@apache.org>
Committed: Fri Dec 19 19:51:08 2014 +0000

----------------------------------------------------------------------
 .../amqp/message/AmqpDestinationHelper.java     |  31 +++-
 .../jms/integration/MessageIntegrationTest.java | 144 ++++++++++++++++++-
 .../amqp/message/AmqpDestinationHelperTest.java |   8 ++
 3 files changed, 178 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9bd7e592/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 0f770fe..da10adb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -150,7 +150,7 @@ public class AmqpDestinationHelper {
     }
 
     public void setToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination
destination) {
-        String address = destination != null ? destination.getName() : null;
+        String address = getDestinationAddress(destination, message.getConnection());
         Object typeValue = toTypeAnnotation(destination);
 
         message.setToAddress(address);
@@ -163,7 +163,7 @@ public class AmqpDestinationHelper {
     }
 
     public void setReplyToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination
destination) {
-        String replyToAddress = destination != null ? destination.getName() : null;
+        String replyToAddress = getDestinationAddress(destination, message.getConnection());
         Object typeValue = toTypeAnnotation(destination);
 
         message.setReplyToAddress(replyToAddress);
@@ -175,6 +175,33 @@ public class AmqpDestinationHelper {
         }
     }
 
+    private String getDestinationAddress(JmsDestination destination, AmqpConnection conn)
{
+        if (destination == null) {
+            return null;
+        }
+
+        final String name = destination.getName();
+
+        // Add prefix if necessary
+        if (!destination.isTemporary()) {
+            if (destination.isQueue()) {
+                String queuePrefix = conn.getQueuePrefix();
+                if (queuePrefix != null && !name.startsWith(queuePrefix)) {
+                    return queuePrefix + name;
+                }
+            }
+
+            if (destination.isTopic()) {
+                String topicPrefix = conn.getTopicPrefix();
+                if (topicPrefix != null && !name.startsWith(topicPrefix)) {
+                    return topicPrefix + name;
+                }
+            }
+        }
+
+        return name;
+    }
+
     /**
      * @return the annotation type value, or null if the supplied destination
      *         is null or can't be classified

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9bd7e592/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
index 0734718..ba64434 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MessageIntegrationTest.java
@@ -54,6 +54,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationProp
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.SourceMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.TargetMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
@@ -120,7 +121,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase
 
             MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
 
-            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo("queue://"
+ queueName));
 
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(headersMatcher);
@@ -514,6 +515,143 @@ public class MessageIntegrationTest extends QpidJmsTestCase
         }
     }
 
+    /**
+     * Tests that the a connection with a 'topic prefix' set on it adds the
+     * prefix to the content of the to/reply-to fields for outgoing messages.
+     */
+    @Test(timeout = 2000)
+    public void testSendMessageWithTopicDestinationsOnConnectionWithTopicPrefix() throws
Exception {
+        Class<? extends Destination> destType = Topic.class;
+        String destPrefix = "t12321-";
+        String destName = "myTopic";
+        String destAddress = destPrefix + destName;
+        Byte annotationValue = AmqpDestinationHelper.TOPIC_TYPE;
+
+        doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress,
annotationValue);
+    }
+
+    /**
+     * Tests that the a connection with a 'queue prefix' set on it adds the
+     * prefix to the content of the to/reply-to fields for outgoing messages.
+     */
+    @Test(timeout = 2000)
+    public void testSendMessageWithQueueDestinationsOnConnectionWithQueuePrefix() throws
Exception {
+        Class<? extends Destination> destType = Queue.class;
+        String destPrefix = "q12321-";
+        String destName = "myQueue";
+        String destAddress = destPrefix + destName;
+        Byte annotationValue = AmqpDestinationHelper.QUEUE_TYPE;
+
+        doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress,
annotationValue);
+    }
+
+    /**
+     * Tests that the a connection with 'destination prefixes' set on it does not add
+     * the prefix to the content of the to/reply-to fields for TemporaryQueues.
+     */
+    @Test(timeout = 2000)
+    public void testSendMessageWithTemporaryQueueDestinationsOnConnectionWithDestinationPrefixes()
throws Exception {
+        Class<? extends Destination> destType = TemporaryQueue.class;
+        String destPrefix = "q12321-";
+        String destName = null;
+        String destAddress = "temp-queue://myTempQueue";
+        Byte annotationValue = AmqpDestinationHelper.TEMP_QUEUE_TYPE;
+
+        doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress,
annotationValue);
+    }
+
+    /**
+     * Tests that the a connection with 'destination prefixes' set on it does not add
+     * the prefix to the content of the to/reply-to fields for TemporaryTopics.
+     */
+    @Test(timeout = 2000)
+    public void testSendMessageWithTemporaryTopicDestinationsOnConnectionWithDestinationPrefixes()
throws Exception {
+        Class<? extends Destination> destType = TemporaryTopic.class;
+        String destPrefix = "q12321-";
+        String destName = null;
+        String destAddress = "temp-topic://myTempTopic";
+        Byte annotationValue = AmqpDestinationHelper.TEMP_TOPIC_TYPE;
+
+        doSendMessageOnConnectionWithPrefixTestImpl(destType, destPrefix, destName, destAddress,
annotationValue);
+    }
+
+    private void doSendMessageOnConnectionWithPrefixTestImpl(Class<? extends Destination>
destType,
+                                                             String destPrefix,
+                                                             String destName,
+                                                             String destAddress,
+                                                             Byte destTypeAnnotationValue)
throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) {
+            Connection connection = null;
+            if (destType == Topic.class) {
+                connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix="
+ destPrefix);
+            } else if (destType == Queue.class) {
+                connection = testFixture.establishConnecton(testPeer, "?jms.queuePrefix="
+ destPrefix);
+            } else {
+                // Set both the non-temporary prefixes, we wont use non-temp dests but want
to ensure they don't affect anything
+                connection = testFixture.establishConnecton(testPeer, "?jms.topicPrefix="
+ destPrefix + "&jms.queuePrefix=" + destPrefix);
+            }
+
+            connection.start();
+
+            // Set the prefix if Topic or Queue dest type.
+            if (destType == Topic.class) {
+                ((JmsConnection) connection).setTopicPrefix(destPrefix);
+            } else if (destType == Queue.class) {
+                ((JmsConnection) connection).setQueuePrefix(destPrefix);
+            }
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create the destination
+            Destination dest = null;
+            if (destType == Topic.class) {
+                dest = session.createTopic(destName);
+            } else if (destType == Queue.class) {
+                dest = session.createQueue(destName);
+            } else if (destType == TemporaryTopic.class) {
+                // TODO:add method to expect temp topic creation
+                testPeer.expectTempQueueCreationAttach(destAddress);
+                dest = session.createTemporaryTopic();
+            } else if (destType == TemporaryQueue.class) {
+                testPeer.expectTempQueueCreationAttach(destAddress);
+                dest = session.createTemporaryQueue();
+            }
+
+            TargetMatcher targetMatcher = new TargetMatcher();
+            targetMatcher.withAddress(equalTo(destAddress));
+
+            testPeer.expectSenderAttach(targetMatcher, false, false);
+
+            MessageProducer producer = session.createProducer(dest);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.AMQP_TO_ANNOTATION),
equalTo(destTypeAnnotationValue));
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION),
equalTo(destTypeAnnotationValue));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
+            propsMatcher.withTo(equalTo(destAddress));
+            propsMatcher.withReplyTo(equalTo(destAddress));
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+
+            //TODO: currently we aren't sending any body section, decide if this is allowed
+            //messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createMessage();
+            message.setJMSReplyTo(dest);
+
+            producer.send(message);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     // --- byte type annotation values --- //
 
     /**
@@ -535,7 +673,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase
             Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_TO_ANNOTATION);
             msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE));
 
-            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(topicName));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo("topic://"
+ topicName));
 
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(headersMatcher);
@@ -574,7 +712,7 @@ public class MessageIntegrationTest extends QpidJmsTestCase
             Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.AMQP_REPLY_TO_ANNOTATION);
             msgAnnotationsMatcher.withEntry(annotationKey, equalTo(AmqpDestinationHelper.TOPIC_TYPE));
 
-            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo(replyTopicName));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withReplyTo(equalTo("topic://"
+ replyTopicName));
 
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(headersMatcher);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9bd7e592/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
index 2f516de..852199f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelperTest.java
@@ -487,6 +487,8 @@ public class AmqpDestinationHelperTest {
         String testAddress = "testAddress";
         JmsDestination destination = new JmsQueue("testAddress");
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
 
         helper.setToAddressFromDestination(message, destination);
 
@@ -499,6 +501,8 @@ public class AmqpDestinationHelperTest {
         String testAddress = "testAddress";
         JmsDestination destination = new JmsTopic("testAddress");
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
 
         helper.setToAddressFromDestination(message, destination);
 
@@ -569,6 +573,8 @@ public class AmqpDestinationHelperTest {
         String testAddress = "testAddress";
         JmsDestination destination = new JmsQueue("testAddress");
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
 
         helper.setReplyToAddressFromDestination(message, destination);
 
@@ -581,6 +587,8 @@ public class AmqpDestinationHelperTest {
         String testAddress = "testAddress";
         JmsDestination destination = new JmsTopic("testAddress");
         AmqpJmsMessageFacade message = Mockito.mock(AmqpJmsMessageFacade.class);
+        AmqpConnection conn = Mockito.mock(AmqpConnection.class);
+        Mockito.when(message.getConnection()).thenReturn(conn);
 
         helper.setReplyToAddressFromDestination(message, destination);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message