activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-723 - AMQP subscriptions aren't deleted properly
Date Mon, 12 Sep 2016 09:33:59 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master abed0cd5b -> 1bcac5c36


ARTEMIS-723 - AMQP subscriptions aren't deleted properly

https://issues.apache.org/jira/browse/ARTEMIS-723


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/cdb0391c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/cdb0391c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/cdb0391c

Branch: refs/heads/master
Commit: cdb0391c1c2b7c0e51f7bbfe0e9ec27306a735b4
Parents: abed0cd
Author: Andy Taylor <andy.tayls67@gmail.com>
Authored: Fri Sep 9 10:36:01 2016 +0100
Committer: Andy Taylor <andy.tayls67@gmail.com>
Committed: Mon Sep 12 10:27:49 2016 +0100

----------------------------------------------------------------------
 .../plug/ProtonSessionIntegrationCallback.java  |   2 +-
 .../server/ProtonServerSenderContext.java       |  10 +-
 .../tests/integration/proton/ProtonTest.java    | 113 +++++++++++++++++++
 3 files changed, 123 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cdb0391c/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
index c3ac671..e422a34 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java
@@ -195,7 +195,7 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
 
    @Override
    public void createTemporaryQueue(String address, String queueName, String filter) throws Exception {
-      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), false, true);
+      serverSession.createQueue(SimpleString.toSimpleString(address), SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(filter), true, false);
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cdb0391c/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
index 78b1668..739f8e8 100644
--- a/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java
@@ -272,7 +272,6 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
    @Override
    public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
       super.close(remoteLinkClose);
-
       try {
          sessionSPI.closeSender(brokerConsumer);
          //if this is a link close rather than a connection close or detach, we need to delete any durable resources for
@@ -285,6 +284,15 @@ public class ProtonServerSenderContext extends AbstractProtonContextSender imple
                if (exists) {
                   sessionSPI.deleteQueue(address);
                }
+               else {
+                  String clientId = connection.getRemoteContainer();
+                  String pubId = sender.getName();
+                  String queue = clientId + ":" + pubId;
+                  exists = sessionSPI.queueQuery(queue);
+                  if (exists) {
+                     sessionSPI.deleteQueue(queue);
+                  }
+               }
             }
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/cdb0391c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
index 8da5aa2..b3d9a5f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/proton/ProtonTest.java
@@ -39,6 +39,9 @@ import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TemporaryQueue;
 import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 import java.io.IOException;
 import java.io.Serializable;
 import java.lang.reflect.Field;
@@ -56,6 +59,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.postoffice.Bindings;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -184,6 +189,62 @@ public class ProtonTest extends ProtonTestBase {
    }
 
    @Test
+   public void testDurableSubscriptionUnsubscribe() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+      Connection connection = createConnection("myClientId");
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic("amqp_testtopic");
+         TopicSubscriber myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         session.close();
+         connection.close();
+         connection = createConnection("myClientId");
+         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         myDurSub = session.createDurableSubscriber(topic, "myDurSub");
+         myDurSub.close();
+         Assert.assertNotNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+         session.unsubscribe("myDurSub");
+         Assert.assertNull(server.getPostOffice().getBinding(new SimpleString("myClientId:myDurSub")));
+         session.close();
+         connection.close();
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   @Test
+   public void testTemporarySubscriptionDeleted() throws Exception {
+      if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
+      try {
+         TopicSession session = (TopicSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic("amqp_testtopic");
+         TopicSubscriber myDurSub = session.createSubscriber(topic);
+         Bindings bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
+         Assert.assertEquals(2, bindingsForAddress.getBindings().size());
+         session.close();
+         final CountDownLatch latch = new CountDownLatch(1);
+         server.getRemotingService().getConnections().iterator().next().addCloseListener(new CloseListener() {
+            @Override
+            public void connectionClosed() {
+               latch.countDown();
+            }
+         });
+         connection.close();
+         latch.await(5, TimeUnit.SECONDS);
+         bindingsForAddress = server.getPostOffice().getBindingsForAddress(new SimpleString("amqp_testtopic"));
+         Assert.assertEquals(1, bindingsForAddress.getBindings().size());
+      }
+      finally {
+         if (connection != null) {
+            connection.close();
+         }
+      }
+   }
+
+   @Test
    public void testBrokerContainerId() throws Exception {
       if (protocol != 0 && protocol != 3) return; // Only run this test for AMQP protocol
 
@@ -1530,6 +1591,58 @@ public class ProtonTest extends ProtonTestBase {
       return connection;
    }
 
+   private javax.jms.Connection createConnection(String clientId) throws JMSException {
+      Connection connection;
+      if (protocol == 3) {
+         factory = new JmsConnectionFactory(amqpConnectionUri);
+         connection = factory.createConnection();
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         connection.setClientID(clientId);
+         connection.start();
+      }
+      else if (protocol == 0) {
+         factory = new JmsConnectionFactory(userName, password, amqpConnectionUri);
+         connection = factory.createConnection();
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         connection.setClientID(clientId);
+         connection.start();
+      }
+      else {
+         TransportConfiguration transport;
+
+         if (protocol == 1) {
+            transport = new TransportConfiguration(INVM_CONNECTOR_FACTORY);
+            factory = new ActiveMQConnectionFactory("vm:/0");
+         }
+         else {
+            factory = new ActiveMQConnectionFactory();
+         }
+
+         connection = factory.createConnection(userName, password);
+         connection.setClientID(clientId);
+         connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+               exception.printStackTrace();
+            }
+         });
+         connection.start();
+      }
+
+      return connection;
+   }
+
+
    private void setAddressFullBlockPolicy() {
       // For BLOCK tests
       AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch("#");


Mime
View raw message