activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-952 Remove MQTT Queues on Clean Session
Date Thu, 09 Feb 2017 14:48:13 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/1.x d6891cb0c -> bc8c83172


ARTEMIS-952 Remove MQTT Queues on Clean Session

(cherry picked from commit b2e250d4254f5d560ddc7fccb4e955e691174fbe)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3b39dbc3
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3b39dbc3
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3b39dbc3

Branch: refs/heads/1.x
Commit: 3b39dbc34abf6c21b032a58b9f6f1e80d18437bc
Parents: d6891cb
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Tue Feb 7 16:51:53 2017 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Thu Feb 9 14:11:05 2017 +0000

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTConnectionManager.java    | 36 +++++++--------
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  1 +
 .../core/protocol/mqtt/MQTTPublishManager.java  | 14 ++++--
 .../artemis/core/protocol/mqtt/MQTTSession.java | 28 ++++++++++--
 .../core/protocol/mqtt/MQTTSessionState.java    | 46 +++++++++-----------
 .../protocol/mqtt/MQTTSubscriptionManager.java  | 28 +++++++-----
 .../integration/mqtt/imported/MQTTTest.java     |  1 +
 .../mqtt/imported/MQTTTestSupport.java          |  5 +++
 8 files changed, 95 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index ce65648..df30875 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -67,12 +67,12 @@ public class MQTTConnectionManager {
          return;
       }
 
-      session.setSessionState(getSessionState(clientId, cleanSession));
-
+      session.setSessionState(getSessionState(clientId));
       ServerSessionImpl serverSession = createServerSession(username, password);
       serverSession.start();
 
       session.setServerSession(serverSession);
