Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC8A2200BEB for ; Tue, 13 Dec 2016 21:57:25 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id AB33F160B23; Tue, 13 Dec 2016 20:57:25 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A36A6160B07 for ; Tue, 13 Dec 2016 21:57:24 +0100 (CET) Received: (qmail 2446 invoked by uid 500); 13 Dec 2016 20:57:21 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 2354 invoked by uid 99); 13 Dec 2016 20:57:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Dec 2016 20:57:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E21AFF169A; Tue, 13 Dec 2016 20:57:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Tue, 13 Dec 2016 20:57:20 -0000 Message-Id: <00d811ad335c4896a9249cbd46939462@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] activemq-artemis git commit: ARTEMIS-788 Enable auto create and Prefix in MQTT archived-at: Tue, 13 Dec 2016 20:57:25 -0000 Repository: activemq-artemis Updated Branches: refs/heads/master d5be1c6dc -> 47f46501e ARTEMIS-788 Enable auto create and Prefix in MQTT Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7f658cad Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7f658cad Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7f658cad Branch: refs/heads/master Commit: 7f658cad2153461c062cabbffff64461971a995a Parents: d5be1c6 Author: Martyn Taylor Authored: Tue Dec 13 16:00:51 2016 +0000 Committer: Clebert Suconic Committed: Tue Dec 13 15:54:50 2016 -0500 ---------------------------------------------------------------------- .../protocol/mqtt/MQTTSubscriptionManager.java | 67 +++++++-- .../core/server/impl/ServerSessionImpl.java | 2 +- .../integration/mqtt/imported/MQTTTest.java | 136 +++++++++++++------ .../mqtt/imported/MQTTTestSupport.java | 4 +- 4 files changed, 156 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index c4b8b94..f09e5c5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -17,17 +17,21 @@ package org.apache.activemq.artemis.core.protocol.mqtt; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import 
io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; -import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.impl.AddressInfo; @@ -89,20 +93,65 @@ public class MQTTSubscriptionManager { * Creates a Queue if it doesn't already exist, based on a topic and address. Returning the queue name. */ private Queue createQueueForSubscription(String address, int qos) throws Exception { - + // Check to see if a subscription queue already exists. SimpleString queue = getQueueNameForTopic(address); - Queue q = session.getServer().locateQueue(queue); + + // The queue does not exist so we need to create it. if (q == null) { - q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); - } else { - if (q.isDeleteOnNoConsumers()) { - throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true); + SimpleString sAddress = SimpleString.toSimpleString(address); + + // Check we can auto create queues. + BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress); + if (!bindingQueryResult.isAutoCreateQueues()) { + throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress); + } + + // Check that the address exists, if not we try to auto create it. 
+ AddressInfo addressInfo = session.getServerSession().getAddress(sAddress); + if (addressInfo == null) { + if (!bindingQueryResult.isAutoCreateAddresses()) { + throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address)); + } + addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false); } + return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos); } return q; } + private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception { + + if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) { + return session.getServerSession().createQueue(addressInfo.getName(), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); + } + + if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) { + if (!bindingQueryResult.getQueueNames().isEmpty()) { + SimpleString name = null; + for (SimpleString qName : bindingQueryResult.getQueueNames()) { + if (name == null) { + name = qName; + } else if (qName.equals(addressInfo.getName())) { + name = qName; + } + } + return session.getServer().locateQueue(name); + } else { + try { + return session.getServerSession().createQueue(addressInfo.getName(), addressInfo.getName(), RoutingType.ANYCAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false); + } catch (ActiveMQQueueExistsException e) { + return session.getServer().locateQueue(addressInfo.getName()); + } + } + } + + Set<RoutingType> routingTypeSet = new HashSet(); + routingTypeSet.add(RoutingType.MULTICAST); + routingTypeSet.add(RoutingType.ANYCAST); + throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(addressInfo.getRoutingType(), addressInfo.getName().toString(), routingTypeSet); + } + /** * Creates a new consumer for the queue associated with a subscription */ @@ -122,10 +171,6 @@ public class MQTTSubscriptionManager { 
String topic = subscription.topicName(); String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic); - AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress)); - if (addressInfo != null && !addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) { - throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), RoutingType.MULTICAST, addressInfo.getRoutingTypes()); - } session.getSessionState().addSubscription(subscription); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index e80bdc0..49cf471 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1510,7 +1510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener { @Override public AddressInfo getAddress(SimpleString address) { - return server.getPostOffice().getAddressInfo(address); + return server.getPostOffice().getAddressInfo(removePrefix(address)); } @Override http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java index 
58d75d8..79029be 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java @@ -27,9 +27,11 @@ import java.lang.reflect.Field; import java.net.ProtocolException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -38,7 +40,6 @@ import java.util.regex.Pattern; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession; -import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait; @@ -1642,50 +1643,107 @@ public class MQTTTest extends MQTTTestSupport { } @Test(timeout = 60 * 1000) - public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception { - Exception peerDisconnectedException = null; - try { - String clientId = "test.mqtt"; - SimpleString coreAddress = new SimpleString("foo.bar"); - Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + public void testAnycastPrefixWorksWithMQTT() throws Exception { + String clientId = "testMqtt"; - AddressInfo addressInfo = new AddressInfo(coreAddress); - addressInfo.addRoutingType(RoutingType.ANYCAST); - getServer().createOrUpdateAddressInfo(addressInfo); + String anycastAddress = "anycast:foo/bar"; + String sendAddress = "foo/bar"; + Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, 
QoS.AT_LEAST_ONCE)}; - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(clientId); - mqtt.setKeepAlive((short) 2); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - connection.subscribe(mqttSubscription); - } catch (EOFException e) { - peerDisconnectedException = e; - } - assertNotNull(peerDisconnectedException); - assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.subscribe(mqttSubscription); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttSubscription); + + String message1 = "TestMessage1"; + String message2 = "TestMessage2"; + + connection1.publish(sendAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection2.publish(sendAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false); + + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); } @Test(timeout = 60 * 1000) - public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception { - Exception peerDisconnectedException = null; - try { - String clientId = "testMqtt"; - SimpleString coreAddress = new SimpleString("foo.bar"); - getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." 
+ coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true); + public void testAnycastAddressWorksWithMQTT() throws Exception { + String anycastAddress = "foo/bar"; - Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; + getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST)); + String clientId = "testMqtt"; - MQTT mqtt = createMQTTConnection(); - mqtt.setClientId(clientId); - mqtt.setKeepAlive((short) 2); - final BlockingConnection connection = mqtt.blockingConnection(); - connection.connect(); - connection.subscribe(mqttSubscription); - } catch (EOFException e) { - peerDisconnectedException = e; - } - assertNotNull(peerDisconnectedException); - assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected")); + Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.subscribe(mqttSubscription); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttSubscription); + + String message1 = "TestMessage1"; + String message2 = "TestMessage2"; + + connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false); + + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + } + + @Test(timeout = 60 * 1000) + public void testAmbiguousRoutingWithMQTT() throws Exception { + String anycastAddress = "foo/bar"; + + Set<RoutingType> 
routingTypeSet = new HashSet<>(); + routingTypeSet.add(RoutingType.ANYCAST); + routingTypeSet.add(RoutingType.MULTICAST); + + getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet)); + String clientId = "testMqtt"; + + Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)}; + + MQTT mqtt = createMQTTConnection(); + mqtt.setClientId(clientId); + BlockingConnection connection1 = mqtt.blockingConnection(); + connection1.connect(); + connection1.subscribe(mqttSubscription); + + MQTT mqtt2 = createMQTTConnection(); + mqtt2.setClientId(clientId + "2"); + BlockingConnection connection2 = mqtt2.blockingConnection(); + connection2.connect(); + connection2.subscribe(mqttSubscription); + + String message1 = "TestMessage1"; + String message2 = "TestMessage2"; + + connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false); + + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS)); + + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); + assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS)); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java index 877d5a7..965804c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java +++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java @@ -167,8 +167,8 @@ public class MQTTTestSupport extends ActiveMQTestBase { Map<String, Object> params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, "" + port); params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT"); - TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params); - server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration); + + server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:"); LOG.info("Added connector {} to broker", getProtocolScheme()); }