activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-5622
Date Mon, 02 Mar 2015 21:12:50 GMT
Repository: activemq
Updated Branches:
  refs/heads/master bb83bf574 -> 4fe2bd534


https://issues.apache.org/jira/browse/AMQ-5622

Fix some issues with STOMP v1.2 protocol support.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4fe2bd53
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4fe2bd53
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4fe2bd53

Branch: refs/heads/master
Commit: 4fe2bd534a6b8b256790ce148dbad21f704e86e5
Parents: bb83bf5
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Mar 2 16:12:44 2015 -0500
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Mar 2 16:12:44 2015 -0500

----------------------------------------------------------------------
 .../transport/stomp/ProtocolConverter.java      |   6 +-
 .../activemq/transport/stomp/Stomp12Test.java   | 107 ++++++++++++++++++-
 2 files changed, 106 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4fe2bd53/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
index d366962..7ec53b1 100644
--- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
+++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
@@ -557,7 +557,7 @@ public class ProtocolConverter {
         String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
         String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
 
-        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
             throw new ProtocolException("SUBSCRIBE received without a subscription id!");
         }
 
@@ -675,7 +675,7 @@ public class ProtocolConverter {
         }
 
         String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
-        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+        if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
             throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
         }
 
@@ -686,7 +686,7 @@ public class ProtocolConverter {
         // check if it is a durable subscription
         String durable = command.getHeaders().get("activemq.subscriptionName");
         String clientId = durable;
-        if (this.version.equals(Stomp.V1_1)) {
+        if (!this.version.equals(Stomp.V1_0)) {
             clientId = connectionInfo.getClientId();
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/4fe2bd53/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
----------------------------------------------------------------------
diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
index 1ccdbd8..cec51ee 100644
--- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
+++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import javax.jms.Connection;
 
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -319,9 +320,13 @@ public class Stomp12Test extends StompTestSupport {
 
         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
-        try {
-            Thread.sleep(400);
-        } catch (InterruptedException e){}
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
 
         // reconnect and send some messages to the offline subscribers and then try to get
         // them after subscribing again.
@@ -411,7 +416,7 @@ public class Stomp12Test extends StompTestSupport {
         assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null);
 
         String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
-                       "receipt:1" + "id:12345\n\n" + Stomp.NULL;
+                       "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
         stompConnection.sendFrame(unsub);
 
         StompFrame stompFrame = stompConnection.receive();
@@ -466,4 +471,98 @@ public class Stomp12Test extends StompTestSupport {
         String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
         stompConnection.sendFrame(frame);
     }
+
+    @Test(timeout = 60000)
+    public void testDurableSubAndUnSub() throws Exception {
+        BrokerViewMBean view = getProxyToBroker();
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.2\n" +
+                              "host:localhost\n" +
+                              "client-id:durableSubTest\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+
+        // subscribe to destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("1", receipt.getHeaders().get("receipt-id"));
+        assertEquals(view.getDurableTopicSubscribers().length, 1);
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        stompConnection.close();
+        Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() <= 1;
+            }
+        }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
+
+        stompConnect();
+        stompConnection.sendFrame(connectFrame);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(view.getDurableTopicSubscribers().length, 0);
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 1);
+
+        // unsubscribe from topic
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "1\n" +
+                "id:durablesub-1\n" + "receipt:3\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("3", receipt.getHeaders().get("receipt-id"));
+
+        assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    @Test(timeout = 60000)
+    public void testSubscribeWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.2\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
 }


Mime
View raw message