activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-741 fix subscription queue leak on STOMP
Date Mon, 26 Sep 2016 22:03:22 GMT
ARTEMIS-741 fix subscription queue leak on STOMP


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2dcf8de0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2dcf8de0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2dcf8de0

Branch: refs/heads/master
Commit: 2dcf8de0decf7ef10c190825cc382ac0c601cc69
Parents: c86e41d
Author: jbertram <jbertram@apache.com>
Authored: Wed Sep 21 19:19:59 2016 -0500
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Sep 26 18:01:30 2016 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompSession.java       | 30 ++++---
 .../core/protocol/stomp/StompSubscription.java  | 29 ++++---
 .../tests/integration/stomp/StompTest.java      | 82 ++++++++++++++++++++
 3 files changed, 115 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2dcf8de0/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index ba0abbf..2596b15 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -36,7 +36,6 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMess
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.server.LargeServerMessage;
 import org.apache.activemq.artemis.core.server.MessageReference;
-import org.apache.activemq.artemis.core.server.QueueQueryResult;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.ServerSession;
@@ -276,7 +275,8 @@ public class StompSession implements SessionCallback {
                                String destination,
                                String selector,
                                String ack) throws Exception {
-      SimpleString queue = SimpleString.toSimpleString(destination);
+      SimpleString queueName = SimpleString.toSimpleString(destination);
+      boolean pubSub = false;
       int receiveCredits = consumerCredits;
       if (ack.equals(Stomp.Headers.Subscribe.AckModeValues.AUTO)) {
          receiveCredits = -1;
@@ -284,27 +284,27 @@ public class StompSession implements SessionCallback {
 
       if (destination.startsWith("jms.topic")) {
          // subscribes to a topic
+         pubSub = true;
          if (durableSubscriptionName != null) {
             if (clientID == null) {
                throw BUNDLE.missingClientID();
             }
-            queue = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
-            QueueQueryResult query = session.executeQueueQuery(queue);
-            if (!query.isExists()) {
-               session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector),
false, true);
+            queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
+            if (manager.getServer().locateQueue(queueName) == null) {
+               session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector),
false, true);
             }
          }
          else {
-            queue = UUIDGenerator.getInstance().generateSimpleStringUUID();
-            session.createQueue(SimpleString.toSimpleString(destination), queue, SimpleString.toSimpleString(selector),
true, false);
+            queueName = UUIDGenerator.getInstance().generateSimpleStringUUID();
+            session.createQueue(SimpleString.toSimpleString(destination), queueName, SimpleString.toSimpleString(selector),
true, false);
          }
-         ((ServerSessionImpl) session).createConsumer(consumerID, queue, null, false, false,
receiveCredits);
+         session.createConsumer(consumerID, queueName, null, false, false, receiveCredits);
       }
       else {
-         ((ServerSessionImpl) session).createConsumer(consumerID, queue, SimpleString.toSimpleString(selector),
false, false, receiveCredits);
+         session.createConsumer(consumerID, queueName, SimpleString.toSimpleString(selector),
false, false, receiveCredits);
       }
 
-      StompSubscription subscription = new StompSubscription(subscriptionID, ack);
+      StompSubscription subscription = new StompSubscription(subscriptionID, ack, queueName,
pubSub);
       subscriptions.put(consumerID, subscription);
 
       session.start();
