activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From martyntay...@apache.org
Subject activemq-artemis git commit: mqtt improvement over retained queue
Date Tue, 28 Jul 2015 09:04:03 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 414d4e24e -> d002da850


mqtt improvement over retained queue


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d002da85
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d002da85
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d002da85

Branch: refs/heads/master
Commit: d002da8506ff6c962ed39d957fab21730a09f8c9
Parents: 414d4e2
Author: Clebert Suconic <clebertsuconic@apache.org>
Authored: Sat Jul 25 09:59:30 2015 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Jul 27 10:32:20 2015 -0400

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTRetainMessageManager.java     | 10 +++++-----
 .../core/protocol/mqtt/MQTTSubscriptionManager.java      | 11 ++++++-----
 .../core/protocol/openwire/amq/AMQServerSession.java     |  8 ++++----
 .../activemq/artemis/core/server/ServerSession.java      |  2 +-
 .../artemis/core/server/impl/ServerSessionImpl.java      | 10 +++++++---
 5 files changed, 23 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d002da85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index c1fd17f..5fdcb77 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -17,14 +17,14 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.util.Iterator;
+
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 
-import java.util.Iterator;
-
 
 public class MQTTRetainMessageManager
 {
@@ -46,11 +46,11 @@ public class MQTTRetainMessageManager
    {
       SimpleString retainAddress = new SimpleString(MQTTUtil.convertMQTTAddressFilterToCoreRetain(address));
 
-      if (!session.getServerSession().executeQueueQuery(retainAddress).isExists())
+      Queue queue = session.getServer().locateQueue(retainAddress);
+      if (queue == null)
       {
-         session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
+         queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
       }
-      Queue queue = session.getServer().locateQueue(retainAddress);
 
       // Set the address of this message to the retained queue.
       message.setAddress(retainAddress);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d002da85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index c523938..fc6dccb 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -17,16 +17,16 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-import io.netty.handler.codec.mqtt.MqttTopicSubscription;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.server.ServerConsumer;
-
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.ServerConsumer;
+
 public class MQTTSubscriptionManager
 {
    private MQTTSession session;
@@ -134,6 +134,7 @@ public class MQTTSubscriptionManager
       }
    }
 
+   // FIXME: Do we need this synchronized?
    private synchronized void removeSubscription(String address) throws Exception
    {
       ServerConsumer consumer = consumers.get(address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d002da85/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
index d3d7bfa..a28bcfe 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQServerSession.java
@@ -331,7 +331,7 @@ public class AMQServerSession extends ServerSessionImpl
    }
 
    @Override
-   public void createQueue(final SimpleString address,
+   public Queue createQueue(final SimpleString address,
                            final SimpleString name,
                            final SimpleString filterString,
                            final boolean temporary,
@@ -339,11 +339,10 @@ public class AMQServerSession extends ServerSessionImpl
    {
       if (!this.internal)
       {
-         super.createQueue(address, name, filterString, temporary, durable);
-         return;
+         return super.createQueue(address, name, filterString, temporary, durable);
       }
 
-      server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
+      Queue queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
 
       if (temporary)
       {
@@ -368,6 +367,7 @@ public class AMQServerSession extends ServerSessionImpl
                                              temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
       }
 
+      return queue;
    }
 
    @Override

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d002da85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index f993292..7623781 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -87,7 +87,7 @@ public interface ServerSession
 
    void stop();
 
-   void createQueue(SimpleString address,
+   Queue createQueue(SimpleString address,
                     SimpleString name,
                     SimpleString filterString,
                     boolean temporary,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d002da85/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index f75a09a..98b6bb7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -543,7 +543,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener
             credits);
    }
 
-   public void createQueue(final SimpleString address,
+   public Queue createQueue(final SimpleString address,
                            final SimpleString name,
                            final SimpleString filterString,
                            final boolean temporary,
@@ -561,14 +561,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener
 
       ((ActiveMQServerImpl)server).checkQueueCreationLimit(getUsername());
 
+      Queue queue;
+
       // any non-temporary JMS queue created via this method should be marked as auto-created
       if (!temporary && address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name))
       {
-         server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
+         queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
       }
       else
       {
-         server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
+         queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
       }
 
       if (temporary)
@@ -594,6 +596,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener
                                              temporary + " durable=" + durable + " on session user=" + this.username + ", connection=" + this.remotingConnection);
       }
 
+      return queue;
+
    }
 
    @Override


Mime
View raw message