activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [22/34] activemq-artemis git commit: Implemented MaxConsumers DeleteOnNoConsumers for Queues
Date Tue, 01 Nov 2016 10:21:51 GMT
Implemented MaxConsumers DeleteOnNoConsumers for Queues


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/ec1762b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/ec1762b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/ec1762b1

Branch: refs/heads/ARTEMIS-780
Commit: ec1762b1c37d78289580d6a4190319812df32bac
Parents: 19ce2fb
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue Nov 1 10:19:55 2016 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Tue Nov 1 10:20:52 2016 +0000

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQExceptionType.java |   8 +-
 .../ActiveMQQueueMaxConsumerLimitReached.java   |  31 ++++++
 .../core/ServerSessionPacketHandler.java        |   8 ++
 .../core/server/ActiveMQMessageBundle.java      |   3 +
 .../artemis/core/server/ActiveMQServer.java     |  26 ++++-
 .../activemq/artemis/core/server/Queue.java     |   2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  50 +++++++---
 .../core/server/impl/LastValueQueue.java        |   4 +-
 .../server/impl/PostOfficeJournalLoader.java    |   4 +-
 .../core/server/impl/QueueFactoryImpl.java      |   6 +-
 .../artemis/core/server/impl/QueueImpl.java     |  63 ++++++++++++
 .../core/server/impl/ServerConsumerImpl.java    |   1 +
 .../impl/ScheduledDeliveryHandlerTest.java      |  10 ++
 .../integration/addressing/AddressingTest.java  | 100 ++++++++++++++++---
 .../integration/client/HangConsumerTest.java    |   8 +-
 .../unit/core/postoffice/impl/FakeQueue.java    |  10 ++
 16 files changed, 290 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 752574a..0221562 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -213,7 +213,13 @@ public enum ActiveMQExceptionType {
       }
 
    },
-   NOT_IMPLEMTNED_EXCEPTION(213);
+   NOT_IMPLEMTNED_EXCEPTION(213),
+   MAX_CONSUMER_LIMIT_EXCEEDED(214) {
+      @Override
+      public ActiveMQException createException(String msg) {
+         return new ActiveMQQueueMaxConsumerLimitReached(msg);
+      }
+   };
 
    private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
