Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B72E511DB2 for ; Wed, 30 Jul 2014 16:00:45 +0000 (UTC) Received: (qmail 22925 invoked by uid 500); 30 Jul 2014 16:00:45 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 22877 invoked by uid 500); 30 Jul 2014 16:00:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 22864 invoked by uid 99); 30 Jul 2014 16:00:45 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 30 Jul 2014 16:00:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 5926199F33B; Wed, 30 Jul 2014 16:00:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Date: Wed, 30 Jul 2014 16:00:45 -0000 Message-Id: <3491e08f133344e8bc3a6542fae671cf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] git commit: https://issues.apache.org/jira/browse/AMQ-5290 Repository: activemq Updated Branches: refs/heads/trunk b550fb774 -> 2cd54248c https://issues.apache.org/jira/browse/AMQ-5290 Reduce memory overhead of the MQTT Protocol converter by not storing the UTF8Buffer instances and instead simply store the needed String values. We always access the String value anyway so all of the UTF8Buffer object we store carry the overhead of both marshalled UTF-8 bytes and an unmarshalled String object. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/2cd54248 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/2cd54248 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/2cd54248 Branch: refs/heads/trunk Commit: 2cd54248c693b7b561bec8ef124d8ebde90fd1a0 Parents: 4b0e3e5 Author: Timothy Bish Authored: Wed Jul 30 12:00:23 2014 -0400 Committer: Timothy Bish Committed: Wed Jul 30 12:00:38 2014 -0400 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 33 ++++++++++---------- 1 file changed, 17 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/2cd54248/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 3e64556..cc51ce7 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -112,9 +112,9 @@ public class MQTTProtocolConverter { private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); - private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); - private final Map activeMQTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); - private final Map mqttTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); + private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); + private final Map activeMQTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); + private final Map mqttTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); private final Set restoredSubs = Collections.synchronizedSet(new HashSet()); private final Map consumerAcks = new LRUCache(DEFAULT_CACHE_SIZE); @@ -413,9 +413,9 @@ public class MQTTProtocolConverter { byte onSubscribe(final Topic topic) throws MQTTProtocolException { - final UTF8Buffer topicName = topic.name(); + final String topicName = topic.name().toString(); final QoS topicQoS = topic.qos(); - ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName.toString())); + ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName)); if( mqttSubscriptionByTopic.containsKey(topicName)) { final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName); @@ -439,7 +439,7 @@ public class MQTTProtocolConverter { consumerInfo.setDispatchAsync(true); // create durable subscriptions only when cleansession is false if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) { - consumerInfo.setSubscriptionName(topicQoS + ":" + topicName.toString()); + consumerInfo.setSubscriptionName(topicQoS + ":" + topicName); } MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo); @@ -471,7 +471,7 @@ public class MQTTProtocolConverter { return qos[0]; } - private void resendRetainedMessages(UTF8Buffer topicName, ActiveMQDestination destination, + private void resendRetainedMessages(String topicName, ActiveMQDestination destination, MQTTSubscription mqttSubscription) throws MQTTProtocolException { // check whether the Topic has been recovered in restoreDurableSubs // mark subscription available for recovery for duplicate subscription @@ -524,7 +524,7 @@ public class MQTTProtocolConverter { UTF8Buffer[] topics = command.topics(); if (topics != null) { for (UTF8Buffer topic : topics) { - onUnSubscribe(topic); + onUnSubscribe(topic.toString()); } } UNSUBACK ack = new UNSUBACK(); @@ -532,7 +532,7 @@ public class MQTTProtocolConverter { sendToMQTT(ack.encode()); } - void onUnSubscribe(UTF8Buffer topicName) { + void onUnSubscribe(String topicName) { MQTTSubscription subs = mqttSubscriptionByTopic.remove(topicName); if (subs != null) { ConsumerInfo info = subs.getConsumerInfo(); @@ -548,7 +548,7 @@ public class MQTTProtocolConverter { // check if the durable sub also needs to be removed if (subs.getConsumerInfo().getSubscriptionName() != null) { // also remove it from restored durable subscriptions set - restoredSubs.remove(convertMQTTToActiveMQ(topicName.toString())); + restoredSubs.remove(convertMQTTToActiveMQ(topicName)); RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); rsi.setConnectionId(connectionId); @@ -680,7 +680,7 @@ public class MQTTProtocolConverter { if (topic == null) { String topicName = convertMQTTToActiveMQ(command.topicName().toString()); topic = new ActiveMQTopic(topicName); - activeMQTopicMap.put(command.topicName(), topic); + activeMQTopicMap.put(command.topicName().toString(), topic); } } msg.setJMSDestination(topic); @@ -704,15 +704,15 @@ public class MQTTProtocolConverter { result.retain(true); } - UTF8Buffer topicName; + String topicName; synchronized (mqttTopicMap) { topicName = mqttTopicMap.get(message.getJMSDestination()); if (topicName == null) { - topicName = new UTF8Buffer(convertActiveMQToMQTT(message.getDestination().getPhysicalName())); + topicName = convertActiveMQToMQTT(message.getDestination().getPhysicalName()); mqttTopicMap.put(message.getJMSDestination(), topicName); } } - result.topicName(topicName); + result.topicName(new UTF8Buffer(topicName)); if (message.getDataStructureType() == ActiveMQTextMessage.DATA_STRUCTURE_TYPE) { ActiveMQTextMessage msg = (ActiveMQTextMessage) message.copy(); @@ -962,9 +962,10 @@ public class MQTTProtocolConverter { /** * set the default prefetch size when mapping the MQTT subscription to an ActiveMQ one * The default = 1 - * @param activeMQSubscriptionPrefetch set the prefetch for the corresponding ActiveMQ subscription + * + * @param activeMQSubscriptionPrefetch + * set the prefetch for the corresponding ActiveMQ subscription */ - public void setActiveMQSubscriptionPrefetch(int activeMQSubscriptionPrefetch) { this.activeMQSubscriptionPrefetch = activeMQSubscriptionPrefetch; }