Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 9F5C5200BC5 for ; Mon, 7 Nov 2016 17:36:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 9E0F0160B21; Mon, 7 Nov 2016 16:36:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 93298160B18 for ; Mon, 7 Nov 2016 17:36:23 +0100 (CET) Received: (qmail 24858 invoked by uid 500); 7 Nov 2016 16:36:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 23552 invoked by uid 99); 7 Nov 2016 16:36:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 07 Nov 2016 16:36:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A6CD9F173F; Mon, 7 Nov 2016 16:36:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 07 Nov 2016 16:36:42 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [22/50] [abbrv] activemq-artemis git commit: Added MQTT DeleteOnNoConsumer Error archived-at: Mon, 07 Nov 2016 16:36:25 -0000 Added MQTT DeleteOnNoConsumer Error Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/31173e9b Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/31173e9b Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/31173e9b Branch: refs/heads/ARTEMIS-780 Commit: 31173e9b5821c007ef4305bd7380b688ebcfd808 Parents: 0772b54 Author: Martyn Taylor Authored: Tue Nov 1 15:15:32 2016 +0000 Committer: Clebert Suconic Committed: Mon Nov 7 11:28:07 2016 -0500 ---------------------------------------------------------------------- .../artemis/api/core/ActiveMQExceptionType.java | 6 ++++ .../core/ActiveMQInvalidQueueConfiguration.java | 31 ++++++++++++++++++++ .../protocol/mqtt/MQTTSubscriptionManager.java | 8 +++-- .../core/server/ActiveMQMessageBundle.java | 4 +++ .../artemis/core/server/ActiveMQServer.java | 19 ++++++++++++ .../artemis/core/server/ServerSession.java | 8 +++++ .../core/server/impl/ActiveMQServerImpl.java | 25 ++++++++++++++++ .../core/server/impl/ServerSessionImpl.java | 15 ++++++++-- .../integration/mqtt/imported/MQTTTest.java | 31 ++++++++++++++++++-- 9 files changed, 141 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java index 309a8c4..785dac3 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java @@ -225,6 +225,12 @@ public enum ActiveMQExceptionType { public ActiveMQException createException(String msg) { return new ActiveMQUnexpectedRoutingTypeForAddress(msg); } + }, + INVALID_QUEUE_CONFIGURATION(216) { + @Override + public ActiveMQException createException(String msg) { + return new ActiveMQInvalidQueueConfiguration(msg); + } }; private static final Map TYPE_MAP; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java new file mode 100644 index 0000000..521a266 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.api.core; + +/** + * An operation failed because a queue exists on the server. + */ +public final class ActiveMQInvalidQueueConfiguration extends ActiveMQException { + + public ActiveMQInvalidQueueConfiguration() { + super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION); + } + + public ActiveMQInvalidQueueConfiguration(String msg) { + super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION, msg); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index a264e88..1187db0 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -93,7 +93,11 @@ public class MQTTSubscriptionManager { Queue q = session.getServer().locateQueue(queue); if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0); + q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false); + } else { + if (q.isDeleteOnNoConsumers()) { + throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); + } } return q; } @@ -118,7 +122,7 @@ public class MQTTSubscriptionManager { String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress)); - if (addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) { + if (addressInfo != null && addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) { throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java index 9475461..6d8cf30 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java @@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIncompatibleClientServerException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException; +import org.apache.activemq.artemis.api.core.ActiveMQInvalidQueueConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; @@ -386,4 +387,7 @@ public interface ActiveMQMessageBundle { @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT) ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType); + + @Message(id = 119202, value = "Invalid Queue Configuration for Queue {0}, Address {1}. Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT) + ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index 749969a..9b5578c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -320,8 +320,27 @@ public interface ActiveMQServer extends ActiveMQComponent { SimpleString user, boolean durable, boolean temporary, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, boolean autoCreated) throws Exception; + Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + Queue deployQueue(SimpleString address, SimpleString queueName, SimpleString filterString, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 0df5060..ab3898c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -185,6 +185,14 @@ public interface ServerSession extends SecurityAuth { boolean isClosed(); + Queue createQueue(SimpleString address, + SimpleString name, + SimpleString filterString, + boolean temporary, + boolean durable, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception; + void createSharedQueue(SimpleString address, SimpleString name, boolean durable, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index ba63bb3..8e86067 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1451,6 +1451,18 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers); + } + + @Override public Queue createQueue(final SimpleString address, final SimpleString queueName, final SimpleString filterString, @@ -1462,6 +1474,19 @@ public class ActiveMQServerImpl implements ActiveMQServer { } @Override + public Queue createQueue(SimpleString address, + SimpleString queueName, + SimpleString filter, + SimpleString user, + boolean durable, + boolean temporary, + boolean autoCreated, + Integer maxConsumers, + Boolean deleteOnNoConsumers) throws Exception { + return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers); + } + + @Override public void createSharedQueue(final SimpleString address, final SimpleString name, final SimpleString filterString, http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 37d99bb..4a7a89d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -500,6 +500,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener { final SimpleString filterString, final boolean temporary, final boolean durable) throws Exception { + return createQueue(address, name, filterString, temporary, durable, null, null); + } + + @Override + public Queue createQueue(final SimpleString address, + final SimpleString name, + final SimpleString filterString, + final boolean temporary, + final boolean durable, + final Integer maxConsumers, + final Boolean deleteOnNoConsumers) throws Exception { if (durable) { // make sure the user has privileges to create this queue securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this); @@ -513,9 +524,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener { // any non-temporary JMS destination created via this method should be marked as auto-created if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC))) { - queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true); + queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true, maxConsumers, deleteOnNoConsumers); } else { - queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary); + queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers); } if (temporary) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/31173e9b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index dd0098a..e99fc96 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -1620,6 +1620,7 @@ public class MQTTTest extends MQTTTestSupport { public void testClientDisconnectedOnMaxConsumerLimitReached() throws Exception { Exception peerDisconnectedException = null; try { + String clientId = "test.client"; SimpleString coreAddress = new SimpleString("foo.bar"); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; @@ -1627,8 +1628,10 @@ public class MQTTTest extends MQTTTestSupport { addressInfo.setDefaultMaxConsumers(0); getServer().createOrUpdateAddressInfo(addressInfo); + getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false); + MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("test-mqtt"); + mqtt.setClientId(clientId); mqtt.setKeepAlive((short) 2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); @@ -1644,6 +1647,7 @@ public class MQTTTest extends MQTTTestSupport { public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception { Exception peerDisconnectedException = null; try { + String clientId = "test.mqtt"; SimpleString coreAddress = new SimpleString("foo.bar"); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; @@ -1652,7 +1656,30 @@ public class MQTTTest extends MQTTTestSupport { getServer().createOrUpdateAddressInfo(addressInfo); MQTT mqtt = createMQTTConnection(); - mqtt.setClientId("test-mqtt"); + mqtt.setClientId(clientId); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.subscribe(mqttSubscription); + } catch (EOFException e) { + peerDisconnectedException = e; + } + assertNotNull(peerDisconnectedException); + assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + } + + @Test(timeout = 60 * 1000) + public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception { + Exception peerDisconnectedException = null; + try { + String clientId = "testMqtt"; + SimpleString coreAddress = new SimpleString("foo.bar"); + getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true); + + Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); mqtt.setKeepAlive((short) 2); final BlockingConnection connection = mqtt.blockingConnection(); connection.connect();