activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5299
Date Thu, 31 Jul 2014 00:03:59 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 2cd54248c -> efa55278e


https://issues.apache.org/jira/browse/AMQ-5299

Fix duplicate call to unsubscribe.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/efa55278
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/efa55278
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/efa55278

Branch: refs/heads/trunk
Commit: efa55278ec3a7045240c56bbaa74fcbcff837159
Parents: 2cd5424
Author: Timothy Bish <tabish121@gmail.com>
Authored: Wed Jul 30 20:03:48 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Jul 30 20:03:48 2014 -0400

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   | 24 ++++++++------------
 1 file changed, 9 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/efa55278/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index cc51ce7..eb5bb2b 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -151,11 +151,12 @@ public class MQTTProtocolConverter {
     void sendToActiveMQ(Command command, ResponseHandler handler) {
 
         // Lets intercept message send requests..
-        if( command instanceof ActiveMQMessage) {
+        if (command instanceof ActiveMQMessage) {
             ActiveMQMessage msg = (ActiveMQMessage) command;
-            if( !getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$") ) {
-                // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1 spec requirements
-                if( handler!=null ) {
+            if (!getPublishDollarTopics() && msg.getDestination().getPhysicalName().startsWith("$")) {
+                // We don't allow users to send to $ prefixed topics to avoid failing MQTT 3.1.1
+                // specification requirements
+                if (handler != null) {
                     try {
                         handler.onResponse(this, new Response());
                     } catch (IOException e) {
@@ -186,7 +187,6 @@ public class MQTTProtocolConverter {
      * Convert a MQTT command
      */
     public void onMQTTCommand(MQTTFrame frame) throws IOException, JMSException {
-
         switch (frame.messageType()) {
             case PINGREQ.TYPE: {
                 LOG.debug("Received a ping from client: " + getClientId());
@@ -240,7 +240,6 @@ public class MQTTProtocolConverter {
     }
 
     void onMQTTConnect(final CONNECT connect) throws MQTTProtocolException {
-
         if (connected.get()) {
             throw new MQTTProtocolException("Already connected.");
         }
@@ -333,7 +332,7 @@ public class MQTTProtocolConverter {
                         getMQTTTransport().sendToMQTT(ack.encode());
 
                         List<SubscriptionInfo> subs = PersistenceAdapterSupport.listSubscriptions(brokerService.getPersistenceAdapter(), connectionInfo.getClientId());
-                        if( connect.cleanSession() ) {
+                        if (connect.cleanSession()) {
                             packetIdGenerator.stopClientSession(getClientId());
                             deleteDurableSubs(subs);
                         } else {
@@ -417,18 +416,16 @@ public class MQTTProtocolConverter {
         final QoS topicQoS = topic.qos();
         ActiveMQDestination destination = new ActiveMQTopic(convertMQTTToActiveMQ(topicName));
 
-        if( mqttSubscriptionByTopic.containsKey(topicName)) {
+        if (mqttSubscriptionByTopic.containsKey(topicName)) {
             final MQTTSubscription mqttSubscription = mqttSubscriptionByTopic.get(topicName);
             if (topicQoS != mqttSubscription.qos()) {
                 // remove old subscription as the QoS has changed
                 onUnSubscribe(topicName);
             } else {
-                // duplicate SUBSCRIBE packet, find all matching topics and resend retained messages
+                // duplicate SUBSCRIBE packet, find all matching topics and re-send retained messages
                 resendRetainedMessages(topicName, destination, mqttSubscription);
-
                 return (byte) topicQoS.ordinal();
             }
-            onUnSubscribe(topicName);
         }
 
         ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
@@ -438,7 +435,7 @@ public class MQTTProtocolConverter {
         consumerInfo.setRetroactive(true);
         consumerInfo.setDispatchAsync(true);
         // create durable subscriptions only when cleansession is false
-        if ( !connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
+        if (!connect.cleanSession() && connect.clientId() != null && topicQoS.ordinal() >= QoS.AT_LEAST_ONCE.ordinal()) {
             consumerInfo.setSubscriptionName(topicQoS + ":" + topicName);
         }
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topicQoS, consumerInfo);
@@ -914,7 +911,6 @@ public class MQTTProtocolConverter {
         char[] chars = name.toCharArray();
         for (int i = 0; i < chars.length; i++) {
             switch(chars[i]) {
-
                 case '#':
                     chars[i] = '>';
                     break;
@@ -928,14 +924,12 @@ public class MQTTProtocolConverter {
                 case '*':
                     chars[i] = '+';
                     break;
-
                 case '/':
                     chars[i] = '.';
                     break;
                 case '.':
                     chars[i] = '/';
                     break;
-
             }
         }
         String rc = new String(chars);


Mime
View raw message