activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-990 Dont require Perms on MQTT mngment Q
Date Fri, 10 Mar 2017 11:08:40 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master c1c0354d9 -> 526b42a42


ARTEMIS-990 Don't require Perms on MQTT mngment Q


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/b33fea0d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/b33fea0d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/b33fea0d

Branch: refs/heads/master
Commit: b33fea0d7fbc94a43d04ca66a89880442e0f91c5
Parents: c1c0354
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Fri Mar 10 10:14:19 2017 +0000
Committer: Martyn Taylor <mtaylor@redhat.com>
Committed: Fri Mar 10 11:06:05 2017 +0000

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java  | 24 ++++++++++++--------
 .../protocol/mqtt/MQTTRetainMessageManager.java |  3 +--
 .../integration/mqtt/imported/MQTTTest.java     |  2 +-
 3 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b33fea0d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 553521b..55fdfcc 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -26,6 +26,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
@@ -64,10 +65,6 @@ public class MQTTPublishManager {
    synchronized void start() throws Exception {
       this.state = session.getSessionState();
       this.outboundStore = state.getOutboundStore();
-
-      createManagementAddress();
-      createManagementQueue();
-      createManagementConsumer();
    }
 
    synchronized void stop() throws Exception {
@@ -79,7 +76,7 @@ public class MQTTPublishManager {
    }
 
    void clean() throws Exception {
-      createManagementAddress();
+      SimpleString managementAddress = createManagementAddress();
       Queue queue = session.getServer().locateQueue(managementAddress);
       if (queue != null) {
          queue.deleteQueue();
@@ -92,14 +89,14 @@ public class MQTTPublishManager {
       managementConsumer.setStarted(true);
    }
 
-   private void createManagementAddress() {
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
+   private SimpleString createManagementAddress() {
+      return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
    }
 
    private void createManagementQueue() throws Exception {
       Queue q = session.getServer().locateQueue(managementAddress);
       if (q == null) {
-         session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
+         session.getServer().createQueue(managementAddress, RoutingType.ANYCAST, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false);
       }
    }
 
@@ -189,11 +186,20 @@ public class MQTTPublishManager {
       session.getProtocolHandler().sendPubRel(messageId);
    }
 
+   private SimpleString getManagementAddress() throws Exception {
+      if (managementAddress == null) {
+         managementAddress = createManagementAddress();
+         createManagementQueue();
+         createManagementConsumer();
+      }
+      return managementAddress;
+   }
+
    void handlePubRec(int messageId) throws Exception {
       try {
          Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
          if (ref != null) {
-            Message m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+            Message m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
             session.getServerSession().send(m, true);
             session.getServerSession().acknowledge(ref.getB(), ref.getA());
          } else {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b33fea0d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 0b52a0b..d0a3c07 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -49,10 +49,9 @@ public class MQTTRetainMessageManager {
 
       Queue queue = session.getServer().locateQueue(retainAddress);
       if (queue == null) {
-         queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
+         queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
       }
 
-
       try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
          synchronized (queue) {
             if (iterator.hasNext()) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/b33fea0d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 91db1d2a..32062c0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1845,7 +1845,7 @@ public class MQTTTest extends MQTTTestSupport {
       connection2.connect();
       connection2.subscribe(mqttTopic);
 
-      Message message = connection2.receive();
+      Message message = connection2.receive(5000, TimeUnit.MILLISECONDS);
       assertEquals(payload, new String(message.getPayload()));
    }
 


Mime
View raw message