activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andytay...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-548 Stomp durable sub unsubscrbe
Date Mon, 25 Jul 2016 15:23:30 GMT
ARTEMIS-548 Stomp durable sub unsubscrbe

Implement ability for Stomp clients to unsubscribe durable
subscriptions.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c4a7ddf9
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c4a7ddf9
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c4a7ddf9

Branch: refs/heads/master
Commit: c4a7ddf9d6588b4a403932d2b69ffc1bfdba490d
Parents: 6f86e51
Author: jbertram <jbertram@apache.org>
Authored: Thu Jul 21 14:10:04 2016 -0500
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Mon Jul 25 16:23:06 2016 +0100

----------------------------------------------------------------------
 .../artemis/core/protocol/stomp/Stomp.java      |  6 ++
 .../core/protocol/stomp/StompConnection.java    |  4 +-
 .../protocol/stomp/StompProtocolManager.java    |  5 +-
 .../core/protocol/stomp/StompSession.java       | 26 ++++---
 .../stomp/VersionedStompFrameHandler.java       |  3 +
 .../stomp/v10/StompFrameHandlerV10.java         |  7 +-
 .../stomp/v11/StompFrameHandlerV11.java         |  9 ++-
 .../tests/integration/stomp/StompTest.java      | 54 +++++++++++++++
 .../integration/stomp/v11/StompV11Test.java     | 66 +++++++++++++-----
 .../integration/stomp/v12/StompV12Test.java     | 73 +++++++++++++++-----
 10 files changed, 198 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
