activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [1/2] git commit: https://issues.apache.org/jira/browse/AMQ-5290
Date Wed, 30 Jul 2014 16:00:45 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk b550fb774 -> 2cd54248c


https://issues.apache.org/jira/browse/AMQ-5290

Reduce memory overhead of the MQTT Protocol converter by not storing the
UTF8Buffer instances and instead simply store the needed String values.
We always access the String value anyway so all of the UTF8Buffer object
we store carry the overhead of both marshalled UTF-8 bytes and an
unmarshalled String object.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2cd54248
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2cd54248
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2cd54248

Branch: refs/heads/trunk
Commit: 2cd54248c693b7b561bec8ef124d8ebde90fd1a0
Parents: 4b0e3e5
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jul 30 12:00:23 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jul 30 12:00:38 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 33 ++++++++++----------
 1 file changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/2cd54248/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 3e64556..cc51ce7 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -112,9 +112,9 @@ public class MQTTProtocolConverter {
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new
ConcurrentHashMap<Integer, ResponseHandler>();
     private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
-    private final ConcurrentHashMap<UTF8Buffer, MQTTSubscription> mqttSubscriptionByTopic
= new ConcurrentHashMap<UTF8Buffer, MQTTSubscription>();
-    private final Map<UTF8Buffer, ActiveMQTopic> activeMQTopicMap = new LRUCache<UTF8Buffer,
ActiveMQTopic>(DEFAULT_CACHE_SIZE);
-    private final Map<Destination, UTF8Buffer> mqttTopicMap = new LRUCache<Destination,
UTF8Buffer>(DEFAULT_CACHE_SIZE);
+    private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic
= new ConcurrentHashMap<String, MQTTSubscription>();
+    private final Map<String, ActiveMQTopic> activeMQTopicMap = new LRUCache<String,
ActiveMQTopic>(DEFAULT_CACHE_SIZE);
+    private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination,
String>(DEFAULT_CACHE_SIZE);
     private final Set<String> restoredSubs = Collections.synchronizedSet(new HashSet<String>());
 
     private final Map<Short, MessageAck> consumerAcks = new LRUCache<Short, MessageAck>(DEFAULT_CACHE_SIZE);
@@ -413,9 +413,9 @@ public class MQTTProtocolConverter {
 
     byte onSubscribe(final Topic topic) throws MQTTProtocolException {
 
-        final UTF8Buffer topicName = topic.name();
+        final String topicName = topic.name().toString();
         final QoS topicQoS = topic.qos();
-        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString()));
+        ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName));
 
         if( mqttSubscriptionByTopic.containsKey(topicName)) {
             final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
@@ -439,7 +439,7 @@ public class MQTTProtocolConverter {
         consumerInfo.setDispatchAsync(true);
         // create durable subscriptions only when cleansession is false
         if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal()
>= QoS.AT_LEAST_ONCE.ordinal() ) {
-            consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString());
+            consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
         }
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
 
@@ -471,7 +471,7 @@ public class MQTTProtocolConverter {
         return qos[0];
     }
 
-    private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination,
+    private void resendRetainedMessages(String topicName, ActiveMQDestination destination,
                                         MQTTSubscription mqttSubscription) throws MQTTProtocolException
{
         // check whether the Topic has been recovered in restoreDurableSubs
         // mark subscription available for recovery for duplicate subscription
@@ -524,7 +524,7 @@ public class MQTTProtocolConverter {
         UTF8Buffer[] topics = command.topics();
         if (topics != null) {
             for (UTF8Buffer topic : topics) {
-                onUnSubscribe(topic);
+                onUnSubscribe(topic.toString());
             }
         }
         UNSUBACK ack = new UNSUBACK();
@@ -532,7 +532,7 @@ public class MQTTProtocolConverter {
         sendToMQTT(ack.encode());
     }
 
-    void onUnSubscribe(UTF8Buffer topicName) {
+    void onUnSubscribe(String topicName) {
         MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName);
         if (subs != null) {
             ConsumerInfo info = subs.getConsumerInfo();
@@ -548,7 +548,7 @@ public class MQTTProtocolConverter {
             // check if the durable sub also needs to be removed
             if (subs.getConsumerInfo().getSubscriptionName() != null) {
                 // also remove it from restored durable subscriptions set
-                restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString()));
+                restoredSubs.remove(convertMQTTToActiveMQ(topicName));
 
                 RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
                 rsi.setConnectionId(connectionId);
@@ -680,7 +680,7 @@ public class MQTTProtocolConverter {
             if (topic == null) {
                 String topicName = convertMQTTToActiveMQ(command.topicName().toString());
                 topic = new ActiveMQTopic(topicName);
-                activeMQTopicMap.put(command.topicName(), topic);
+                activeMQTopicMap.put(command.topicName().toString(), topic);
             }
         }
         msg.setJMSDestination(topic);
@@ -704,15 +704,15 @@ public class MQTTProtocolConverter {
             result.retain(true);
         }
 
-        UTF8Buffer topicName;
+        String topicName;
         synchronized (mqttTopicMap) {
             topicName = mqttTopicMap.get(message.getJMSDestination());
             if (topicName == null) {
-                topicName = new UTF8Buffer(convertActiveMQToMQTT(message.getDestination().getPhysicalName()));
+                topicName = convertActiveMQToMQTT(message.getDestination().getPhysicalName());
                 mqttTopicMap.put(message.getJMSDestination(), topicName);
             }
         }
-        result.topicName(topicName);
+        result.topicName(new UTF8Buffer(topicName));
 
         if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) {
             ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy();
@@ -962,9 +962,10 @@ public class MQTTProtocolConverter {
     /**
      * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one
      * The default = 1
-     * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ
subscription
+     *
+     * @param activeMQSubscriptionPrefetch
+     *        set the prefetch for the corresponding ActiveMQ subscription
      */
-
     public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) {
         this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch;
     }


Mime
View raw message