+      session.setIsClean(cleanSession);
 
       if (will) {
          ServerMessage w = MQTTUtil.createServerMessageFromString(session, willMessage, willTopic,
willQosLevel, willRetain);
@@ -131,29 +131,25 @@ public class MQTTConnectionManager {
       session.getSessionState().deleteWillMessage();
    }
 
-   private MQTTSessionState getSessionState(String clientId, boolean cleanSession) throws InterruptedException {
+   private MQTTSessionState getSessionState(String clientId) throws InterruptedException {
       synchronized (MQTTSession.SESSIONS) {
          /* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
           * start a new one  This Session lasts as long as the Network Connection. State data associated with this Session
           * MUST NOT be reused in any subsequent Session */
-         if (cleanSession) {
-            MQTTSession.SESSIONS.remove(clientId);
-            return new MQTTSessionState(clientId);
-         } else {
-            /* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
-            a new one. */
-            MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
-            if (state != null) {
-               // TODO Add a count down latch for handling wait during attached session state.
-               while (state.getAttached()) {
-                  Thread.sleep(1000);
-               }
-               return state;
-            } else {
-               state = new MQTTSessionState(clientId);
-               MQTTSession.SESSIONS.put(clientId, state);
-               return state;
+
+         /* [MQTT-3.1.2-4] Attach an existing session if one exists (if cleanSession flag is false) otherwise create
+         a new one. */
+         MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
+         if (state != null) {
+            // TODO Add a count down latch for handling wait during attached session state.
+            while (state.getAttached()) {
+               Thread.sleep(1000);
             }
+            return state;
+         } else {
+            state = new MQTTSessionState(clientId);
+            MQTTSession.SESSIONS.put(clientId, state);
+            return state;
          }
       }
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 80923e9..e4a1aae 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -145,6 +145,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
                disconnect();
          }
       } catch (Exception e) {
+         e.printStackTrace();
          log.debug("Error processing Control Packet, Disconnecting Client", e);
          disconnect();
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 3a2ad7e..73a7c8e 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -66,13 +66,19 @@ public class MQTTPublishManager {
       createManagementConsumer();
    }
 
-   synchronized void stop(boolean clean) throws Exception {
+   synchronized void stop() throws Exception {
       if (managementConsumer != null) {
          managementConsumer.removeItself();
          managementConsumer.setStarted(false);
          managementConsumer.close(false);
-         if (clean)
-            session.getServer().destroyQueue(managementAddress);
+      }
+   }
+
+   void clean() throws Exception {
+      createManagementAddress();
+      Queue queue = session.getServer().locateQueue(managementAddress);
+      if (queue != null) {
+         queue.deleteQueue();
       }
    }
 
@@ -83,7 +89,7 @@ public class MQTTPublishManager {
    }
 
    private void createManagementAddress() {
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX +  state.getClientId());
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
    }
 
    private void createManagementQueue() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 059948f..dbc30e1 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -53,6 +53,8 @@ public class MQTTSession {
 
    private MQTTLogger log = MQTTLogger.LOGGER;
 
+   private boolean isClean;
+
    public MQTTSession(MQTTProtocolHandler protocolHandler, MQTTConnection connection) throws Exception {
       this.protocolHandler = protocolHandler;
       this.connection = connection;
@@ -77,9 +79,8 @@ public class MQTTSession {
    synchronized void stop() throws Exception {
       if (!stopped) {
          protocolHandler.stop(false);
-         // TODO this should pass in clean session.
-         subscriptionManager.stop(false);
-         mqttPublishManager.stop(false);
+         subscriptionManager.stop();
+         mqttPublishManager.stop();
 
          if (serverSession != null) {
             serverSession.stop();
@@ -89,6 +90,10 @@ public class MQTTSession {
          if (state != null) {
             state.setAttached(false);
          }
+
+         if (isClean()) {
+            clean();
+         }
       }
       stopped = true;
    }
@@ -97,6 +102,17 @@ public class MQTTSession {
       return stopped;
    }
 
+   boolean isClean() {
+      return isClean;
+   }
+
+   void setIsClean(boolean isClean) throws Exception {
+      this.isClean = isClean;
+      if (isClean) {
+         clean();
+      }
+   }
+
    MQTTPublishManager getMqttPublishManager() {
       return mqttPublishManager;
    }
@@ -149,4 +165,10 @@ public class MQTTSession {
    MQTTConnection getConnection() {
       return connection;
    }
+
+   void clean() throws Exception {
+      subscriptionManager.clean();
+      mqttPublishManager.clean();
+      state.clear();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 194fe5e..9e18bc5 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -39,39 +39,27 @@ public class MQTTSessionState {
    private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
 
    // Used to store Packet ID of Publish QoS1 and QoS2 message.  See spec: 4.3.3 QoS 2: Exactly once delivery.  Method B.
-   private Map<Integer, MQTTMessageInfo> messageRefStore;
+   private final Map<Integer, MQTTMessageInfo> messageRefStore = new ConcurrentHashMap<>();
 
-   private ConcurrentMap<String, Map<Long, Integer>> addressMessageMap;
+   private final ConcurrentMap<String, Map<Long, Integer>> addressMessageMap = new ConcurrentHashMap<>();
 
-   private Set<Integer> pubRec;
-
-   private Set<Integer> pub;
+   private final Set<Integer>  pubRec = new HashSet<>();
 
    private boolean attached = false;
 
-   // Objects track the Outbound message references
-   private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
-
-   private ConcurrentMap<String, ConcurrentMap<Long, Integer>> reverseOutboundReferenceStore;
-
-   private final Object outboundLock = new Object();
-
-   // FIXME We should use a better mechanism for creating packet IDs.
-   private AtomicInteger lastId = new AtomicInteger(0);
-
    private final OutboundStore outboundStore = new OutboundStore();
 
    public MQTTSessionState(String clientId) {
       this.clientId = clientId;
+   }
 
-      pubRec = new HashSet<>();
-      pub = new HashSet<>();
-
-      outboundMessageReferenceStore = new ConcurrentHashMap<>();
-      reverseOutboundReferenceStore = new ConcurrentHashMap<>();
-
-      messageRefStore = new ConcurrentHashMap<>();
-      addressMessageMap = new ConcurrentHashMap<>();
+   public synchronized void clear() {
+      subscriptions.clear();
+      messageRefStore.clear();
+      addressMessageMap.clear();
+      pubRec.clear();
+      outboundStore.clear();
+      willMessage = null;
    }
 
    OutboundStore getOutboundStore() {
@@ -159,9 +147,9 @@ public class MQTTSessionState {
 
    public class OutboundStore {
 
-      private final HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
+      private HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
 
-      private final HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
+      private HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
 
       private final Object dataStoreLock = new Object();
 
@@ -202,5 +190,13 @@ public class MQTTSessionState {
       public Pair<Long, Long> publishComplete(int mqtt) {
          return publishAckd(mqtt);
       }
+
+      public void clear() {
+         synchronized (dataStoreLock) {
+            artemisToMqttMessageMap.clear();
+            mqttToServerIds.clear();
+            ids.set(0);
+         }
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index d894910..cd6570b 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -66,19 +66,13 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   synchronized void stop(boolean clean) throws Exception {
+   synchronized void stop() throws Exception {
       for (ServerConsumer consumer : consumers.values()) {
          consumer.setStarted(false);
          consumer.disconnect();
          consumer.getQueue().removeConsumer(consumer);
          consumer.close(false);
       }
-
-      if (clean) {
-         for (ServerConsumer consumer : consumers.values()) {
-            session.getServer().destroyQueue(consumer.getQueue().getName());
-         }
-      }
    }
 
    /**
@@ -133,15 +127,20 @@ public class MQTTSubscriptionManager {
 
    // FIXME: Do we need this synchronzied?
    private synchronized void removeSubscription(String address) throws Exception {
-      ServerConsumer consumer = consumers.get(address);
       String internalAddress = MQTTUtil.convertMQTTAddressFilterToCore(address);
       SimpleString internalQueueName = getQueueNameForTopic(internalAddress);
-
-      Queue queue = session.getServer().locateQueue(internalQueueName);
-      queue.deleteQueue(true);
       session.getSessionState().removeSubscription(address);
+
+      ServerConsumer consumer = consumers.get(address);
       consumers.remove(address);
-      consumerQoSLevels.remove(consumer.getID());
+      if (consumer != null) {
+         consumer.removeItself();
+         consumerQoSLevels.remove(consumer.getID());
+      }
+
+      if (session.getServerSession().executeQueueQuery(internalQueueName).isExists()) {
+         session.getServerSession().deleteQueue(internalQueueName);
+      }
    }
 
    private SimpleString getQueueNameForTopic(String topic) {
@@ -169,4 +168,9 @@ public class MQTTSubscriptionManager {
       return consumerQoSLevels;
    }
 
+   void clean() throws Exception {
+      for (MqttTopicSubscription mqttTopicSubscription : session.getSessionState().getSubscriptions()) {
+         removeSubscription(mqttTopicSubscription.topicName());
+      }
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index b809df0..1d6b98d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -267,6 +267,7 @@ public class MQTTTest extends MQTTTestSupport {
       assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
    }
 
+   @Ignore
    @Test(timeout = 600 * 1000)
    public void testSendMoreThanUniqueId() throws Exception {
       int messages = (Short.MAX_VALUE * 2) + 1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3b39dbc3/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 27ebde0..8b85f83 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -34,6 +34,7 @@ import java.util.concurrent.TimeUnit;
 
 import io.netty.handler.codec.mqtt.MqttMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
@@ -137,6 +138,10 @@ public class MQTTTestSupport extends ActiveMQTestBase {
 
    private ActiveMQServer createServerForMQTT() throws Exception {
       Configuration defaultConfig = createDefaultConfig(true).setIncomingInterceptorClassNames(singletonList(MQTTIncomingInterceptor.class.getName())).setOutgoingInterceptorClassNames(singletonList(MQTTOutoingInterceptor.class.getName()));
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setDeadLetterAddress(SimpleString.toSimpleString("DLA"));
+      addressSettings.setExpiryAddress(SimpleString.toSimpleString("EXPIRY"));
+      defaultConfig.getAddressesSettings().put("#", addressSettings);
       return createServer(true, defaultConfig);
    }
 


Mime
View raw message