activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-788 Enable auto create and Prefix in MQTT
Date Tue, 13 Dec 2016 20:57:20 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master d5be1c6dc -> 47f46501e


ARTEMIS-788 Enable auto create and Prefix in MQTT


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7f658cad
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7f658cad
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7f658cad

Branch: refs/heads/master
Commit: 7f658cad2153461c062cabbffff64461971a995a
Parents: d5be1c6
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue Dec 13 16:00:51 2016 +0000
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Tue Dec 13 15:54:50 2016 -0500

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  67 +++++++--
 .../core/server/impl/ServerSessionImpl.java     |   2 +-
 .../integration/mqtt/imported/MQTTTest.java     | 136 +++++++++++++------
 .../mqtt/imported/MQTTTestSupport.java          |   4 +-
 4 files changed, 156 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index c4b8b94..f09e5c5 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -17,17 +17,21 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
-import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 
@@ -89,20 +93,65 @@ public class MQTTSubscriptionManager {
     * Creates a Queue if it doesn't already exist, based on a topic and address.  Returning the queue name.
     */
    private Queue createQueueForSubscription(String address, int qos) throws Exception {
-
+      // Check to see if a subscription queue already exists.
       SimpleString queue = getQueueNameForTopic(address);
-
       Queue q = session.getServer().locateQueue(queue);
+
+      // The queue does not exist so we need to create it.
       if (q == null) {
-         q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
-      } else {
-         if (q.isDeleteOnNoConsumers()) {
-            throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
+         SimpleString sAddress = SimpleString.toSimpleString(address);
+
+         // Check we can auto create queues.
+         BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress);
+         if (!bindingQueryResult.isAutoCreateQueues()) {
+            throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress);
+         }
+
+         // Check that the address exists, if not we try to auto create it.
+         AddressInfo addressInfo = session.getServerSession().getAddress(sAddress);
+         if (addressInfo == null) {
+            if (!bindingQueryResult.isAutoCreateAddresses()) {
+               throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
+            }
+            addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false);
          }
+         return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
       }
       return q;
    }
 
