activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [3/3] activemq git commit: AMQ-6030 Add support for composite destinations to STOMP. (cherry picked from commit c360c3e4a38b51f72b109a095a648433754acc2d)
Date Fri, 13 Nov 2015 16:40:55 GMT
AMQ-6030  Add support for composite destinations to STOMP.
(cherry picked from commit c360c3e4a38b51f72b109a095a648433754acc2d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/eccbd871
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/eccbd871
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/eccbd871

Branch: refs/heads/activemq-5.12.x
Commit: eccbd87156c141194313e54a8a1bc24c206e6723
Parents: 6643458
Author: Timothy Bish <tabish121@gmail.com>
Authored: Fri Oct 30 11:02:27 2015 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Fri Nov 13 11:36:58 2015 -0500

----------------------------------------------------------------------
 activemq-stomp/pom.xml                          |   5 +
 .../transport/stomp/LegacyFrameTranslator.java  | 105 ++++---
 .../transport/stomp/ProtocolConverter.java      |   2 +-
 .../stomp/LegacyFrameTranslatorTest.java        | 225 ++++++++++++++
 .../stomp/StompCompositeDestinationTest.java    | 295 +++++++++++++++++++
 5 files changed, 596 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/eccbd871/activemq-stomp/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-stomp/pom.xml b/activemq-stomp/pom.xml
index f2f9ab3..3a837e0 100755
--- a/activemq-stomp/pom.xml
+++ b/activemq-stomp/pom.xml
@@ -85,6 +85,11 @@
       <scope>test</scope>
     </dependency>
     <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/activemq/blob/eccbd871/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
index 8cfa121..013c1ec 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
@@ -16,25 +16,31 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
 import org.apache.activemq.command.ActiveMQBytesMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTextMessage;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements ActiveMQ 4.0 translations
  */
 public class LegacyFrameTranslator implements FrameTranslator {
 
+    private static final Logger LOG = LoggerFactory.getLogger(LegacyFrameTranslator.class);
+
+    @Override
     public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command)
throws JMSException, ProtocolException {
         final Map<?, ?> headers = command.getHeaders();
         final ActiveMQMessage msg;
@@ -87,6 +93,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
         return msg;
     }
 
+    @Override
     public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message)
throws IOException, JMSException {
         StompFrame command = new StompFrame();
         command.setAction(Stomp.Responses.MESSAGE);
@@ -126,6 +133,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
         return command;
     }
 
