activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2629 ensure queue auto-delete after expiration
Date Fri, 06 Mar 2020 02:36:09 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 766f88c  ARTEMIS-2629 ensure queue auto-delete after expiration
     new 32829d6  This closes #2992
766f88c is described below

commit 766f88c22af032fdcae08c0f525f73d528b4a162
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Fri Feb 21 17:48:16 2020 -0600

    ARTEMIS-2629 ensure queue auto-delete after expiration
---
 .../core/postoffice/impl/PostOfficeImpl.java       |  6 +++-
 .../artemis/core/server/impl/QueueImpl.java        | 10 ++++++
 .../artemis/core/server/impl/QueueManagerImpl.java |  9 +++--
 .../integration/client/AutoDeleteQueueTest.java    | 38 +++++++++++++++++++++-
 4 files changed, 56 insertions(+), 7 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index bbebfae..f3f0c5d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -1735,7 +1735,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
       @Override
       public void run() {
          for (Queue queue : getLocalQueues()) {
-            if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue)) {
+            if (!queue.isInternalQueue() && QueueManagerImpl.isAutoDelete(queue) && QueueManagerImpl.consumerCountCheck(queue) && QueueManagerImpl.delayCheck(queue) && QueueManagerImpl.messageCountCheck(queue) && queueWasUsed(queue)) {
                QueueManagerImpl.performAutoDeleteQueue(server, queue);
             }
          }
@@ -1760,6 +1760,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
             }
          }
       }
+
+      private boolean queueWasUsed(Queue queue) {
+         return queue.getMessagesExpired() > 0 || queue.getMessagesAcknowledged() > 0 || queue.getMessagesKilled() > 0 || queue.getConsumerRemovedTimestamp() != -1;
+      }
    }
 
    private List<Queue> getLocalQueues() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 53b701e..9ce735d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1859,6 +1859,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          acknowledge(ref, AckReason.EXPIRED, consumer);
       }
 
+      // potentially auto-delete this queue if this expired the last message
+      refCountForConsumers.check();
+
       if (server != null && server.hasBrokerMessagePlugins()) {
          final SimpleString expiryAddress = messageExpiryAddress;
          server.callBrokerMessagePlugins(plugin -> plugin.messageExpired(ref, expiryAddress, consumer));
@@ -3366,6 +3369,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          expiryLogger.addExpiry(address, ref);
       }
 
+      // potentially auto-delete this queue if this expired the last message
+      tx.addOperation(new TransactionOperationAbstract() {
+         @Override
+         public void afterCommit(Transaction tx) {
+            refCountForConsumers.check();
+         }
+      });
    }
 
    private class ExpiryLogger extends TransactionOperationAbstract {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
index b20b537..549686f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueManagerImpl.java
@@ -35,7 +35,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
       //the queue may already have been deleted and this is a result of that
       if (queue == null) {
          if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-            ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + ".\"");
+            ActiveMQServerLogger.LOGGER.debug("no queue to delete \"" + queueName + "\".");
          }
          return;
       }
@@ -52,7 +52,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
       long messageCount = queue.getMessageCount();
 
       if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + ".\" consumerCount = " + consumerCount + "; messageCount = " + messageCount);
+         ActiveMQServerLogger.LOGGER.debug("purging queue \"" + queue.getName() + "\": consumerCount = " + consumerCount + "; messageCount = " + messageCount);
       }
       try {
          queue.deleteMatchingReferences(QueueImpl.DEFAULT_FLUSH_LIMIT, null, AckReason.KILLED);
@@ -65,7 +65,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
       SimpleString queueName = queue.getName();
       AddressSettings settings = server.getAddressSettingsRepository().getMatch(queue.getAddress().toString());
       if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
-         ActiveMQServerLogger.LOGGER.info("deleting auto-created queue \"" + queueName + ".\" consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
+         ActiveMQServerLogger.LOGGER.debug("deleting auto-created queue \"" + queueName + "\": consumerCount = " + queue.getConsumerCount() + "; messageCount = " + queue.getMessageCount() + "; isAutoDelete = " + queue.isAutoDelete());
       }
 
       try {
@@ -84,8 +84,7 @@ public class QueueManagerImpl extends ReferenceCounterUtil implements QueueManag
    }
 
    public static boolean delayCheck(Queue queue) {
-      long consumerRemovedTimestamp =  queue.getConsumerRemovedTimestamp();
-      return consumerRemovedTimestamp != -1 && System.currentTimeMillis() - consumerRemovedTimestamp >= queue.getAutoDeleteDelay();
+      return System.currentTimeMillis() - queue.getConsumerRemovedTimestamp() >= queue.getAutoDeleteDelay();
    }
 
    public static boolean consumerCountCheck(Queue queue) {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
index 8603e7b..5bce2cf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/AutoDeleteQueueTest.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.artemis.tests.integration.client;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -42,13 +44,15 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
       super.setUp();
       locator = createInVMNonHALocator();
       server = createServer(false);
+      server.getConfiguration().setAddressQueueScanPeriod(500);
+      server.getConfiguration().setMessageExpiryScanPeriod(500);
 
       server.start();
       cf = createSessionFactory(locator);
    }
 
    @Test
-   public void testAutoDeleteAutoCreatedQueue() throws Exception {
+   public void testAutoDeleteAutoCreatedQueueOnLastConsumerClose() throws Exception {
       // auto-delete-queues defaults to true
       server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
       assertNotNull(server.locateQueue(queueA));
@@ -57,6 +61,30 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testAutoDeleteAutoCreatedQueueOnLastMessageRemovedWithoutConsumer() throws Exception {
+      // auto-delete-queues defaults to true
+      server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
+      assertNotNull(server.locateQueue(queueA));
+      ClientSession session = cf.createSession();
+      ClientProducer producer = session.createProducer(addressA);
+      producer.send(session.createMessage(true));
+      Wait.assertEquals(1, server.locateQueue(queueA)::getMessageCount);
+      server.locateQueue(queueA).deleteAllReferences();
+      Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
+   }
+
+   @Test
+   public void testAutoDeleteAutoCreatedQueueOnLastMessageExpired() throws Exception {
+      // auto-delete-queues defaults to true
+      server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
+      assertNotNull(server.locateQueue(queueA));
+      ClientSession session = cf.createSession();
+      ClientProducer producer = session.createProducer(addressA);
+      producer.send(session.createMessage(true).setExpiration(System.currentTimeMillis()));
+      Wait.assertTrue(() -> server.locateQueue(queueA) == null, 2000, 100);
+   }
+
+   @Test
    public void testNegativeAutoDeleteAutoCreatedQueue() throws Exception {
       server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings().setAutoDeleteQueues(false));
       server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
@@ -64,4 +92,12 @@ public class AutoDeleteQueueTest extends ActiveMQTestBase {
       cf.createSession().createConsumer(queueA).close();
       assertNotNull(server.locateQueue(queueA));
    }
+
+   @Test
+   public void testNegativeAutoDeleteAutoCreatedQueue2() throws Exception {
+      server.getAddressSettingsRepository().addMatch(addressA.toString(), new AddressSettings());
+      server.createQueue(addressA, RoutingType.ANYCAST, queueA, null, null, true, false, false, false, true, 1, false, true);
+      assertNotNull(server.locateQueue(queueA));
+      assertFalse(Wait.waitFor(() -> server.locateQueue(queueA) == null, 5000, 100));
+   }
 }


Mime
View raw message