@@ -320,10 +320,9 @@ public class StompSession implements SessionCallback {
          StompSubscription sub = entry.getValue();
          if (id != null && id.equals(sub.getID())) {
             iterator.remove();
+            SimpleString queueName = sub.getQueueName();
             session.closeConsumer(consumerID);
-            SimpleString queueName = SimpleString.toSimpleString(id);
-            QueueQueryResult query = session.executeQueueQuery(queueName);
-            if (query.isExists()) {
+            if (sub.isPubSub() && manager.getServer().locateQueue(queueName) != null)
{
                session.deleteQueue(queueName);
             }
             result = true;
@@ -332,8 +331,7 @@ public class StompSession implements SessionCallback {
 
       if (!result && durableSubscriptionName != null && clientID != null)
{
          SimpleString queueName = SimpleString.toSimpleString(clientID + "." + durableSubscriptionName);
-         QueueQueryResult query = session.executeQueueQuery(queueName);
-         if (query.isExists()) {
+         if (manager.getServer().locateQueue(queueName) != null) {
             session.deleteQueue(queueName);
          }
          result = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2dcf8de0/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
index 971af27..a1417ad 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSubscription.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.artemis.core.protocol.stomp;
 
+import org.apache.activemq.artemis.api.core.SimpleString;
+
 public class StompSubscription {
    // Constants -----------------------------------------------------
 
@@ -25,13 +27,20 @@ public class StompSubscription {
 
    private final String ack;
 
+   private final SimpleString queueName;
+
+   // whether or not this subscription follows publish/subscribe semantics (e.g. for a JMS
topic)
+   private final boolean pubSub;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public StompSubscription(String subID, String ack) {
+   public StompSubscription(String subID, String ack, SimpleString queueName, boolean pubSub)
{
       this.subID = subID;
       this.ack = ack;
+      this.queueName = queueName;
+      this.pubSub = pubSub;
    }
 
    // Public --------------------------------------------------------
@@ -44,17 +53,17 @@ public class StompSubscription {
       return subID;
    }
 
-   @Override
-   public String toString() {
-      return "StompSubscription[id=" + subID + ", ack=" + ack + "]";
+   public SimpleString getQueueName() {
+      return queueName;
    }
 
-   // Package protected ---------------------------------------------
-
-   // Protected -----------------------------------------------------
-
-   // Private -------------------------------------------------------
+   public boolean isPubSub() {
+      return pubSub;
+   }
 
-   // Inner classes -------------------------------------------------
+   @Override
+   public String toString() {
+      return "StompSubscription[id=" + subID + ", ack=" + ack + ", queueName=" + queueName
+ ", pubSub=" + pubSub + "]";
+   }
 
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2dcf8de0/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 1c92f42..951aa85 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1282,6 +1282,7 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testSubscribeToTopic() throws Exception {
+      final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
 
       String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
       sendFrame(frame);
@@ -1301,6 +1302,19 @@ public class StompTest extends StompTestBase {
       frame = receiveFrame(10000);
       Assert.assertTrue(frame.startsWith("RECEIPT"));
 
+      assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition()
{
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length
- baselineQueueCount == 1) {
+               return true;
+            }
+            else {
+               return false;
+            }
+         }
+      }, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
+
       sendMessage(getName(), topic);
 
       frame = receiveFrame(10000);
@@ -1326,6 +1340,74 @@ public class StompTest extends StompTestBase {
       log.info("Received frame: " + frame);
       Assert.assertNull("No message should have been received since subscription was removed",
frame);
 
+      assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length
- baselineQueueCount);
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
+
+   @Test
+   public void testSubscribeToQueue() throws Exception {
+      final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SUBSCRIBE\n" + "destination:" +
+         getQueuePrefix() +
+         getQueueName() +
+         "\n" +
+         "receipt: 12\n" +
+         "\n\n" +
+         Stomp.NULL;
+      sendFrame(frame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() {
+
+         @Override
+         public boolean isSatisfied() throws Exception {
+            if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length
- baselineQueueCount == 1) {
+               return true;
+            }
+            else {
+               return false;
+            }
+         }
+      }, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
+
+      sendMessage(getName(), queue);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+      frame = "UNSUBSCRIBE\n" + "destination:" +
+         getQueuePrefix() +
+         getQueueName() +
+         "\n" +
+         "receipt: 1234\n" +
+         "\n\n" +
+         Stomp.NULL;
+      sendFrame(frame);
+      // wait for UNSUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      sendMessage(getName(), queue);
+
+      frame = receiveFrame(1000);
+      log.info("Received frame: " + frame);
+      Assert.assertNull("No message should have been received since subscription was removed",
frame);
+
+      assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length);
+
       frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
       sendFrame(frame);
    }


Mime
View raw message