activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-990 Dont require Perms on MQTT mngment Q
Date Fri, 10 Mar 2017 13:14:58 GMT
ARTEMIS-990 Dont require Perms on MQTT mngment Q

(cherry picked from commit b33fea0d7fbc94a43d04ca66a89880442e0f91c5)


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2779ad85
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2779ad85
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2779ad85

Branch: refs/heads/1.x
Commit: 2779ad85539be3292b38a683eadb01837bf1024e
Parents: e0cd9aa
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Mar 10 10:14:19 2017 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Mar 10 13:13:31 2017 +0000

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java  | 23 ++++++++++++--------
 .../protocol/mqtt/MQTTRetainMessageManager.java |  2 +-
 .../integration/mqtt/imported/MQTTTest.java     |  4 ++--
 3 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 77b45ab..626916f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -62,10 +62,6 @@ public class MQTTPublishManager {
    synchronized void start() throws Exception {
       this.state = session.getSessionState();
       this.outboundStore = state.getOutboundStore();
-
-      createManagementAddress();
-      createManagementQueue();
-      createManagementConsumer();
    }
 
    synchronized void stop() throws Exception {
@@ -77,7 +73,7 @@ public class MQTTPublishManager {
    }
 
    void clean() throws Exception {
-      createManagementAddress();
+      SimpleString managementAddress = createManagementAddress();
       Queue queue = session.getServer().locateQueue(managementAddress);
       if (queue != null) {
          queue.deleteQueue();
@@ -90,14 +86,14 @@ public class MQTTPublishManager {
       managementConsumer.setStarted(true);
    }
 
-   private void createManagementAddress() {
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
+   private SimpleString createManagementAddress() {
+      return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
    }
 
    private void createManagementQueue() throws Exception {
       Queue q = session.getServer().locateQueue(managementAddress);
       if (q == null) {
-         session.getServerSession().createQueue(managementAddress, managementAddress, null,
false, MQTTUtil.DURABLE_MESSAGES);
+         session.getServer().createQueue(managementAddress, managementAddress, null, MQTTUtil.DURABLE_MESSAGES,
false);
       }
    }
 
@@ -183,11 +179,20 @@ public class MQTTPublishManager {
       session.getProtocolHandler().sendPubRel(messageId);
    }
 
+   private SimpleString getManagementAddress() throws Exception {
+      if (managementAddress == null) {
+         managementAddress = createManagementAddress();
+         createManagementQueue();
+         createManagementConsumer();
+      }
+      return managementAddress;
+   }
+
    void handlePubRec(int messageId) throws Exception {
       try {
          Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
          if (ref != null) {
-            ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+            ServerMessage m = MQTTUtil.createPubRelMessage(session, getManagementAddress(),
messageId);
             session.getServerSession().send(m, true);
             session.getServerSession().acknowledge(ref.getB(), ref.getA());
          } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 70db040..7acc3b4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -49,7 +49,7 @@ public class MQTTRetainMessageManager {
 
       Queue queue = session.getServer().locateQueue(retainAddress);
       if (queue == null) {
-         queue = session.getServerSession().createQueue(retainAddress, retainAddress, null,
false, true);
+         queue = session.getServer().createQueue(retainAddress, retainAddress, null, true,
false);
       }
 
       try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 7cd1bf1..c211260 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
 import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.MQTTException;
@@ -53,7 +54,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.vertx.java.core.impl.ConcurrentHashSet;
 
 /**
  * QT
@@ -1711,7 +1711,7 @@ public class MQTTTest extends MQTTTestSupport {
       connection2.connect();
       connection2.subscribe(mqttTopic);
 
-      Message message = connection2.receive();
+      Message message = connection2.receive(5000, TimeUnit.MILLISECONDS);
       assertEquals(payload, new String(message.getPayload()));
    }
 


Mime
View raw message