activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5290
Date Mon, 11 Aug 2014 18:03:42 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 8b64e139f -> ec049a088


https://issues.apache.org/jira/browse/AMQ-5290

Some more refactorings to allow the strategies to have more control of
the subscription model in use.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ec049a08
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ec049a08
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ec049a08

Branch: refs/heads/trunk
Commit: ec049a0882172b2ed288036f36876a8925b8c1bd
Parents: 8b64e13
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Aug 11 14:03:35 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Aug 11 14:03:35 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 118 +++----------------
 .../AbstractMQTTSubscriptionStrategy.java       | 105 +++++++++++++++++
 .../MQTTDefaultSubscriptionStrategy.java        |  43 ++++---
 .../mqtt/strategy/MQTTSubscriptionStrategy.java |  31 ++++-
 .../MQTTVirtualTopicSubscriptionStrategy.java   |  44 +++----
 5 files changed, 195 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 3969d87..62a6f51 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -41,15 +41,12 @@ import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.ExceptionResponse;
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.command.SessionId;
 import org.apache.activemq.command.SessionInfo;
@@ -93,17 +90,13 @@ public class MQTTProtocolConverter {
     private static final MQTTFrame PING_RESP_FRAME = new PINGRESP().encode();
     private static final double MQTT_KEEP_ALIVE_GRACE_PERIOD= 0.5;
     static final int DEFAULT_CACHE_SIZE = 5000;
-    private static final byte SUBSCRIBE_ERROR = (byte) 0x80;
 
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
     private final ProducerId producerId = new ProducerId(sessionId, 1);
     private final LongSequenceGenerator publisherIdGenerator = new LongSequenceGenerator();
-    private final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new
ConcurrentHashMap<Integer, ResponseHandler>();
-    private final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
-    private final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic
= new ConcurrentHashMap<String, MQTTSubscription>();
     private final Map<String, ActiveMQDestination> activeMQDestinationMap = new LRUCache<String,
ActiveMQDestination>(DEFAULT_CACHE_SIZE);
     private final Map<Destination, String> mqttTopicMap = new LRUCache<Destination,
String>(DEFAULT_CACHE_SIZE);
 
@@ -120,7 +113,7 @@ public class MQTTProtocolConverter {
     private CONNECT connect;
     private String clientId;
     private long defaultKeepAlive;
-    private int activeMQSubscriptionPrefetch=1;
+    private int activeMQSubscriptionPrefetch = 1;
     protected static final String QOS_PROPERTY_NAME = "ActiveMQ.MQTT.QoS";
     private final MQTTPacketIdGenerator packetIdGenerator;
     private boolean publishDollarTopics;
@@ -351,7 +344,11 @@ public class MQTTProtocolConverter {
         if (topics != null) {
             byte[] qos = new byte[topics.length];
             for (int i = 0; i < topics.length; i++) {
-                qos[i] = onSubscribe(topics[i]);
+                try {
+                    qos[i] = getSubscriptionStrategy().onSubscribe(topics[i]);
+                } catch (IOException e) {
+                    throw new MQTTProtocolException("Failed to process subscription request",
true, e);
+                }
             }
             SUBACK ack = new SUBACK();
             ack.messageId(command.messageId());
@@ -366,71 +363,16 @@ public class MQTTProtocolConverter {
         }
     }
 
-    public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
-
-        final String destinationName = topic.name().toString();
-        final QoS requestedQoS = topic.qos();
-
-        if (mqttSubscriptionByTopic.containsKey(destinationName)) {
-            final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
-            if (requestedQoS != mqttSubscription.getQoS()) {
-                // remove old subscription as the QoS has changed
-                onUnSubscribe(destinationName);
-            } else {
-                try {
-                    getSubscriptionStrategy().onReSubscribe(mqttSubscription);
-                } catch (IOException e) {
-                    throw new MQTTProtocolException("Failed to find subscription strategy",
true, e);
-                }
-                return (byte) requestedQoS.ordinal();
-            }
-        }
-
-        try {
-            return getSubscriptionStrategy().onSubscribe(destinationName, requestedQoS);
-        } catch (IOException e) {
-            throw new MQTTProtocolException("Failed while intercepting subscribe", true,
e);
-        }
-    }
-
-    public byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS
qoS) throws MQTTProtocolException {
-
-        MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicName, qoS, consumerInfo);
-
-        // optimistic add to local maps first to be able to handle commands in onActiveMQCommand
-        subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
-        mqttSubscriptionByTopic.put(topicName, mqttSubscription);
-
-        final byte[] qos = {-1};
-        sendToActiveMQ(consumerInfo, new ResponseHandler() {
-            @Override
-            public void onResponse(MQTTProtocolConverter converter, Response response) throws
IOException {
-                // validate subscription request
-                if (response.isException()) {
-                    final Throwable throwable = ((ExceptionResponse) response).getException();
-                    LOG.warn("Error subscribing to {}", topicName, throwable);
-                    qos[0] = SUBSCRIBE_ERROR;
-                } else {
-                    qos[0] = (byte) qoS.ordinal();
-                }
-            }
-        });
-
-        if (qos[0] == SUBSCRIBE_ERROR) {
-            // remove from local maps if subscribe failed
-            subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
-            mqttSubscriptionByTopic.remove(topicName);
-        }
-
-        return qos[0];
-    }
-
     public void onUnSubscribe(UNSUBSCRIBE command) throws MQTTProtocolException {
         checkConnected();
         UTF8Buffer[] topics = command.topics();
         if (topics != null) {
             for (UTF8Buffer topic : topics) {
-                onUnSubscribe(topic.toString());
+                try {
+                    getSubscriptionStrategy().onUnSubscribe(topic.toString());
+                } catch (IOException e) {
+                    throw new MQTTProtocolException("Failed to process unsubscribe request",
true, e);
+                }
             }
         }
         UNSUBACK ack = new UNSUBACK();
@@ -438,38 +380,6 @@ public class MQTTProtocolConverter {
         sendToMQTT(ack.encode());
     }
 
-    public void onUnSubscribe(String topicName) {
-        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
-        if (subscription != null) {
-            doUnSubscribe(subscription);
-
-            // check if the broker side of the subscription needs to be removed
-            try {
-                getSubscriptionStrategy().onUnSubscribe(subscription);
-            } catch (IOException e) {
-                // Ignore
-            }
-        }
-    }
-
-    public void doUnSubscribe(MQTTSubscription subscription) {
-        mqttSubscriptionByTopic.remove(subscription.getTopicName());
-        ConsumerInfo info = subscription.getConsumerInfo();
-        if (info != null) {
-            subscriptionsByConsumerId.remove(info.getConsumerId());
-        }
-        RemoveInfo removeInfo = null;
-        if (info != null) {
-            removeInfo = info.createRemoveCommand();
-        }
-        sendToActiveMQ(removeInfo, new ResponseHandler() {
-            @Override
-            public void onResponse(MQTTProtocolConverter converter, Response response) throws
IOException {
-                // ignore failures..
-            }
-        });
-    }
-
     /**
      * Dispatch an ActiveMQ command
      */
@@ -488,7 +398,7 @@ public class MQTTProtocolConverter {
             }
         } else if (command.isMessageDispatch()) {
             MessageDispatch md = (MessageDispatch) command;
-            MQTTSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
+            MQTTSubscription sub = getSubscriptionStrategy().getSubscription(md.getConsumerId());
             if (sub != null) {
                 MessageAck ack = sub.createMessageAck(md);
                 PUBLISH publish = sub.createPublish((ActiveMQMessage) md.getMessage());
@@ -848,8 +758,8 @@ public class MQTTProtocolConverter {
         return connectionId;
     }
 
-    public ConsumerId getNextConsumerId() {
-        return new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
+    public SessionId getSessionId() {
+        return sessionId;
     }
 
     public boolean isCleanSession() {

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
index 60259e2..d77c51b 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/AbstractMQTTSubscriptionStrategy.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.transport.mqtt.strategy;
 
+import java.io.IOException;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
@@ -30,9 +32,18 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
 import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
 import org.apache.activemq.transport.mqtt.MQTTProtocolException;
 import org.apache.activemq.transport.mqtt.MQTTSubscription;
+import org.apache.activemq.transport.mqtt.ResponseHandler;
+import org.apache.activemq.util.LongSequenceGenerator;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Abstract implementation of the {@link MQTTSubscriptionStrategy} interface providing
@@ -40,9 +51,18 @@ import org.apache.activemq.transport.mqtt.MQTTSubscription;
  */
 public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscriptionStrategy,
BrokerServiceAware {
 
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractMQTTSubscriptionStrategy.class);
+
+    private static final byte SUBSCRIBE_ERROR = (byte) 0x80;
+
     protected MQTTProtocolConverter protocol;
     protected BrokerService brokerService;
 
+    protected final ConcurrentHashMap<ConsumerId, MQTTSubscription> subscriptionsByConsumerId
= new ConcurrentHashMap<ConsumerId, MQTTSubscription>();
+    protected final ConcurrentHashMap<String, MQTTSubscription> mqttSubscriptionByTopic
= new ConcurrentHashMap<String, MQTTSubscription>();
+
+    protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
+
     @Override
     public void initialize(MQTTProtocolConverter protocol) throws MQTTProtocolException {
         setProtocolConverter(protocol);
@@ -64,6 +84,34 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
     }
 
     @Override
+    public byte onSubscribe(final Topic topic) throws MQTTProtocolException {
+
+        final String destinationName = topic.name().toString();
+        final QoS requestedQoS = topic.qos();
+
+        final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(destinationName);
+        if (mqttSubscription != null) {
+            if (requestedQoS != mqttSubscription.getQoS()) {
+                // remove old subscription as the QoS has changed
+                onUnSubscribe(destinationName);
+            } else {
+                try {
+                    onReSubscribe(mqttSubscription);
+                } catch (IOException e) {
+                    throw new MQTTProtocolException("Failed to find subscription strategy",
true, e);
+                }
+                return (byte) requestedQoS.ordinal();
+            }
+        }
+
+        try {
+            return onSubscribe(destinationName, requestedQoS);
+        } catch (IOException e) {
+            throw new MQTTProtocolException("Failed while intercepting subscribe", true,
e);
+        }
+    }
+
+    @Override
     public void onReSubscribe(MQTTSubscription mqttSubscription) throws MQTTProtocolException
{
         String topicName = mqttSubscription.getTopicName();
 
@@ -126,4 +174,61 @@ public abstract class AbstractMQTTSubscriptionStrategy implements MQTTSubscripti
     public boolean isControlTopic(ActiveMQDestination destination) {
         return destination.getPhysicalName().startsWith("$");
     }
+
+    @Override
+    public MQTTSubscription getSubscription(ConsumerId consumerId) {
+        return subscriptionsByConsumerId.get(consumerId);
+    }
+
+    protected ConsumerId getNextConsumerId() {
+        return new ConsumerId(protocol.getSessionId(), consumerIdGenerator.getNextSequenceId());
+    }
+
+    protected byte doSubscribe(ConsumerInfo consumerInfo, final String topicName, final QoS
qoS) throws MQTTProtocolException {
+
+        MQTTSubscription mqttSubscription = new MQTTSubscription(protocol, topicName, qoS,
consumerInfo);
+
+        // optimistic add to local maps first to be able to handle commands in onActiveMQCommand
+        subscriptionsByConsumerId.put(consumerInfo.getConsumerId(), mqttSubscription);
+        mqttSubscriptionByTopic.put(topicName, mqttSubscription);
+
+        final byte[] qos = {-1};
+        protocol.sendToActiveMQ(consumerInfo, new ResponseHandler() {
+            @Override
+            public void onResponse(MQTTProtocolConverter converter, Response response) throws
IOException {
+                // validate subscription request
+                if (response.isException()) {
+                    final Throwable throwable = ((ExceptionResponse) response).getException();
+                    LOG.warn("Error subscribing to {}", topicName, throwable);
+                    qos[0] = SUBSCRIBE_ERROR;
+                } else {
+                    qos[0] = (byte) qoS.ordinal();
+                }
+            }
+        });
+
+        if (qos[0] == SUBSCRIBE_ERROR) {
+            // remove from local maps if subscribe failed
+            subscriptionsByConsumerId.remove(consumerInfo.getConsumerId());
+            mqttSubscriptionByTopic.remove(topicName);
+        }
+
+        return qos[0];
+    }
+
+    public void doUnSubscribe(MQTTSubscription subscription) {
+        mqttSubscriptionByTopic.remove(subscription.getTopicName());
+        ConsumerInfo info = subscription.getConsumerInfo();
+        if (info != null) {
+            subscriptionsByConsumerId.remove(info.getConsumerId());
+
+            RemoveInfo removeInfo = info.createRemoveCommand();
+            protocol.sendToActiveMQ(removeInfo, new ResponseHandler() {
+                @Override
+                public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
+                    // ignore failures..
+                }
+            });
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
index 4e8f362..14530bd 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTDefaultSubscriptionStrategy.java
@@ -68,7 +68,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
     public byte onSubscribe(String topicName, QoS requestedQoS) throws MQTTProtocolException
{
         ActiveMQDestination destination = new ActiveMQTopic(MQTTProtocolSupport.convertMQTTToActiveMQ(topicName));
 
-        ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
         consumerInfo.setDestination(destination);
         consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
         consumerInfo.setRetroactive(true);
@@ -78,7 +78,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
             consumerInfo.setSubscriptionName(requestedQoS + ":" + topicName);
         }
 
-        return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
+        return doSubscribe(consumerInfo, topicName, requestedQoS);
     }
 
     @Override
@@ -96,22 +96,27 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
     }
 
     @Override
-    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException
{
-        // check if the durable sub also needs to be removed
-        if (subscription.getConsumerInfo().getSubscriptionName() != null) {
-            // also remove it from restored durable subscriptions set
-            restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
-
-            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
-            rsi.setConnectionId(protocol.getConnectionId());
-            rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
-            rsi.setClientId(protocol.getClientId());
-            protocol.sendToActiveMQ(rsi, new ResponseHandler() {
-                @Override
-                public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
-                    // ignore failures..
-                }
-            });
+    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
+        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
+        if (subscription != null) {
+            doUnSubscribe(subscription);
+
+            // check if the durable sub also needs to be removed
+            if (subscription.getConsumerInfo().getSubscriptionName() != null) {
+                // also remove it from restored durable subscriptions set
+                restoredSubs.remove(MQTTProtocolSupport.convertMQTTToActiveMQ(subscription.getTopicName()));
+
+                RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+                rsi.setConnectionId(protocol.getConnectionId());
+                rsi.setSubscriptionName(subscription.getConsumerInfo().getSubscriptionName());
+                rsi.setClientId(protocol.getClientId());
+                protocol.sendToActiveMQ(rsi, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
+                        // ignore failures..
+                    }
+                });
+            }
         }
     }
 
@@ -140,7 +145,7 @@ public class MQTTDefaultSubscriptionStrategy extends AbstractMQTTSubscriptionStr
                 String name = sub.getSubcriptionName();
                 String[] split = name.split(":", 2);
                 QoS qoS = QoS.valueOf(split[0]);
-                protocol.onSubscribe(new Topic(split[1], qoS));
+                onSubscribe(new Topic(split[1], qoS));
                 // mark this durable subscription as restored by Broker
                 restoredSubs.add(split[1]);
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
index 0eaa72a..5bea11d 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTSubscriptionStrategy.java
@@ -17,10 +17,12 @@
 package org.apache.activemq.transport.mqtt.strategy;
 
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.transport.mqtt.MQTTProtocolConverter;
 import org.apache.activemq.transport.mqtt.MQTTProtocolException;
 import org.apache.activemq.transport.mqtt.MQTTSubscription;
 import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.codec.CONNECT;
 
 /**
@@ -50,6 +52,19 @@ public interface MQTTSubscriptionStrategy {
     public void onConnect(CONNECT connect) throws MQTTProtocolException;
 
     /**
+     * Called for each Topic that a client requests to subscribe to.  The strategy needs
+     * to check each Topic for duplicate subscription requests and change of QoS state.
+     *
+     * @param topic
+     *        the MQTT Topic instance being subscribed to.
+     *
+     * @return the assigned QoS value given to the new subscription.
+     *
+     * @throws MQTTProtocolException if an error occurs while processing the subscribe actions.
+     */
+    public byte onSubscribe(Topic topic) throws MQTTProtocolException;
+
+    /**
      * Called when a new Subscription is being requested.  This method allows the
      * strategy to create a specific type of subscription for the client such as
      * mapping topic subscriptions to Queues etc.
@@ -80,12 +95,12 @@ public interface MQTTSubscriptionStrategy {
     /**
      * Called when a client requests an un-subscribe a previous subscription.
      *
-     * @param subscription
-     *        the {@link MQTTSubscription} that is being removed.
+     * @param topicName
+     *        the name of the Topic the client wishes to unsubscribe from.
      *
      * @throws MQTTProtocolException if an error occurs during the un-subscribe processing.
      */
-    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException;
+    public void onUnSubscribe(String topicName) throws MQTTProtocolException;
 
     /**
      * Intercepts PUBLISH operations from the client and allows the strategy to map the
@@ -136,4 +151,14 @@ public interface MQTTSubscriptionStrategy {
      */
     public MQTTProtocolConverter getProtocolConverter();
 
+    /**
+     * Lookup an {@link MQTTSubscription} instance based on known {@link ConsumerId} value.
+     *
+     * @param consumer
+     *        the consumer ID to lookup.
+     *
+     * @return the {@link MQTTSubscription} for the consumer or null if no subscription exists.
+     */
+    public MQTTSubscription getSubscription(ConsumerId consumer);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ec049a08/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
index 64995c6..0778764 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/strategy/MQTTVirtualTopicSubscriptionStrategy.java
@@ -97,13 +97,13 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
             destination = new ActiveMQTopic(converted);
         }
 
-        ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+        ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
         consumerInfo.setDestination(destination);
         consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
         consumerInfo.setRetroactive(true);
         consumerInfo.setDispatchAsync(true);
 
-        return protocol.doSubscribe(consumerInfo, topicName, requestedQoS);
+        return doSubscribe(consumerInfo, topicName, requestedQoS);
     }
 
     @Override