index 8e8acb3..badcc1a 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/Stomp.java
@@ -139,8 +139,11 @@ public interface Stomp {
 
          String SELECTOR = "selector";
 
+         @Deprecated
          String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
 
+         String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
+
          String NO_LOCAL = "no-local";
 
          public interface AckModeValues {
@@ -159,7 +162,10 @@ public interface Stomp {
 
          String ID = "id";
 
+         @Deprecated
          String DURABLE_SUBSCRIBER_NAME = "durable-subscriber-name";
+
+         String DURABLE_SUBSCRIPTION_NAME = "durable-subscription-name";
       }
 
       public interface Connect {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 7ab2750..07a85b1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -655,9 +655,9 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   public void unsubscribe(String subscriptionID, String durableSubscriberName) throws ActiveMQStompException
{
+   public void unsubscribe(String subscriptionID, String durableSubscriptionName) throws
ActiveMQStompException {
       try {
-         manager.unsubscribe(this, subscriptionID, durableSubscriberName);
+         manager.unsubscribe(this, subscriptionID, durableSubscriptionName);
       }
       catch (ActiveMQStompException e) {
          throw e;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 2c8751c..9c92fd1 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -385,15 +385,14 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
             ". Either use unique subscription IDs or do not create multiple subscriptions
for the same destination");
       }
       long consumerID = server.getStorageManager().generateID();
-      String clientID = (connection.getClientID() != null) ? connection.getClientID() : null;
-      stompSession.addSubscription(consumerID, subscriptionID, clientID, durableSubscriptionName,
destination, selector, ack);
+      stompSession.addSubscription(consumerID, subscriptionID, connection.getClientID(),
durableSubscriptionName, destination, selector, ack);
    }
 
    public void unsubscribe(StompConnection connection,
                            String subscriptionID,
                            String durableSubscriberName) throws Exception {
       StompSession stompSession = getSession(connection);
-      boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName);
+      boolean unsubscribed = stompSession.unsubscribe(subscriptionID, durableSubscriberName,
connection.getClientID());
       if (!unsubscribed) {
          throw new ActiveMQStompException(connection, "Cannot unsubscribe as no subscription
exists for id: " + subscriptionID);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index 8db5720..2a853a0 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -306,8 +306,10 @@ public class StompSession implements SessionCallback {
       session.start();
    }
 
-   public boolean unsubscribe(String id, String durableSubscriptionName) throws Exception
{
+   public boolean unsubscribe(String id, String durableSubscriptionName, String clientID)
throws Exception {
+      boolean result = false;
       Iterator<Entry<Long, StompSubscription>> iterator = subscriptions.entrySet().iterator();
+
       while (iterator.hasNext()) {
          Map.Entry<Long, StompSubscription> entry = iterator.next();
          long consumerID = entry.getKey();
@@ -315,21 +317,25 @@ public class StompSession implements SessionCallback {
          if (id != null && id.equals(sub.getID())) {
             iterator.remove();
             session.closeConsumer(consumerID);
-            SimpleString queueName;
-            if (durableSubscriptionName != null && durableSubscriptionName.trim().length()
!= 0) {
-               queueName = SimpleString.toSimpleString(id + "." + durableSubscriptionName);
-            }
-            else {
-               queueName = SimpleString.toSimpleString(id);
-            }
+            SimpleString queueName = SimpleString.toSimpleString(id);
             QueueQueryResult query = session.executeQueueQuery(queueName);
             if (query.isExists()) {
                session.deleteQueue(queueName);
             }
-            return true;
+            result = true;
          }
       }
-      return false;
+
+      if (!result && durableSubscriptionName != null && clientID != null)
{
+         SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
+         QueueQueryResult query = session.executeQueueQuery(queueName);
+         if (query.isExists()) {
+            session.deleteQueue(queueName);
+         }
+         result = true;
+      }
+
+      return result;
    }
 
    boolean containsSubscription(String subscriptionID) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 5da8574..185f81f 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -251,6 +251,9 @@ public abstract class VersionedStompFrameHandler {
       String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
       String id = request.getHeader(Stomp.Headers.Subscribe.ID);
       String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
+      }
       boolean noLocal = false;
 
       if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
index 8c76f6f..25db3b0 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
@@ -92,7 +92,10 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
       StompFrame response = null;
       String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
       String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
-      String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
+      String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
+      }
 
       String subscriptionID = null;
       if (id != null) {
@@ -108,7 +111,7 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
       }
 
       try {
-         connection.unsubscribe(subscriptionID, durableSubscriberName);
+         connection.unsubscribe(subscriptionID, durableSubscriptionName);
       }
       catch (ActiveMQStompException e) {
          return e.getFrame();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
index f9f60b9..d17fd82 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v11/StompFrameHandlerV11.java
@@ -153,19 +153,22 @@ public class StompFrameHandlerV11 extends VersionedStompFrameHandler
implements
       StompFrame response = null;
       //unsubscribe in 1.1 only needs id header
       String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
-      String durableSubscriberName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
+      String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
+      if (durableSubscriptionName == null) {
+         durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
+      }
 
       String subscriptionID = null;
       if (id != null) {
          subscriptionID = id;
       }
-      else {
+      else if (durableSubscriptionName == null) {
          response = BUNDLE.needSubscriptionID().setHandler(this).getFrame();
          return response;
       }
 
       try {
-         connection.unsubscribe(subscriptionID, durableSubscriberName);
+         connection.unsubscribe(subscriptionID, durableSubscriptionName);
       }
       catch (ActiveMQStompException e) {
          response = e.getFrame();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 4a6324a..67293be 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1433,6 +1433,60 @@ public class StompTest extends StompTestBase {
    }
 
    @Test
+   public void testDurableUnSubscribe() throws Exception {
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id:
myclientid\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(1000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
+         getTopicPrefix() +
+         getTopicName() +
+         "\n" +
+         "receipt: 12\n" +
+         "durable-subscriber-name: " +
+         getName() +
+         "\n" +
+         "\n\n" +
+         Stomp.NULL;
+      sendFrame(subscribeFrame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(1000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      waitForFrameToTakeEffect();
+
+      assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid."
+ getName())));
+
+      reconnect(100);
+      frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n"
+ Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(1000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
+         getTopicPrefix() +
+         getTopicName() +
+         "\n" +
+         "durable-subscriber-name: " +
+         getName() +
+         "\n" +
+         "\n\n" +
+         Stomp.NULL;
+      sendFrame(unsubscribeFrame);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+      waitForFrameToTakeEffect();
+
+      assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid."
+ getName())));
+   }
+
+   @Test
    public void testSubscribeToTopicWithNoLocal() throws Exception {
 
       String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 2cba55e..1b87376 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
@@ -48,6 +49,7 @@ import org.junit.Test;
 public class StompV11Test extends StompV11TestBase {
 
    private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+   public static final String CLIENT_ID = "myclientid";
 
    private StompClientConnection connV11;
 
@@ -1333,7 +1335,7 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testTwoSubscribers() throws Exception {
-      connV11.connect(defUser, defPass, "myclientid");
+      connV11.connect(defUser, defPass, CLIENT_ID);
 
       this.subscribeTopic(connV11, "sub1", "auto", null);
 
@@ -1535,7 +1537,7 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
-      connV11.connect(defUser, defPass, "myclientid");
+      connV11.connect(defUser, defPass, CLIENT_ID);
 
       this.subscribeTopic(connV11, "sub1", "auto", getName());
 
@@ -1553,7 +1555,7 @@ public class StompV11Test extends StompV11TestBase {
 
       connV11.destroy();
       connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      connV11.connect(defUser, defPass, "myclientid");
+      connV11.connect(defUser, defPass, CLIENT_ID);
 
       this.subscribeTopic(connV11, "sub1", "auto", getName());
 
@@ -1570,6 +1572,30 @@ public class StompV11Test extends StompV11TestBase {
    }
 
    @Test
+   public void testDurableUnSubscribe() throws Exception {
+      connV11.connect(defUser, defPass, CLIENT_ID);
+
+      this.subscribeTopic(connV11, null, "auto", getName());
+
+      connV11.disconnect();
+      connV11.destroy();
+      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      connV11.connect(defUser, defPass, CLIENT_ID);
+
+      this.unsubscribe(connV11, getName(), false, true);
+
+      long start = System.currentTimeMillis();
+      SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
+      while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis()
- start) < 5000) {
+         Thread.sleep(100);
+      }
+
+      assertNull(server.getActiveMQServer().locateQueue(queueName));
+
+      connV11.disconnect();
+   }
+
+   @Test
    public void testJMSXGroupIdCanBeSet() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
@@ -2364,8 +2390,10 @@ public class StompV11Test extends StompV11TestBase {
                                boolean receipt,
                                boolean noLocal) throws IOException, InterruptedException
{
       ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", subId);
       subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+      if (subId != null) {
+         subFrame.addHeader("id", subId);
+      }
       if (ack != null) {
          subFrame.addHeader("ack", ack);
       }
@@ -2386,18 +2414,14 @@ public class StompV11Test extends StompV11TestBase {
       }
    }
 
-   private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException {
-      ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
-      subFrame.addHeader("id", subId);
-
-      conn.sendFrame(subFrame);
-   }
-
-   private void unsubscribe(StompClientConnection conn,
-                            String subId,
-                            boolean receipt) throws IOException, InterruptedException {
-      ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
-      subFrame.addHeader("id", subId);
+   private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean
durable) throws IOException, InterruptedException {
+      ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
+      if (durable) {
+         subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId);
+      }
+      else {
+         subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId);
+      }
 
       if (receipt) {
          subFrame.addHeader("receipt", "4321");
@@ -2412,6 +2436,16 @@ public class StompV11Test extends StompV11TestBase {
       }
    }
 
+   private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException {
+      unsubscribe(conn, subId, false, false);
+   }
+
+   private void unsubscribe(StompClientConnection conn,
+                            String subId,
+                            boolean receipt) throws IOException, InterruptedException {
+      unsubscribe(conn, subId, receipt, false);
+   }
+
    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect)
throws Exception {
       connV11.connect(defUser, defPass);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c4a7ddf9/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index 9835c17..fe1e339 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -31,6 +31,9 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
@@ -38,7 +41,6 @@ import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConne
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV11;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionV12;
 import org.apache.activemq.artemis.tests.integration.stomp.v11.StompV11TestBase;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -50,6 +52,7 @@ import org.junit.Test;
 public class StompV12Test extends StompV11TestBase {
 
    private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+   public static final String CLIENT_ID = "myclientid";
 
    private StompClientConnectionV12 connV12;
 
@@ -1325,7 +1328,7 @@ public class StompV12Test extends StompV11TestBase {
 
    @Test
    public void testTwoSubscribers() throws Exception {
-      connV12.connect(defUser, defPass, "myclientid");
+      connV12.connect(defUser, defPass, CLIENT_ID);
 
       this.subscribeTopic(connV12, "sub1", "auto", null);
 
@@ -1529,7 +1532,7 @@ public class StompV12Test extends StompV11TestBase {
 
    @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
-      connV12.connect(defUser, defPass, "myclientid");
+      connV12.connect(defUser, defPass, CLIENT_ID);
 
       this.subscribeTopic(connV12, "sub1", "auto", getName());
 
@@ -1547,7 +1550,7 @@ public class StompV12Test extends StompV11TestBase {
 
       connV12.destroy();
       connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2",
hostname, port);
-      connV12.connect(defUser, defPass, "myclientid");
+      connV12.connect(defUser, defPass, CLIENT_ID);
 
       this.subscribeTopic(connV12, "sub1", "auto", getName());
 
@@ -1564,6 +1567,30 @@ public class StompV12Test extends StompV11TestBase {
    }
 
    @Test
+   public void testDurableUnSubscribe() throws Exception {
+      connV12.connect(defUser, defPass, CLIENT_ID);
+
+      this.subscribeTopic(connV12, null, "auto", getName());
+
+      connV12.disconnect();
+      connV12.destroy();
+      connV12 = (StompClientConnectionV12) StompClientConnectionFactory.createClientConnection("1.2",
hostname, port);
+      connV12.connect(defUser, defPass, CLIENT_ID);
+
+      this.unsubscribe(connV12, getName(), false, true);
+
+      long start = System.currentTimeMillis();
+      SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
+      while (server.getActiveMQServer().locateQueue(queueName) != null && (System.currentTimeMillis()
- start) < 5000) {
+         Thread.sleep(100);
+      }
+
+      assertNull(server.getActiveMQServer().locateQueue(queueName));
+
+      connV12.disconnect();
+   }
+
+   @Test
    public void testJMSXGroupIdCanBeSet() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
@@ -2403,8 +2430,10 @@ public class StompV12Test extends StompV11TestBase {
                                boolean receipt,
                                boolean noLocal) throws IOException, InterruptedException
{
       ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", subId);
       subFrame.addHeader("destination", getTopicPrefix() + getTopicName());
+      if (subId != null) {
+         subFrame.addHeader("id", subId);
+      }
       if (ack != null) {
          subFrame.addHeader("ack", ack);
       }
@@ -2425,18 +2454,14 @@ public class StompV12Test extends StompV11TestBase {
       }
    }
 
-   private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException {
-      ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
-      subFrame.addHeader("id", subId);
-
-      conn.sendFrame(subFrame);
-   }
-
-   private void unsubscribe(StompClientConnection conn,
-                            String subId,
-                            boolean receipt) throws IOException, InterruptedException {
-      ClientStompFrame subFrame = conn.createFrame("UNSUBSCRIBE");
-      subFrame.addHeader("id", subId);
+   private void unsubscribe(StompClientConnection conn, String subId, boolean receipt, boolean
durable) throws IOException, InterruptedException {
+      ClientStompFrame subFrame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
+      if (durable) {
+         subFrame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subId);
+      }
+      else {
+         subFrame.addHeader(Stomp.Headers.Unsubscribe.ID, subId);
+      }
 
       if (receipt) {
          subFrame.addHeader("receipt", "4321");
@@ -2446,11 +2471,21 @@ public class StompV12Test extends StompV11TestBase {
 
       if (receipt) {
          System.out.println("response: " + f);
-         Assert.assertEquals("RECEIPT", f.getCommand());
-         Assert.assertEquals("4321", f.getHeader("receipt-id"));
+         assertEquals("RECEIPT", f.getCommand());
+         assertEquals("4321", f.getHeader("receipt-id"));
       }
    }
 
+   private void unsubscribe(StompClientConnection conn, String subId) throws IOException,
InterruptedException {
+      unsubscribe(conn, subId, false, false);
+   }
+
+   private void unsubscribe(StompClientConnection conn,
+                            String subId,
+                            boolean receipt) throws IOException, InterruptedException {
+      unsubscribe(conn, subId, receipt, false);
+   }
+
    protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect)
throws Exception {
       connV12.connect(defUser, defPass);
 


Mime
View raw message