new file mode 100644
index 0000000..0577e08
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQQueueMaxConsumerLimitReached.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because the maximum number of consumers allowed on a queue has been reached.
+ */
+public final class ActiveMQQueueMaxConsumerLimitReached extends ActiveMQException {
+
+   public ActiveMQQueueMaxConsumerLimitReached() {
+      super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED);
+   }
+
+   public ActiveMQQueueMaxConsumerLimitReached(String msg) {
+      super(ActiveMQExceptionType.MAX_CONSUMER_LIMIT_EXCEEDED, msg);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index b52534c..2a45f29 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
 import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
 import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
@@ -494,6 +495,13 @@ public class ServerSessionPacketHandler implements ChannelHandler {
             } else {
                ActiveMQServerLogger.LOGGER.caughtXaException(e);
             }
+         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
+            if (requiresResponse) {
+               logger.debug("Sending exception to client", e);
+               response = new ActiveMQExceptionMessage(e);
+            } else {
+               ActiveMQServerLogger.LOGGER.caughtException(e);
+            }
          } catch (ActiveMQException e) {
             if (requiresResponse) {
                logger.debug("Sending exception to client", e);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index f22873b..769d183 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -32,6 +32,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionExcep
 import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
@@ -378,4 +379,6 @@ public interface ActiveMQMessageBundle {
    @Message(id = 119119, value = "Disk Capacity is Low, cannot produce more messages.")
    ActiveMQIOErrorException diskBeyondLimit();
 
+   @Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})",
format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address,
SimpleString queueName);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 25db8c6..ba2a1c7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -297,6 +297,14 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    Queue createQueue(SimpleString address,
                      SimpleString queueName,
+                     SimpleString filterString,
+                     boolean durable,
+                     boolean temporary,
+                     Integer maxConsumers,
+                     Boolean deleteOnNoConsumers) throws Exception;
+
+   Queue createQueue(SimpleString address,
+                     SimpleString queueName,
                      SimpleString filter,
                      SimpleString user,
                      boolean durable,
@@ -389,10 +397,22 @@ public interface ActiveMQServer extends ActiveMQComponent {
 
    AddressInfo getAddressInfo(SimpleString address);
 
+   Queue createQueue(SimpleString addressName,
+                     SimpleString queueName,
+                     SimpleString filterString,
+                     SimpleString user,
+                     boolean durable,
+                     boolean temporary,
+                     boolean ignoreIfExists,
+                     boolean transientQueue,
+                     boolean autoCreated,
+                     Integer maxConsumers,
+                     Boolean deleteOnNoConsumers) throws Exception;
+
    /*
-      * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols
is tur then this factory will
-      * replace any factories with the same protocol
-      * */
+         * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols
is true then this factory will
+         * replace any factories with the same protocol
+         * */
    void addProtocolManagerFactory(ProtocolManagerFactory factory);
 
    /*

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 270e0cd..2b845d5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -48,7 +48,7 @@ public interface Queue extends Bindable {
 
    boolean isDeleteOnNoConsumers();
 
-   boolean getMaxConsumers();
+   int getMaxConsumers();
 
    void addConsumer(Consumer consumer) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index c6f5b66..3bda134 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1428,6 +1428,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    public Queue createQueue(final SimpleString address,
                             final SimpleString queueName,
                             final SimpleString filterString,
+                            final boolean durable,
+                            final boolean temporary,
+                            final Integer maxConsumers,
+                            final Boolean deleteOnNoConsumers) throws Exception {
+      return createQueue(address, queueName, filterString, null, durable, temporary, false,
false, false, maxConsumers, deleteOnNoConsumers);
+   }
+
+   @Override
+   public Queue createQueue(final SimpleString address,
+                            final SimpleString queueName,
+                            final SimpleString filterString,
                             final SimpleString user,
                             final boolean durable,
                             final boolean temporary) throws Exception {
@@ -2256,17 +2267,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                          null);
    }
 
-   private Queue createQueue(final SimpleString addressName,
-                             final SimpleString queueName,
-                             final SimpleString filterString,
-                             final SimpleString user,
-                             final boolean durable,
-                             final boolean temporary,
-                             final boolean ignoreIfExists,
-                             final boolean transientQueue,
-                             final boolean autoCreated,
-                             final Integer maxConsumers,
-                             final Boolean deleteOnNoConsumers) throws Exception {
+   @Override
+   public Queue createQueue(final SimpleString addressName,
+                            final SimpleString queueName,
+                            final SimpleString filterString,
+                            final SimpleString user,
+                            final boolean durable,
+                            final boolean temporary,
+                            final boolean ignoreIfExists,
+                            final boolean transientQueue,
+                            final boolean autoCreated,
+                            final Integer maxConsumers,
+                            final Boolean deleteOnNoConsumers) throws Exception {
 
       final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName);
       if (binding != null) {
@@ -2292,8 +2304,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          address = addressName;
       }
 
+
+      AddressInfo defaultAddressInfo = new AddressInfo(address);
       // FIXME This boils down to a putIfAbsent (avoids race).  This should be reflected
in the API.
-      AddressInfo info = postOffice.addAddressInfo(new AddressInfo(address));
+      AddressInfo info = postOffice.addAddressInfo(defaultAddressInfo);
+
+      boolean addressExists = true;
+      if (info == null) {
+         info = defaultAddressInfo;
+         addressExists = false;
+      }
 
       final boolean isDeleteOnNoConsumers = deleteOnNoConsumers == null ? info.isDefaultDeleteOnNoConsumers()
: deleteOnNoConsumers;
       final int noMaxConsumers = maxConsumers == null ? info.getDefaultMaxConsumers() : maxConsumers;
@@ -2318,10 +2338,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       final QueueBinding localQueueBinding = new LocalQueueBinding(getAddressInfo(queue.getAddress()),
queue, nodeManager.getNodeId());
 
       if (queue.isDurable()) {
-         storageManager.addQueueBinding(txID, localQueueBinding);
-         if (info == null) {
-            storageManager.addAddressBinding(txID, getAddressInfo(queue.getAddress()));
+         if (!addressExists) {
+            storageManager.addAddressBinding(txID, getAddressInfo(address));
          }
+         storageManager.addQueueBinding(txID, localQueueBinding);
       }
 
       try {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 453f588..a4fa5dc 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -56,12 +56,14 @@ public class LastValueQueue extends QueueImpl {
                          final boolean durable,
                          final boolean temporary,
                          final boolean autoCreated,
+                         final Integer maxConsumers,
+                         final Boolean deleteOnNoConsumers,
                          final ScheduledExecutorService scheduledExecutor,
                          final PostOffice postOffice,
                          final StorageManager storageManager,
                          final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                          final Executor executor) {
-      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary,
autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary,
autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager,
addressSettingsRepository, executor);
       new Exception("LastValueQeue " + this).toString();
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 9bd14f0..6f4cf03 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -148,8 +148,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
             .durable(true)
             .temporary(false)
             .autoCreated(queueBindingInfo.isAutoCreated())
-            .de
-            );
+            .deleteOnNoConsumers(queueBindingInfo.isDeleteOnNoConsumers())
+            .maxConsumers(queueBindingInfo.getMaxConsumers());
          final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
          if (queue.isAutoCreated()) {
             queue.setConsumersRefCount(new AutoCreatedQueueManagerImpl(((PostOfficeImpl)
postOffice).getServer().getJMSQueueDeleter(), queueBindingInfo.getQueueName()));

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index 3678553..bcc7c79 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
       final AddressSettings addressSettings = addressSettingsRepository.getMatch(config.address().toString());
       final Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(),
config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(),
config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager,
addressSettingsRepository, executorFactory.getExecutor());
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(),
config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(),
config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(),
config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager,
addressSettingsRepository, executorFactory.getExecutor());
       }
       return queue;
    }
@@ -101,7 +101,7 @@ public class QueueFactoryImpl implements QueueFactory {
 
       Queue queue;
       if (addressSettings.isLastValueQueue()) {
-         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription,
user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
executorFactory.getExecutor());
+         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription,
user, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager,
addressSettingsRepository, executorFactory.getExecutor());
       } else {
          queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user,
durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository,
executorFactory.getExecutor());
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 56a33ef..2246c8e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -53,6 +54,7 @@ import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
 import org.apache.activemq.artemis.core.paging.cursor.PagedReference;
 import org.apache.activemq.artemis.core.persistence.QueueStatus;
 import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.AddressManager;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.postoffice.Bindings;
 import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
@@ -238,6 +240,14 @@ public class QueueImpl implements Queue {
 
    private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
 
+   private int maxConsumers;
+
+   private boolean deleteOnNoConsumers;
+
+   private final AddressInfo addressInfo;
+
+   private final AtomicInteger noConsumers = new AtomicInteger(0);
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -334,10 +344,32 @@ public class QueueImpl implements Queue {
                     final StorageManager storageManager,
                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                     final Executor executor) {
+      this(id, address, name, filter, null, user, durable, temporary, autoCreated, null,
null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+   }
+
+   public QueueImpl(final long id,
+                    final SimpleString address,
+                    final SimpleString name,
+                    final Filter filter,
+                    final PageSubscription pageSubscription,
+                    final SimpleString user,
+                    final boolean durable,
+                    final boolean temporary,
+                    final boolean autoCreated,
+                    final Integer maxConsumers,
+                    final Boolean deleteOnNoConsumers,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final PostOffice postOffice,
+                    final StorageManager storageManager,
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final Executor executor) {
+
       this.id = id;
 
       this.address = address;
 
+      this.addressInfo = postOffice.getAddressInfo(address);
+
       this.name = name;
 
       this.filter = filter;
@@ -350,6 +382,10 @@ public class QueueImpl implements Queue {
 
       this.autoCreated = autoCreated;
 
+      this.maxConsumers = maxConsumers == null ? addressInfo.getDefaultMaxConsumers() : maxConsumers;
+
+      this.deleteOnNoConsumers = deleteOnNoConsumers == null ? addressInfo.isDefaultDeleteOnNoConsumers()
: deleteOnNoConsumers;
+
       this.postOffice = postOffice;
 
       this.storageManager = storageManager;
@@ -437,6 +473,16 @@ public class QueueImpl implements Queue {
    }
 
    @Override
+   public boolean isDeleteOnNoConsumers() {
+      return deleteOnNoConsumers;
+   }
+
+   @Override
+   public int getMaxConsumers() {
+      return maxConsumers;
+   }
+
+   @Override
    public SimpleString getName() {
       return name;
    }
@@ -709,6 +755,11 @@ public class QueueImpl implements Queue {
       }
 
       synchronized (this) {
+
+         if (maxConsumers != -1 && noConsumers.get() >= maxConsumers) {
+            throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name);
+         }
+
          flushDeliveriesInTransit();
 
          consumersChanged = true;
@@ -722,6 +773,8 @@ public class QueueImpl implements Queue {
          if (refCountForConsumers != null) {
             refCountForConsumers.increment();
          }
+
+         noConsumers.incrementAndGet();
       }
 
    }
@@ -770,6 +823,15 @@ public class QueueImpl implements Queue {
          if (refCountForConsumers != null) {
             refCountForConsumers.decrement();
          }
+
+         if (noConsumers.decrementAndGet() == 0 && deleteOnNoConsumers) {
+            try {
+               deleteQueue();
+            }
+            catch (Exception e) {
+               logger.error("Error deleting queue on no consumers.  " + this.toString(),
e);
+            }
+         }
       }
    }
 
@@ -1361,6 +1423,7 @@ public class QueueImpl implements Queue {
    @Override
    public void deleteQueue(boolean removeConsumers) throws Exception {
       synchronized (this) {
+         if (this.queueDestroyed) return;
          this.queueDestroyed = true;
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 98a9c84..389b07e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -205,6 +205,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener
{
 
       this.creationTime = System.currentTimeMillis();
 
+
       if (browseOnly) {
          browserDeliverer = new BrowserDeliverer(messageQueue.browserIterator());
       } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 55a287a..11b11ab 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -901,6 +901,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public boolean isDeleteOnNoConsumers() {
+         return false;
+      }
+
+      @Override
+      public int getMaxConsumers() {
+         return -1;
+      }
+
+      @Override
       public void addConsumer(Consumer consumer) throws Exception {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
index 03739e9..a21a62b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/addressing/AddressingTest.java
@@ -19,8 +19,11 @@ package org.apache.activemq.artemis.tests.integration.addressing;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
@@ -187,8 +190,6 @@ public class AddressingTest extends ActiveMQTestBase {
       assertEquals(0, count);
    }
 
-
-
    @Test
    public void testMulticastRoutingBackwardsCompat() throws Exception {
 
@@ -222,34 +223,103 @@ public class AddressingTest extends ActiveMQTestBase {
       }
    }
 
-   @Ignore
    @Test
-   public void testDeleteQueueOnNoConsumersTrue() {
-      fail("Not Implemented");
+   public void testDeleteQueueOnNoConsumersTrue() throws Exception {
+
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+      // For each address, create 2 Queues with the same address, assert both queues receive message
+      boolean deleteOnNoConsumers = true;
+      Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
+
+      ClientSession session = sessionFactory.createSession();
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      consumer1.close();
+
+      assertFalse(server.queueQuery(queueName).isExists());
    }
 
-   @Ignore
    @Test
-   public void testDeleteQueueOnNoConsumersFalse() {
-      fail("Not Implemented");
+   public void testDeleteQueueOnNoConsumersFalse() throws Exception {
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+      // For each address, create 2 Queues with the same address, assert both queues receive message
+      boolean deleteOnNoConsumers = false;
+      Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
+
+      ClientSession session = sessionFactory.createSession();
+      session.start();
+
+      ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      consumer1.close();
+
+      assertTrue(server.queueQuery(queueName).isExists());
    }
 
-   @Ignore
    @Test
-   public void testLimitOnMaxConsumers() {
-      fail("Not Implemented");
+   public void testLimitOnMaxConsumers() throws Exception {
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+      // For each address, create 2 Queues with the same address, assert both queues receive message
+      boolean deleteOnNoConsumers = false;
+      Queue q1 = server.createQueue(address, queueName, null, true, false, 0, deleteOnNoConsumers);
+
+      Exception expectedException = null;
+      String expectedMessage = "Maximum Consumer Limit Reached on Queue";
+      try {
+         ClientSession session = sessionFactory.createSession();
+         session.start();
+
+         ClientConsumer consumer1 = session.createConsumer(q1.getName());
+      }
+      catch (ActiveMQQueueMaxConsumerLimitReached e) {
+         expectedException = e;
+      }
+
+      assertNotNull(expectedException);
+      assertTrue(expectedException.getMessage().contains(expectedMessage));
+      assertTrue(expectedException.getMessage().contains(address));
+      assertTrue(expectedException.getMessage().contains(queueName));
    }
 
    @Ignore
    @Test
-   public void testUnlimitedMaxConsumers() {
-      fail("Not Implemented");
+   public void testUnlimitedMaxConsumers() throws Exception {
+      int noConsumers = 50;
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+      // For each address, create 2 Queues with the same address, assert both queues receive message
+      boolean deleteOnNoConsumers = false;
+      Queue q1 = server.createQueue(address, queueName, null, true, false, -1, deleteOnNoConsumers);
+
+      ClientSession session = sessionFactory.createSession();
+      session.start();
+
+      for (int i = 0; i < noConsumers; i++) {
+         session.createConsumer(q1.getName());
+      }
    }
 
    @Ignore
    @Test
-   public void testDefaultMaxConsumersFromAddress() {
-      fail("Not Implemented");
+   public void testDefaultMaxConsumersFromAddress() throws Exception {
+      int noConsumers = 50;
+      SimpleString address = new SimpleString("test.address");
+      SimpleString queueName = SimpleString.toSimpleString(UUID.randomUUID().toString());
+      // For each address, create 2 Queues with the same address, assert both queues receive message
+      boolean deleteOnNoConsumers = false;
+      AddressInfo addressInfo = new AddressInfo(address);
+      addressInfo.setDefaultMaxConsumers(0);
+      Queue q1 = server.createQueue(address, queueName, null, true, false, null, deleteOnNoConsumers);
+
+      ClientSession session = sessionFactory.createSession();
+      session.start();
+
+      for (int i = 0; i < noConsumers; i++) {
+         session.createConsumer(q1.getName());
+      }
    }
 
    @Ignore

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 2fd5915..124ece3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -224,12 +224,14 @@ public class HangConsumerTest extends ActiveMQTestBase {
                              final boolean durable,
                              final boolean temporary,
                              final boolean autoCreated,
+                             final Integer maxConsumers,
+                             final Boolean deleteOnNoConsumers,
                              final ScheduledExecutorService scheduledExecutor,
                              final PostOffice postOffice,
                              final StorageManager storageManager,
                              final HierarchicalRepository<AddressSettings> addressSettingsRepository,
                              final Executor executor) {
-            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
+            super(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, maxConsumers, deleteOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor);
          }
 
          @Override
@@ -256,7 +258,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
 
          @Override
          public Queue createQueueWith(final QueueConfig config) {
-            queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+            queue = new MyQueueWithBlocking(config.id(), config.address(), config.name(), config.filter(), config.user(), config.pageSubscription(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.maxConsumers(), config.isDeleteOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
             return queue;
          }
 
@@ -271,7 +273,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                   final boolean durable,
                                   final boolean temporary,
                                   final boolean autoCreated) {
-            queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
+            queue = new MyQueueWithBlocking(persistenceID, address, name, filter, user, pageSubscription, durable, temporary, autoCreated, null, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor());
             return queue;
          }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec1762b1/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 9a20d70..ef5c05e 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -443,6 +443,16 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public boolean isDeleteOnNoConsumers() {
+      return false;
+   }
+
+   @Override
+   public int getMaxConsumers() {
+      return -1;
+   }
+
+   @Override
    public LinkedListIterator<MessageReference> iterator() {
       // no-op
       return null;


Mime
View raw message