Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 232D511FF6 for ; Mon, 11 Aug 2014 18:03:43 +0000 (UTC) Received: (qmail 47848 invoked by uid 500); 11 Aug 2014 18:03:43 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 47807 invoked by uid 500); 11 Aug 2014 18:03:42 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 47798 invoked by uid 99); 11 Aug 2014 18:03:42 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 11 Aug 2014 18:03:42 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 72B739AB945; Mon, 11 Aug 2014 18:03:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tabish@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: https://issues.apache.org/jira/browse/AMQ-5290 Date: Mon, 11 Aug 2014 18:03:42 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 8b64e139f -> ec049a088 https://issues.apache.org/jira/browse/AMQ-5290 Some more refactorings to allow the strategies to have more control of the subsciption model in use. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ec049a08 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ec049a08 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ec049a08 Branch: refs/heads/trunk Commit: ec049a0882172b2ed288036f36876a8925b8c1bd Parents: 8b64e13 Author: Timothy Bish Authored: Mon Aug 11 14:03:35 2014 -0400 Committer: Timothy Bish Committed: Mon Aug 11 14:03:35 2014 -0400 ---------------------------------------------------------------------- .../transport/mqtt/MQTTProtocolConverter.java | 118 +++---------------- .../AbstractMQTTSubscriptionStrategy.java | 105 +++++++++++++++++ .../MQTTDefaultSubscriptionStrategy.java | 43 ++++--- .../mqtt/strategy/MQTTSubscriptionStrategy.java | 31 ++++- .../MQTTVirtualTopicSubscriptionStrategy.java | 44 +++---- 5 files changed, 195 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java index 3969d87..62a6f51 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java @@ -41,15 +41,12 @@ import org.apache.activemq.command.Command; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ExceptionResponse; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; -import org.apache.activemq.command.RemoveInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; @@ -93,17 +90,13 @@ public class MQTTProtocolConverter { private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode(); private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5; static final int DEFAULT_CACHE_SIZE = 5000; - private static final byte SUBSCRIBE_ERROR = (byte) 0x80; private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId()); private final SessionId sessionId = new SessionId(connectionId, -1); private final ProducerId producerId = new ProducerId(sessionId, 1); private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator(); - private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); - private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); - private final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); private final Map activeMQDestinationMap = new LRUCache(DEFAULT_CACHE_SIZE); private final Map mqttTopicMap = new LRUCache(DEFAULT_CACHE_SIZE); @@ -120,7 +113,7 @@ public class MQTTProtocolConverter { private CONNECT connect; private String clientId; private long defaultKeepAlive; - private int activeMQSubscriptionPrefetch=1; + private int activeMQSubscriptionPrefetch = 1; protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS"; private final MQTTPacketIdGenerator packetIdGenerator; private boolean publishDollarTopics; @@ -351,7 +344,11 @@ public class MQTTProtocolConverter { if (topics != null) { byte[] qos = new byte[topics.length]; for (int i = 0; i < topics.length; i++) { - qos[i] = onSubscribe(topics[i]); + try { + qos[i] = getSubscriptionStrategy().onSubscribe(topics[i]); + } catch (IOException e) { + throw new MQTTProtocolException("Failed to process subscription request", true, e); + } } SUBACK ack = new SUBACK(); ack.messageId(command.messageId()); @@ -366,71 +363,16 @@ public class MQTTProtocolConverter { } } - public byte onSubscribe(final Topic topic) throws MQTTProtocolException { - - final String destinationName = topic.name().toString(); - final QoS requestedQoS = topic.qos(); - - if (mqttSubscriptionByTopic.containsKey(destinationName)) { - final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName); - if (requestedQoS != mqttSubscription.getQoS()) { - // remove old subscription as the QoS has changed - onUnSubscribe(destinationName); - } else { - try { - getSubscriptionStrategy().onReSubscribe(mqttSubscription); - } catch (IOException e) { - throw new MQTTProtocolException("Failed to find subscription strategy", true, e); - } - return (byte) requestedQoS.ordinal(); - } - } - - try { - return getSubscriptionStrategy().onSubscribe(destinationName, requestedQoS); - } catch (IOException e) { - throw new MQTTProtocolException("Failed while intercepting subscribe", true, e); - } - } - - public byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException { - - MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, qoS, consumerInfo); - - // optimistic add to local maps first to be able to handle commands in onActiveMQCommand - subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription); - mqttSubscriptionByTopic.put(topicName, mqttSubscription); - - final byte[] qos = {-1}; - sendToActiveMQ(consumerInfo, new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - // validate subscription request - if (response.isException()) { - final Throwable throwable = ((ExceptionResponse) response).getException(); - LOG.warn("Error subscribing to {}", topicName, throwable); - qos[0] = SUBSCRIBE_ERROR; - } else { - qos[0] = (byte) qoS.ordinal(); - } - } - }); - - if (qos[0] == SUBSCRIBE_ERROR) { - // remove from local maps if subscribe failed - subscriptionsByConsumerId.remove(consumerInfo.getConsumerId()); - mqttSubscriptionByTopic.remove(topicName); - } - - return qos[0]; - } - public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException { checkConnected(); UTF8Buffer[] topics = command.topics(); if (topics != null) { for (UTF8Buffer topic : topics) { - onUnSubscribe(topic.toString()); + try { + getSubscriptionStrategy().onUnSubscribe(topic.toString()); + } catch (IOException e) { + throw new MQTTProtocolException("Failed to process unsubscribe request", true, e); + } } } UNSUBACK ack = new UNSUBACK(); @@ -438,38 +380,6 @@ public class MQTTProtocolConverter { sendToMQTT(ack.encode()); } - public void onUnSubscribe(String topicName) { - MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName); - if (subscription != null) { - doUnSubscribe(subscription); - - // check if the broker side of the subscription needs to be removed - try { - getSubscriptionStrategy().onUnSubscribe(subscription); - } catch (IOException e) { - // Ignore - } - } - } - - public void doUnSubscribe(MQTTSubscription subscription) { - mqttSubscriptionByTopic.remove(subscription.getTopicName()); - ConsumerInfo info = subscription.getConsumerInfo(); - if (info != null) { - subscriptionsByConsumerId.remove(info.getConsumerId()); - } - RemoveInfo removeInfo = null; - if (info != null) { - removeInfo = info.createRemoveCommand(); - } - sendToActiveMQ(removeInfo, new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - // ignore failures.. - } - }); - } - /** * Dispatch an ActiveMQ command */ @@ -488,7 +398,7 @@ public class MQTTProtocolConverter { } } else if (command.isMessageDispatch()) { MessageDispatch md = (MessageDispatch) command; - MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId()); + MQTTSubscription sub = getSubscriptionStrategy().getSubscription(md.getConsumerId()); if (sub != null) { MessageAck ack = sub.createMessageAck(md); PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage()); @@ -848,8 +758,8 @@ public class MQTTProtocolConverter { return connectionId; } - public ConsumerId getNextConsumerId() { - return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + public SessionId getSessionId() { + return sessionId; } public boolean isCleanSession() { http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java index 60259e2..d77c51b 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java @@ -16,7 +16,9 @@ */ package org.apache.activemq.transport.mqtt.strategy; +import java.io.IOException; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -30,9 +32,18 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.RemoveInfo; +import org.apache.activemq.command.Response; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolException; import org.apache.activemq.transport.mqtt.MQTTSubscription; +import org.apache.activemq.transport.mqtt.ResponseHandler; +import org.apache.activemq.util.LongSequenceGenerator; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing @@ -40,9 +51,18 @@ import org.apache.activemq.transport.mqtt.MQTTSubscription; */ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy, BrokerServiceAware { + private static final Logger LOG = LoggerFactory.getLogger(AbstractMQTTSubscriptionStrategy.class); + + private static final byte SUBSCRIBE_ERROR = (byte) 0x80; + protected MQTTProtocolConverter protocol; protected BrokerService brokerService; + protected final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); + protected final ConcurrentHashMap mqttSubscriptionByTopic = new ConcurrentHashMap(); + + protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator(); + @Override public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException { setProtocolConverter(protocol); @@ -64,6 +84,34 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti } @Override + public byte onSubscribe(final Topic topic) throws MQTTProtocolException { + + final String destinationName = topic.name().toString(); + final QoS requestedQoS = topic.qos(); + + final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName); + if (mqttSubscription != null) { + if (requestedQoS != mqttSubscription.getQoS()) { + // remove old subscription as the QoS has changed + onUnSubscribe(destinationName); + } else { + try { + onReSubscribe(mqttSubscription); + } catch (IOException e) { + throw new MQTTProtocolException("Failed to find subscription strategy", true, e); + } + return (byte) requestedQoS.ordinal(); + } + } + + try { + return onSubscribe(destinationName, requestedQoS); + } catch (IOException e) { + throw new MQTTProtocolException("Failed while intercepting subscribe", true, e); + } + } + + @Override public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException { String topicName = mqttSubscription.getTopicName(); @@ -126,4 +174,61 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti public boolean isControlTopic(ActiveMQDestination destination) { return destination.getPhysicalName().startsWith("$"); } + + @Override + public MQTTSubscription getSubscription(ConsumerId consumerId) { + return subscriptionsByConsumerId.get(consumerId); + } + + protected ConsumerId getNextConsumerId() { + return new ConsumerId(protocol.getSessionId(), consumerIdGenerator.getNextSequenceId()); + } + + protected byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS qoS) throws MQTTProtocolException { + + MQTTSubscription mqttSubscription = new MQTTSubscription(protocol, topicName, qoS, consumerInfo); + + // optimistic add to local maps first to be able to handle commands in onActiveMQCommand + subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription); + mqttSubscriptionByTopic.put(topicName, mqttSubscription); + + final byte[] qos = {-1}; + protocol.sendToActiveMQ(consumerInfo, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // validate subscription request + if (response.isException()) { + final Throwable throwable = ((ExceptionResponse) response).getException(); + LOG.warn("Error subscribing to {}", topicName, throwable); + qos[0] = SUBSCRIBE_ERROR; + } else { + qos[0] = (byte) qoS.ordinal(); + } + } + }); + + if (qos[0] == SUBSCRIBE_ERROR) { + // remove from local maps if subscribe failed + subscriptionsByConsumerId.remove(consumerInfo.getConsumerId()); + mqttSubscriptionByTopic.remove(topicName); + } + + return qos[0]; + } + + public void doUnSubscribe(MQTTSubscription subscription) { + mqttSubscriptionByTopic.remove(subscription.getTopicName()); + ConsumerInfo info = subscription.getConsumerInfo(); + if (info != null) { + subscriptionsByConsumerId.remove(info.getConsumerId()); + + RemoveInfo removeInfo = info.createRemoveCommand(); + protocol.sendToActiveMQ(removeInfo, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // ignore failures.. + } + }); + } + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java index 4e8f362..14530bd 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java @@ -68,7 +68,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException { ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName)); - ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId()); + ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); consumerInfo.setRetroactive(true); @@ -78,7 +78,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName); } - return protocol.doSubscribe(consumerInfo, topicName, requestedQoS); + return doSubscribe(consumerInfo, topicName, requestedQoS); } @Override @@ -96,22 +96,27 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr } @Override - public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException { - // check if the durable sub also needs to be removed - if (subscription.getConsumerInfo().getSubscriptionName() != null) { - // also remove it from restored durable subscriptions set - restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); - - RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); - rsi.setConnectionId(protocol.getConnectionId()); - rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName()); - rsi.setClientId(protocol.getClientId()); - protocol.sendToActiveMQ(rsi, new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - // ignore failures.. - } - }); + public void onUnSubscribe(String topicName) throws MQTTProtocolException { + MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName); + if (subscription != null) { + doUnSubscribe(subscription); + + // check if the durable sub also needs to be removed + if (subscription.getConsumerInfo().getSubscriptionName() != null) { + // also remove it from restored durable subscriptions set + restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName())); + + RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo(); + rsi.setConnectionId(protocol.getConnectionId()); + rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName()); + rsi.setClientId(protocol.getClientId()); + protocol.sendToActiveMQ(rsi, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // ignore failures.. + } + }); + } } } @@ -140,7 +145,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr String name = sub.getSubcriptionName(); String[] split = name.split(":", 2); QoS qoS = QoS.valueOf(split[0]); - protocol.onSubscribe(new Topic(split[1], qoS)); + onSubscribe(new Topic(split[1], qoS)); // mark this durable subscription as restored by Broker restoredSubs.add(split[1]); } http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java index 0eaa72a..5bea11d 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java @@ -17,10 +17,12 @@ package org.apache.activemq.transport.mqtt.strategy; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; import org.apache.activemq.transport.mqtt.MQTTProtocolConverter; import org.apache.activemq.transport.mqtt.MQTTProtocolException; import org.apache.activemq.transport.mqtt.MQTTSubscription; import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; import org.fusesource.mqtt.codec.CONNECT; /** @@ -50,6 +52,19 @@ public interface MQTTSubscriptionStrategy { public void onConnect(CONNECT connect) throws MQTTProtocolException; /** + * Called for each Topic that a client requests to subscribe to. The strategy needs + * check each Topic for duplicate subscription requests and change of QoS state. + * + * @param topic + * the MQTT Topic instance being subscribed to. + * + * @return the assigned QoS value given to the new subscription. + * + * @throws MQTTProtocolException if an error occurs while processing the subscribe actions. + */ + public byte onSubscribe(Topic topic) throws MQTTProtocolException; + + /** * Called when a new Subscription is being requested. This method allows the * strategy to create a specific type of subscription for the client such as * mapping topic subscriptions to Queues etc. @@ -80,12 +95,12 @@ public interface MQTTSubscriptionStrategy { /** * Called when a client requests an un-subscribe a previous subscription. * - * @param subscription - * the {@link MQTTSubscription} that is being removed. + * @param topicName + * the name of the Topic the client wishes to unsubscribe from. * * @throws MQTTProtocolException if an error occurs during the un-subscribe processing. */ - public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException; + public void onUnSubscribe(String topicName) throws MQTTProtocolException; /** * Intercepts PUBLISH operations from the client and allows the strategy to map the @@ -136,4 +151,14 @@ public interface MQTTSubscriptionStrategy { */ public MQTTProtocolConverter getProtocolConverter(); + /** + * Lookup an {@link MQTTSubscription} instance based on known {@link ConsumerId} value. + * + * @param consumer + * the consumer ID to lookup. + * + * @return the {@link MQTTSubscription} for the consumer or null if no subscription exists. + */ + public MQTTSubscription getSubscription(ConsumerId consumer); + } http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java index 64995c6..0778764 100644 --- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java +++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java @@ -97,13 +97,13 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti destination = new ActiveMQTopic(converted); } - ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId()); + ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(destination); consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); - return protocol.doSubscribe(consumerInfo, topicName, requestedQoS); + return doSubscribe(consumerInfo, topicName, requestedQoS); } @Override @@ -120,27 +120,31 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti if (mqttSubscription.getDestination().isTopic()) { super.onReSubscribe(mqttSubscription); } else { - protocol.doUnSubscribe(mqttSubscription); + doUnSubscribe(mqttSubscription); ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo(); - consumerInfo.setConsumerId(protocol.getNextConsumerId()); - protocol.doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS()); + consumerInfo.setConsumerId(getNextConsumerId()); + doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS()); } } @Override - public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException { - if (subscription.getDestination().isQueue()) { - DestinationInfo remove = new DestinationInfo(); - remove.setConnectionId(protocol.getConnectionId()); - remove.setDestination(subscription.getDestination()); - remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); - - protocol.sendToActiveMQ(remove, new ResponseHandler() { - @Override - public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { - // ignore failures.. - } - }); + public void onUnSubscribe(String topicName) throws MQTTProtocolException { + MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName); + if (subscription != null) { + doUnSubscribe(subscription); + if (subscription.getDestination().isQueue()) { + DestinationInfo remove = new DestinationInfo(); + remove.setConnectionId(protocol.getConnectionId()); + remove.setDestination(subscription.getDestination()); + remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE); + + protocol.sendToActiveMQ(remove, new ResponseHandler() { + @Override + public void onResponse(MQTTProtocolConverter converter, Response response) throws IOException { + // ignore failures.. + } + }); + } } } @@ -203,13 +207,13 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti QoS qoS = QoS.valueOf(qosString); LOG.trace("Restoring subscription: {}:{}", topicName, qoS); - ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId()); + ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId()); consumerInfo.setDestination(queue); consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch()); consumerInfo.setRetroactive(true); consumerInfo.setDispatchAsync(true); - protocol.doSubscribe(consumerInfo, topicName, qoS); + doSubscribe(consumerInfo, topicName, qoS); // mark this durable subscription as restored by Broker restoredQueues.add(queue);