@@ -120,27 +120,31 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
         if (mqttSubscription.getDestination().isTopic()) {
             super.onReSubscribe(mqttSubscription);
         } else {
-            protocol.doUnSubscribe(mqttSubscription);
+            doUnSubscribe(mqttSubscription);
             ConsumerInfo consumerInfo = mqttSubscription.getConsumerInfo();
-            consumerInfo.setConsumerId(protocol.getNextConsumerId());
-            protocol.doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
+            consumerInfo.setConsumerId(getNextConsumerId());
+            doSubscribe(consumerInfo, mqttSubscription.getTopicName(), mqttSubscription.getQoS());
         }
     }
 
     @Override
-    public void onUnSubscribe(MQTTSubscription subscription) throws MQTTProtocolException
{
-        if (subscription.getDestination().isQueue()) {
-            DestinationInfo remove = new DestinationInfo();
-            remove.setConnectionId(protocol.getConnectionId());
-            remove.setDestination(subscription.getDestination());
-            remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
-
-            protocol.sendToActiveMQ(remove, new ResponseHandler() {
-                @Override
-                public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
-                    // ignore failures..
-                }
-            });
+    public void onUnSubscribe(String topicName) throws MQTTProtocolException {
+        MQTTSubscription subscription = mqttSubscriptionByTopic.remove(topicName);
+        if (subscription != null) {
+            doUnSubscribe(subscription);
+            if (subscription.getDestination().isQueue()) {
+                DestinationInfo remove = new DestinationInfo();
+                remove.setConnectionId(protocol.getConnectionId());
+                remove.setDestination(subscription.getDestination());
+                remove.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
+
+                protocol.sendToActiveMQ(remove, new ResponseHandler() {
+                    @Override
+                    public void onResponse(MQTTProtocolConverter converter, Response response)
throws IOException {
+                        // ignore failures..
+                    }
+                });
+            }
         }
     }
 
@@ -203,13 +207,13 @@ public class MQTTVirtualTopicSubscriptionStrategy extends AbstractMQTTSubscripti
                 QoS qoS = QoS.valueOf(qosString);
                 LOG.trace("Restoring subscription: {}:{}", topicName, qoS);
 
-                ConsumerInfo consumerInfo = new ConsumerInfo(protocol.getNextConsumerId());
+                ConsumerInfo consumerInfo = new ConsumerInfo(getNextConsumerId());
                 consumerInfo.setDestination(queue);
                 consumerInfo.setPrefetchSize(protocol.getActiveMQSubscriptionPrefetch());
                 consumerInfo.setRetroactive(true);
                 consumerInfo.setDispatchAsync(true);
 
-                protocol.doSubscribe(consumerInfo, topicName, qoS);
+                doSubscribe(consumerInfo, topicName, qoS);
 
                 // mark this durable subscription as restored by Broker
                 restoredQueues.add(queue);


Mime
View raw message