+    @Override
     public String convertDestination(ProtocolConverter converter, Destination d) {
         if (d == null) {
             return null;
@@ -156,6 +164,7 @@ public class LegacyFrameTranslator implements FrameTranslator {
         return buffer.toString();
     }
 
+    @Override
     public ActiveMQDestination convertDestination(ProtocolConverter converter, String name,
boolean forceFallback) throws ProtocolException {
         if (name == null) {
             return null;
@@ -166,38 +175,64 @@ public class LegacyFrameTranslator implements FrameTranslator {
         String originalName = name;
         name = name.trim();
 
-        if (name.startsWith("/queue/")) {
-            String qName = name.substring("/queue/".length(), name.length());
-            return ActiveMQDestination.createDestination(qName, ActiveMQDestination.QUEUE_TYPE);
-        } else if (name.startsWith("/topic/")) {
-            String tName = name.substring("/topic/".length(), name.length());
-            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TOPIC_TYPE);
-        } else if (name.startsWith("/remote-temp-queue/")) {
-            String tName = name.substring("/remote-temp-queue/".length(), name.length());
-            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_QUEUE_TYPE);
-        } else if (name.startsWith("/remote-temp-topic/")) {
-            String tName = name.substring("/remote-temp-topic/".length(), name.length());
-            return ActiveMQDestination.createDestination(tName, ActiveMQDestination.TEMP_TOPIC_TYPE);
-        } else if (name.startsWith("/temp-queue/")) {
-            return converter.createTempDestination(name, false);
-        } else if (name.startsWith("/temp-topic/")) {
-            return converter.createTempDestination(name, true);
-        } else {
-            if (forceFallback) {
-                try {
-                    ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(originalName);
-                    if (fallback != null) {
-                        return fallback;
+        String[] destinations = name.split(",");
+        if (destinations == null || destinations.length == 0) {
+            destinations = new String[] { name };
+        }
+
+        StringBuilder destinationBuilder = new StringBuilder();
+        for (int i = 0; i < destinations.length; ++i) {
+            String destinationName = destinations[i];
+
+            if (destinationName.startsWith("/queue/")) {
+                destinationName = destinationName.substring("/queue/".length(), destinationName.length());
+                destinationBuilder.append(ActiveMQDestination.QUEUE_QUALIFIED_PREFIX + destinationName);
+            } else if (destinationName.startsWith("/topic/")) {
+                destinationName = destinationName.substring("/topic/".length(), destinationName.length());
+                destinationBuilder.append(ActiveMQDestination.TOPIC_QUALIFIED_PREFIX + destinationName);
+            } else if (destinationName.startsWith("/remote-temp-queue/")) {
+                destinationName = destinationName.substring("/remote-temp-queue/".length(),
destinationName.length());
+                destinationBuilder.append(ActiveMQDestination.TEMP_QUEUE_QUALIFED_PREFIX
+ destinationName);
+            } else if (destinationName.startsWith("/remote-temp-topic/")) {
+                destinationName = destinationName.substring("/remote-temp-topic/".length(),
destinationName.length());
+                destinationBuilder.append(ActiveMQDestination.TEMP_TOPIC_QUALIFED_PREFIX
+ destinationName);
+            } else if (destinationName.startsWith("/temp-queue/")) {
+                ActiveMQDestination converted = converter.createTempDestination(destinationName,
false);
+                destinationBuilder.append(converted.getQualifiedName());
+            } else if (destinationName.startsWith("/temp-topic/")) {
+                ActiveMQDestination converted = converter.createTempDestination(destinationName,
true);
+                destinationBuilder.append(converted.getQualifiedName());
+            } else {
+                if (forceFallback) {
+                    String fallbackName = destinationName;
+                    if (destinationName.length() == 1) {
+                        // Use the original non-trimmed name instead
+                        fallbackName = originalName;
+                    }
+
+                    try {
+                        ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(fallbackName);
+                        if (fallback != null) {
+                            destinationBuilder.append(fallback.getQualifiedName());
+                            continue;
+                        }
+                    } catch (JMSException e) {
+                        throw new ProtocolException("Illegal destination name: [" + fallbackName
+ "] -- ActiveMQ STOMP destinations "
+                                + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/",
false, e);
                     }
-                } catch (JMSException e) {
-                    throw new ProtocolException("Illegal destination name: [" + originalName
+ "] -- ActiveMQ STOMP destinations "
-                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/",
false, e);
                 }
+
+                throw new ProtocolException("Illegal destination name: [" + originalName
+ "] -- ActiveMQ STOMP destinations "
+                                            + "must begin with one of: /queue/ /topic/ /temp-queue/
/temp-topic/");
+            }
+
+            if (i < destinations.length - 1) {
+                destinationBuilder.append(",");
             }
-            throw new ProtocolException("Illegal destination name: [" + originalName + "]
-- ActiveMQ STOMP destinations "
-                                        + "must begin with one of: /queue/ /topic/ /temp-queue/
/temp-topic/");
         }
-    }
 
+        LOG.trace("New Composite Destination name: {}", destinationBuilder);
 
+        return ActiveMQDestination.createDestination(destinationBuilder.toString(), ActiveMQDestination.QUEUE_TYPE);
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/eccbd871/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index cfeff9e..bcf4714 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -600,7 +600,7 @@ public class ProtocolConverter {
             throw new ProtocolException("Invalid Subscription: cannot durably subscribe to
a Queue destination!");
         }
 
-        consumerInfo.setDestination(translator.convertDestination(this, destination, true));
+        consumerInfo.setDestination(actualDest);
 
         StompSubscription stompSubscription;
         if (!consumerInfo.isBrowser()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/eccbd871/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java
new file mode 100644
index 0000000..85b51db
--- /dev/null
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/LegacyFrameTranslatorTest.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.UUID;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Tests for conversion capabilities of LegacyFrameTranslator
+ */
+public class LegacyFrameTranslatorTest {
+
+    private ProtocolConverter converter;
+    private LegacyFrameTranslator translator;
+
+    @Before
+    public void setUp() {
+        converter = Mockito.mock(ProtocolConverter.class);
+
+        // Stub out a temp destination creation
+        Mockito.when(converter.createTempDestination(Mockito.anyString(), Mockito.anyBoolean())).thenAnswer(new
Answer<ActiveMQDestination>() {
+
+            @Override
+            public ActiveMQDestination answer(InvocationOnMock invocation) throws Throwable
{
+
+                String name = invocation.getArgumentAt(0, String.class);
+                boolean topic = invocation.getArgumentAt(1, Boolean.class);
+
+                name = "temp-" + (topic ? "topic://" : "queue://X:") + UUID.randomUUID().toString();
+
+                return ActiveMQDestination.createDestination(name, ActiveMQDestination.QUEUE_TYPE);
+            }
+        });
+
+        translator = new LegacyFrameTranslator();
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertQueue() throws Exception {
+        ActiveMQDestination destination = translator.convertDestination(converter, "/queue/test",
false);
+
+        assertFalse(destination.isComposite());
+        assertEquals("test", destination.getPhysicalName());
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertTopic() throws Exception {
+        ActiveMQDestination destination = translator.convertDestination(converter, "/topic/test",
false);
+
+        assertFalse(destination.isComposite());
+        assertEquals("test", destination.getPhysicalName());
+        assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertTemporaryQueue() throws Exception {
+        ActiveMQDestination destination = translator.convertDestination(converter, "/temp-queue/test",
false);
+
+        assertFalse(destination.isComposite());
+        assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertTemporaryTopic() throws Exception {
+        ActiveMQDestination destination = translator.convertDestination(converter, "/temp-topic/test",
false);
+
+        assertFalse(destination.isComposite());
+        assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertRemoteTempQueue() throws Exception {
+        ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-queue/test",
false);
+
+        assertFalse(destination.isComposite());
+        assertEquals("test", destination.getPhysicalName());
+        assertEquals(ActiveMQDestination.TEMP_QUEUE_TYPE, destination.getDestinationType());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertRemoteTempTopic() throws Exception {
+        ActiveMQDestination destination = translator.convertDestination(converter, "/remote-temp-topic/test",
false);
+
+        assertFalse(destination.isComposite());
+        assertEquals("test", destination.getPhysicalName());
+        assertEquals(ActiveMQDestination.TEMP_TOPIC_TYPE, destination.getDestinationType());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertCompositeQueues() throws Exception {
+        String destinationA = "destinationA";
+        String destinationB = "destinationB";
+
+        String composite = "/queue/" + destinationA + ",/queue/" + destinationB;
+
+        ActiveMQDestination destination = translator.convertDestination(converter, composite,
false);
+
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+        assertTrue(destination.isComposite());
+        ActiveMQDestination[] composites = destination.getCompositeDestinations();
+        assertEquals(2, composites.length);
+
+        Arrays.sort(composites);
+
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType());
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[1].getDestinationType());
+
+        assertEquals(destinationA, composites[0].getPhysicalName());
+        assertEquals(destinationB, composites[1].getPhysicalName());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertCompositeTopics() throws Exception {
+        String destinationA = "destinationA";
+        String destinationB = "destinationB";
+
+        String composite = "/topic/" + destinationA + ",/topic/" + destinationB;
+
+        ActiveMQDestination destination = translator.convertDestination(converter, composite,
false);
+
+        assertEquals(ActiveMQDestination.TOPIC_TYPE, destination.getDestinationType());
+        assertTrue(destination.isComposite());
+        ActiveMQDestination[] composites = destination.getCompositeDestinations();
+        assertEquals(2, composites.length);
+
+        Arrays.sort(composites);
+
+        assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[0].getDestinationType());
+        assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType());
+
+        assertEquals(destinationA, composites[0].getPhysicalName());
+        assertEquals(destinationB, composites[1].getPhysicalName());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertCompositeQueueAndTopic() throws Exception {
+        String destinationA = "destinationA";
+        String destinationB = "destinationB";
+
+        String composite = "/queue/" + destinationA + ",/topic/" + destinationB;
+
+        ActiveMQDestination destination = translator.convertDestination(converter, composite,
false);
+
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+        assertTrue(destination.isComposite());
+        ActiveMQDestination[] composites = destination.getCompositeDestinations();
+        assertEquals(2, composites.length);
+
+        Arrays.sort(composites);
+
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, composites[0].getDestinationType());
+        assertEquals(ActiveMQDestination.TOPIC_TYPE, composites[1].getDestinationType());
+
+        assertEquals(destinationA, composites[0].getPhysicalName());
+        assertEquals(destinationB, composites[1].getPhysicalName());
+    }
+
+    @Test(timeout = 10000)
+    public void testConvertCompositeMixture() throws Exception {
+        String destinationA = "destinationA";
+        String destinationB = "destinationB";
+        String destinationC = "destinationC";
+        String destinationD = "destinationD";
+
+        String composite = "/queue/" + destinationA + ",/topic/" + destinationB +
+                           ",/temp-queue/" + destinationC + ",/temp-topic/" + destinationD;
+
+        ActiveMQDestination destination = translator.convertDestination(converter, composite,
false);
+
+        assertEquals(ActiveMQDestination.QUEUE_TYPE, destination.getDestinationType());
+        assertTrue(destination.isComposite());
+        ActiveMQDestination[] composites = destination.getCompositeDestinations();
+        assertEquals(4, composites.length);
+
+        Arrays.sort(composites);
+
+        boolean foundQueue = false;
+        boolean foundTopic = false;
+        boolean foundTempTopic = false;
+        boolean foundTempQueue = false;
+
+        for (ActiveMQDestination dest : composites) {
+            if (dest.getDestinationType() == ActiveMQDestination.QUEUE_TYPE) {
+                foundQueue = true;
+            } else if (dest.getDestinationType() == ActiveMQDestination.TOPIC_TYPE) {
+                foundTopic = true;
+            } else if (dest.getDestinationType() == ActiveMQDestination.TEMP_TOPIC_TYPE)
{
+                foundTempTopic = true;
+            } else if (dest.getDestinationType() == ActiveMQDestination.TEMP_QUEUE_TYPE)
{
+                foundTempQueue = true;
+            }
+        }
+
+        assertTrue(foundQueue);
+        assertTrue(foundTopic);
+        assertTrue(foundTempTopic);
+        assertTrue(foundTempQueue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/eccbd871/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
new file mode 100644
index 0000000..223a84a
--- /dev/null
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompCompositeDestinationTest.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for support of composite destination support over STOMP
+ */
+public class StompCompositeDestinationTest extends StompTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StompCompositeDestinationTest.class);
+
+    protected ActiveMQConnection connection;
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            connection.close();
+        } catch (Exception ex) {}
+
+        super.tearDown();
+    }
+
+    @Test(timeout = 20000)
+    public void testSubscribeToCompositeQueue() throws Exception {
+        stompConnect();
+
+        String destinationA = "StompA";
+        String destinationB = "StompB";
+
+        String frame = "CONNECT\n" +
+                       "login:system\n" +
+                       "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
+
+        frame = "SUBSCRIBE\n" +
+                "destination:/queue/" + destinationA + ",/queue/" + destinationB + "\n" +
+                "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Test in same order as the subscribe command
+
+        sendMessage(destinationA, false);
+        sendMessage(destinationB, false);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        // Test the reverse ordering
+
+        sendMessage(destinationB, false);
+        sendMessage(destinationA, false);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        stompConnection.disconnect();
+    }
+
+    @Test(timeout = 20000)
+    public void testSubscribeToCompositeQueueTrailersDefault() throws Exception {
+        stompConnect();
+
+        String destinationA = "StompA";
+        String destinationB = "StompB";
+
+        String frame = "CONNECT\n" +
+                       "login:system\n" +
+                       "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
+
+        frame = "SUBSCRIBE\n" +
+                "destination:/queue/" + destinationA + "," + destinationB + "\n" +
+                "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Test in same order as the subscribe command
+
+        sendMessage(destinationA, false);
+        sendMessage(destinationB, false);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        // Test the reverse ordering
+
+        sendMessage(destinationB, false);
+        sendMessage(destinationA, false);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        stompConnection.disconnect();
+    }
+
+    @Test(timeout = 20000)
+    public void testSubscribeToCompositeTopics() throws Exception {
+        stompConnect();
+
+        String destinationA = "StompA";
+        String destinationB = "StompB";
+
+        String frame = "CONNECT\n" +
+                       "login:system\n" +
+                       "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        LOG.info("Subscribing to destination: {},{}", destinationA, destinationB);
+
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + destinationA + ",/topic/" + destinationB + "\n" +
+                "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Test in same order as the subscribe command
+
+        sendMessage(destinationA, true);
+        sendMessage(destinationB, true);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        // Test the reverse ordering
+
+        sendMessage(destinationB, true);
+        sendMessage(destinationA, true);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("MESSAGE"));
+
+        stompConnection.disconnect();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToCompositeQueue() throws Exception {
+        stompConnect();
+
+        String destinationA = "StompA";
+        String destinationB = "StompB";
+
+        String frame = "CONNECT\n" +
+                       "login:system\n" +
+                       "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SEND\n" +
+                "destination:/queue/" + destinationA + ",/queue/" + destinationB +
+                "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+        assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getQueues().length == 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150)));
+
+        QueueViewMBean viewOfA = getProxyToQueue(destinationA);
+        QueueViewMBean viewOfB = getProxyToQueue(destinationB);
+
+        assertNotNull(viewOfA);
+        assertNotNull(viewOfB);
+
+        assertEquals(1, viewOfA.getQueueSize());
+        assertEquals(1, viewOfB.getQueueSize());
+
+        stompConnection.disconnect();
+    }
+
+    @Test(timeout = 60000)
+    public void testSendMessageToCompositeTopic() throws Exception {
+        stompConnect();
+
+        String destinationA = "StompA";
+        String destinationB = "StompB";
+
+        String frame = "CONNECT\n" +
+                       "login:system\n" +
+                       "passcode:manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SEND\n" +
+                "destination:/topic/" + destinationA + ",/topic/" + destinationB +
+                "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(frame);
+
+        final BrokerViewMBean brokerView = getProxyToBroker();
+        assertTrue("Should be two destinations for the dispatch", Wait.waitFor(new Wait.Condition()
{
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return brokerView.getTopics().length == 2;
+            }
+        }, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(150)));
+
+        TopicViewMBean viewOfA = getProxyToTopic(destinationA);
+        TopicViewMBean viewOfB = getProxyToTopic(destinationB);
+
+        assertNotNull(viewOfA);
+        assertNotNull(viewOfB);
+
+        assertEquals(1, viewOfA.getEnqueueCount());
+        assertEquals(1, viewOfB.getEnqueueCount());
+
+        stompConnection.disconnect();
+    }
+
+    private void sendMessage(String destinationName, boolean topic) throws JMSException {
+        Connection connection = cf.createConnection("system", "manager");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = null;
+
+        if (topic) {
+            destination = session.createTopic(destinationName);
+        } else {
+            destination = session.createQueue(destinationName);
+        }
+
+        MessageProducer producer = session.createProducer(destination);
+        TextMessage message = session.createTextMessage("test");
+        producer.send(message);
+
+        connection.close();
+    }
+}


Mime
View raw message