+   private Queue findOrCreateQueue(BindingQueryResult bindingQueryResult, AddressInfo addressInfo, SimpleString queue, int qos) throws Exception {
+
+      if (addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
+         return session.getServerSession().createQueue(addressInfo.getName(), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
+      }
+
+      if (addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST)) {
+         if (!bindingQueryResult.getQueueNames().isEmpty()) {
+            SimpleString name = null;
+            for (SimpleString qName : bindingQueryResult.getQueueNames()) {
+               if (name == null) {
+                  name = qName;
+               } else if (qName.equals(addressInfo.getName())) {
+                  name = qName;
+               }
+            }
+            return session.getServer().locateQueue(name);
+         } else {
+            try {
+               return session.getServerSession().createQueue(addressInfo.getName(), addressInfo.getName(), RoutingType.ANYCAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
+            } catch (ActiveMQQueueExistsException e) {
+               return session.getServer().locateQueue(addressInfo.getName());
+            }
+         }
+      }
+
+      Set<RoutingType> routingTypeSet = new HashSet();
+      routingTypeSet.add(RoutingType.MULTICAST);
+      routingTypeSet.add(RoutingType.ANYCAST);
+      throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(addressInfo.getRoutingType(), addressInfo.getName().toString(), routingTypeSet);
+   }
+
    /**
     * Creates a new consumer for the queue associated with a subscription
     */
@@ -122,10 +171,6 @@ public class MQTTSubscriptionManager {
       String topic = subscription.topicName();
 
       String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
-      AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress));
-      if (addressInfo != null && !addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST)) {
-         throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), RoutingType.MULTICAST, addressInfo.getRoutingTypes());
-      }
 
       session.getSessionState().addSubscription(subscription);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index e80bdc0..49cf471 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -1510,7 +1510,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
    @Override
    public AddressInfo getAddress(SimpleString address) {
-      return server.getPostOffice().getAddressInfo(address);
+      return server.getPostOffice().getAddressInfo(removePrefix(address));
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 58d75d8..79029be 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -27,9 +27,11 @@ import java.lang.reflect.Field;
 import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -38,7 +40,6 @@ import java.util.regex.Pattern;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
-import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
@@ -1642,50 +1643,107 @@ public class MQTTTest extends MQTTTestSupport {
    }
 
    @Test(timeout = 60 * 1000)
-   public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception {
-      Exception peerDisconnectedException = null;
-      try {
-         String clientId = "test.mqtt";
-         SimpleString coreAddress = new SimpleString("foo.bar");
-         Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
+   public void testAnycastPrefixWorksWithMQTT() throws Exception {
+      String clientId = "testMqtt";
 
-         AddressInfo addressInfo = new AddressInfo(coreAddress);
-         addressInfo.addRoutingType(RoutingType.ANYCAST);
-         getServer().createOrUpdateAddressInfo(addressInfo);
+      String anycastAddress = "anycast:foo/bar";
+      String sendAddress = "foo/bar";
+      Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
 
-         MQTT mqtt = createMQTTConnection();
-         mqtt.setClientId(clientId);
-         mqtt.setKeepAlive((short) 2);
-         final BlockingConnection connection = mqtt.blockingConnection();
-         connection.connect();
-         connection.subscribe(mqttSubscription);
-      } catch (EOFException e) {
-         peerDisconnectedException = e;
-      }
-      assertNotNull(peerDisconnectedException);
-      assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId(clientId);
+      BlockingConnection connection1 = mqtt.blockingConnection();
+      connection1.connect();
+      connection1.subscribe(mqttSubscription);
+
+      MQTT mqtt2 = createMQTTConnection();
+      mqtt2.setClientId(clientId + "2");
+      BlockingConnection connection2 = mqtt2.blockingConnection();
+      connection2.connect();
+      connection2.subscribe(mqttSubscription);
+
+      String message1 = "TestMessage1";
+      String message2 = "TestMessage2";
+
+      connection1.publish(sendAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false);
+      connection2.publish(sendAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+      assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
+      assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
+
+      assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
+      assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
    }
 
    @Test(timeout = 60 * 1000)
-   public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception {
-      Exception peerDisconnectedException = null;
-      try {
-         String clientId = "testMqtt";
-         SimpleString coreAddress = new SimpleString("foo.bar");
-         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true);
+   public void testAnycastAddressWorksWithMQTT() throws Exception {
+      String anycastAddress = "foo/bar";
 
-         Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
+      getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), RoutingType.ANYCAST));
+      String clientId = "testMqtt";
 
-         MQTT mqtt = createMQTTConnection();
-         mqtt.setClientId(clientId);
-         mqtt.setKeepAlive((short) 2);
-         final BlockingConnection connection = mqtt.blockingConnection();
-         connection.connect();
-         connection.subscribe(mqttSubscription);
-      } catch (EOFException e) {
-         peerDisconnectedException = e;
-      }
-      assertNotNull(peerDisconnectedException);
-      assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
+      Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId(clientId);
+      BlockingConnection connection1 = mqtt.blockingConnection();
+      connection1.connect();
+      connection1.subscribe(mqttSubscription);
+
+      MQTT mqtt2 = createMQTTConnection();
+      mqtt2.setClientId(clientId + "2");
+      BlockingConnection connection2 = mqtt2.blockingConnection();
+      connection2.connect();
+      connection2.subscribe(mqttSubscription);
+
+      String message1 = "TestMessage1";
+      String message2 = "TestMessage2";
+
+      connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false);
+      connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+      assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
+      assertNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
+
+      assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
+      assertNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
+   }
+
+   @Test(timeout = 60 * 1000)
+   public void testAmbiguousRoutingWithMQTT() throws Exception {
+      String anycastAddress = "foo/bar";
+
+      Set<RoutingType> routingTypeSet = new HashSet<>();
+      routingTypeSet.add(RoutingType.ANYCAST);
+      routingTypeSet.add(RoutingType.MULTICAST);
+
+      getServer().createAddressInfo(new AddressInfo(SimpleString.toSimpleString("foo.bar"), routingTypeSet));
+      String clientId = "testMqtt";
+
+      Topic[] mqttSubscription = new Topic[]{new Topic(anycastAddress, QoS.AT_LEAST_ONCE)};
+
+      MQTT mqtt = createMQTTConnection();
+      mqtt.setClientId(clientId);
+      BlockingConnection connection1 = mqtt.blockingConnection();
+      connection1.connect();
+      connection1.subscribe(mqttSubscription);
+
+      MQTT mqtt2 = createMQTTConnection();
+      mqtt2.setClientId(clientId + "2");
+      BlockingConnection connection2 = mqtt2.blockingConnection();
+      connection2.connect();
+      connection2.subscribe(mqttSubscription);
+
+      String message1 = "TestMessage1";
+      String message2 = "TestMessage2";
+
+      connection1.publish(anycastAddress, message1.getBytes(), QoS.AT_LEAST_ONCE, false);
+      connection2.publish(anycastAddress, message2.getBytes(), QoS.AT_LEAST_ONCE, false);
+
+      assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
+      assertNotNull(connection1.receive(1000, TimeUnit.MILLISECONDS));
+
+      assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
+      assertNotNull(connection2.receive(1000, TimeUnit.MILLISECONDS));
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7f658cad/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 877d5a7..965804c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -167,8 +167,8 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       Map<String, Object> params = new HashMap<>();
       params.put(TransportConstants.PORT_PROP_NAME, "" + port);
       params.put(TransportConstants.PROTOCOLS_PROP_NAME, "MQTT");
-      TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-      server.getConfiguration().getAcceptorConfigurations().add(transportConfiguration);
+
+      server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:");
 
       LOG.info("Added connector {} to broker", getProtocolScheme());
    }


Mime
View raw message