activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5108 - MQTT subscriptions for cleansession=true MUST be non-durable - patch applied with thanks to Dhiraj Bokde
Date Wed, 19 Mar 2014 14:26:38 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk 47d72dd32 -> bf8eb0e6c


https://issues.apache.org/jira/browse/AMQ-5108 - MQTT subscriptions for cleansession=true
MUST be non-durable - patch applied with thanks to Dhiraj Bokde


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bf8eb0e6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bf8eb0e6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bf8eb0e6

Branch: refs/heads/trunk
Commit: bf8eb0e6ca595308d5a20b4301d985e68903a3dd
Parents: 47d72dd
Author: gtully <gary.tully@gmail.com>
Authored: Wed Mar 19 14:24:51 2014 +0000
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Mar 19 14:24:51 2014 +0000

----------------------------------------------------------------------
 .../transport/mqtt/MQTTProtocolConverter.java   |  47 ++++----
 .../activemq/transport/mqtt/MQTTNioTest.java    |  19 ++-
 .../activemq/transport/mqtt/MQTTSSLTest.java    |  15 +++
 .../activemq/transport/mqtt/MQTTTest.java       | 119 ++++++++++++++-----
 4 files changed, 143 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb0e6/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
index 91f81ab..614b133 100644
--- a/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
+++ b/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java
@@ -398,7 +398,8 @@ public class MQTTProtocolConverter {
         consumerInfo.setDestination(destination);
         consumerInfo.setPrefetchSize(getActiveMQSubscriptionPrefetch());
         consumerInfo.setDispatchAsync(true);
-        if ( connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
+        // create durable subscriptions only when cleansession is false
+        if ( !connect.cleanSession() && connect.clientId() != null && topic.qos().ordinal() >= QoS.AT_LEAST_ONCE.ordinal() ) {
             consumerInfo.setSubscriptionName(topic.qos()+":"+topic.name().toString());
         }
         MQTTSubscription mqttSubscription = new MQTTSubscription(this, topic.qos(), consumerInfo);
@@ -410,7 +411,7 @@ public class MQTTProtocolConverter {
                 // validate subscription request
                 if (response.isException()) {
                     final Throwable throwable = ((ExceptionResponse) response).getException();
-                    LOG.debug("Error subscribing to " + topic.name(), throwable);
+                    LOG.warn("Error subscribing to " + topic.name(), throwable);
                     qos[0] = SUBSCRIBE_ERROR;
                 } else {
                     qos[0] = (byte) topic.qos().ordinal();
@@ -654,22 +655,26 @@ public class MQTTProtocolConverter {
     boolean willSent = false;
     public void onTransportError() {
         if (connect != null) {
-            if (connected.get() && connect.willTopic() != null && connect.willMessage() != null && !willSent) {
-                willSent = true;
-                try {
-                    PUBLISH publish = new PUBLISH();
-                    publish.topicName(connect.willTopic());
-                    publish.qos(connect.willQos());
-                    publish.messageId(getNextSequenceId());
-                    publish.payload(connect.willMessage());
-                    ActiveMQMessage message = convertMessage(publish);
-                    message.setProducerId(producerId);
-                    message.onSend();
-
-                    sendToActiveMQ(message, null);
-                } catch (Exception e) {
-                    LOG.warn("Failed to publish Will Message " + connect.willMessage());
+            if (connected.get()) {
+                if (connect.willTopic() != null && connect.willMessage() != null && !willSent) {
+                    willSent = true;
+                    try {
+                        PUBLISH publish = new PUBLISH();
+                        publish.topicName(connect.willTopic());
+                        publish.qos(connect.willQos());
+                        publish.messageId(getNextSequenceId());
+                        publish.payload(connect.willMessage());
+                        ActiveMQMessage message = convertMessage(publish);
+                        message.setProducerId(producerId);
+                        message.onSend();
+
+                        sendToActiveMQ(message, null);
+                    } catch (Exception e) {
+                        LOG.warn("Failed to publish Will Message " + connect.willMessage());
+                    }
                 }
+                // remove connection info
+                sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
             }
         }
     }
@@ -721,11 +726,11 @@ public class MQTTProtocolConverter {
             LOG.debug("Exception detail", exception);
         }
 
-        try {
-            getMQTTTransport().stop();
-        } catch (Throwable e) {
-            LOG.error("Failed to stop MQTTT Transport ", e);
+        if (connected.get() && connectionInfo != null) {
+            connected.set(false);
+            sendToActiveMQ(connectionInfo.createRemoveCommand(), null);
         }
+        stopTransport();
     }
 
     void checkConnected() throws MQTTProtocolException {

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb0e6/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
index c89c84e..2433d2e 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTNioTest.java
@@ -16,11 +16,17 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.util.LinkedList;
+
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.filter.DestinationMapEntry;
-import org.apache.activemq.security.*;
+import org.apache.activemq.security.AuthenticationUser;
+import org.apache.activemq.security.AuthorizationEntry;
+import org.apache.activemq.security.AuthorizationPlugin;
+import org.apache.activemq.security.DefaultAuthorizationMap;
+import org.apache.activemq.security.SimpleAuthenticationPlugin;
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
@@ -29,10 +35,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.BlockJUnit4ClassRunner;
 
-import java.util.LinkedList;
-
-import static org.junit.Assert.assertTrue;
-
 @RunWith(BlockJUnit4ClassRunner.class)
 public class MQTTNioTest extends MQTTTest {
 
@@ -48,6 +50,13 @@ public class MQTTNioTest extends MQTTTest {
         super.testReceiveMessageSentWhileOffline();
     }
 
+    @Ignore("See AMQ-4712")
+    @Override
+    @Test
+    public void testResendMessageId() throws Exception {
+        super.testResendMessageId();
+    }
+
     @Test
     public void testPingOnMQTTNIO() throws Exception {
         addMQTTConnector("maxInactivityDuration=-1");

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb0e6/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
index c98f921..6b44ae2 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTSSLTest.java
@@ -60,6 +60,12 @@ public class MQTTSSLTest extends MQTTTest {
         super.testReceiveMessageSentWhileOffline();
     }
 
+    @Ignore("See AMQ-4712")
+    @Override
+    @Test
+    public void testResendMessageId() throws Exception {
+        super.testResendMessageId();
+    }
 
     protected MQTT createMQTTConnection() throws Exception {
         MQTT mqtt = new MQTT();
@@ -73,6 +79,15 @@ public class MQTTSSLTest extends MQTTTest {
         return mqtt;
     }
 
+    protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
+        MQTT mqtt = createMQTTConnection();
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
+        return mqtt;
+    }
+
     protected void initializeConnection(MQTTClientProvider provider) throws Exception {
         SSLContext ctx = SSLContext.getInstance("TLS");
         ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom());

http://git-wip-us.apache.org/repos/asf/activemq/blob/bf8eb0e6/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index f44dfd4..d5d3983 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.transport.mqtt;
 
 import java.net.ProtocolException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -115,7 +116,7 @@ public class MQTTTest extends AbstractMQTTTest {
 
         String topic = "foo/bah";
 
-        subscriptionProvider.subscribe(topic,AT_MOST_ONCE);
+        subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
 
         final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
 
@@ -578,7 +579,7 @@ public class MQTTTest extends AbstractMQTTTest {
 
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
-        mqtt.setKeepAlive((short)2);
+        mqtt.setKeepAlive((short) 2);
         mqtt.setCleanSession(true);
 
         final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
@@ -673,10 +674,7 @@ public class MQTTTest extends AbstractMQTTTest {
         addMQTTConnector();
         brokerService.start();
 
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId("foo");
-        mqtt.setKeepAlive((short)2);
-        mqtt.setCleanSession(true);
+        final MQTT mqtt = createMQTTConnection("resend", false);
 
         final List<PUBLISH> publishList = new ArrayList<PUBLISH>();
         mqtt.setTracer(new Tracer() {
@@ -729,8 +727,6 @@ public class MQTTTest extends AbstractMQTTTest {
         // drop subs without acknowledging messages, then subscribe and receive again
         connection.unsubscribe(subs);
         connection.subscribe(new Topic[]{new Topic(subs[0], QoS.AT_LEAST_ONCE), new Topic(subs[1], QoS.EXACTLY_ONCE)});
-        // wait for all acks to be processed
-        Thread.sleep(1000);
 
         msg = connection.receive(5000, TimeUnit.MILLISECONDS);
         assertNotNull(msg);
@@ -742,24 +738,98 @@ public class MQTTTest extends AbstractMQTTTest {
         msg.ack();
 
         // make sure we received duplicate message ids
-        for (int i = 0; i < publishList.size(); i++) {
-            boolean found = false;
-            for (int j = 0; j < publishList.size(); j++) {
-                if (i != j) {
+        List<Integer> dups = new ArrayList<Integer>();
+        for (int i = 0; i < publishList.size() - 1; i++) {
+            if (!dups.contains(i)) {
+                boolean found = false;
+                for (int j = i + 1; j < publishList.size(); j++) {
                     if (publishList.get(i).messageId() == publishList.get(j).messageId()) {
                         // one of them is a duplicate
                         assertTrue(publishList.get(i).dup() || publishList.get(j).dup());
                         found = true;
+                        dups.add(j);
+                        break;
                     }
                 }
+                assertTrue("Dup Not found " + publishList.get(i), found);
             }
-            assertTrue("Dup Not found " + publishList.get(i), found);
         }
 
         connection.unsubscribe(subs);
         connection.disconnect();
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testClientConnectionFailure() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        MQTT mqtt = createMQTTConnection("reconnect", false);
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        assertTrue(connection.isConnected());
+
+        final String TOPIC = "TopicA";
+        final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
+        connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+        // kill transport
+        connection.kill();
+
+        connection = mqtt.blockingConnection();
+        connection.connect();
+        assertTrue(connection.isConnected());
+
+        assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
+        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(TOPIC, new String(msg.getPayload()));
+        msg.ack();
+        connection.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testCleanSession() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+
+        final String CLIENTID = "cleansession";
+        final MQTT mqttNotClean = createMQTTConnection(CLIENTID, false);
+        BlockingConnection notClean = mqttNotClean.blockingConnection();
+        final String TOPIC = "TopicA";
+        notClean.connect();
+        notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+        notClean.disconnect();
+
+        // MUST receive message from previous not clean session
+        notClean = mqttNotClean.blockingConnection();
+        notClean.connect();
+        Message msg = notClean.receive(10000, TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(TOPIC, new String(msg.getPayload()));
+        msg.ack();
+        notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+        notClean.disconnect();
+
+        // MUST NOT receive message from previous not clean session
+        final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
+        final BlockingConnection clean = mqttClean.blockingConnection();
+        clean.connect();
+        msg = clean.receive(10000, TimeUnit.MILLISECONDS);
+        assertNull(msg);
+        clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+        clean.disconnect();
+
+        // MUST NOT receive message from previous clean session
+        notClean = mqttNotClean.blockingConnection();
+        notClean.connect();
+        msg = notClean.receive(1000, TimeUnit.MILLISECONDS);
+        assertNull(msg);
+        notClean.disconnect();
+    }
+
     @Test(timeout=60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
         addMQTTConnector();
@@ -948,7 +1018,7 @@ public class MQTTTest extends AbstractMQTTTest {
 
     @Test(timeout=60 * 1000)
     public void testReceiveMessageSentWhileOffline() throws Exception {
-        byte[] payload = new byte[1024 * 32];
+        final byte[] payload = new byte[1024 * 32];
         for (int i = 0; i < payload.length; i++){
             payload[i] = '2';
         }
@@ -958,12 +1028,9 @@ public class MQTTTest extends AbstractMQTTTest {
 
         addMQTTConnector("trace=true");
         brokerService.start();
-        MQTT mqttPub = createMQTTConnection();
-        mqttPub.setClientId("MQTT-Pub-Client");
+        final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
 
-        MQTT mqttSub = createMQTTConnection();
-        mqttSub.setClientId("MQTT-Sub-Client");
-        mqttSub.setCleanSession(false);
+        final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
 
         final BlockingConnection connectionPub = mqttPub.blockingConnection();
         connectionPub.connect();
@@ -983,10 +1050,7 @@ public class MQTTTest extends AbstractMQTTTest {
             Message message = connectionSub.receive(5, TimeUnit.SECONDS);
             assertNotNull(message);
             received++;
-            payload = message.getPayload();
-            String messageContent = new String(payload);
-            LOG.info("Received message from topic: " + message.getTopic() +
-                     " Message content: " + messageContent);
+            assertTrue(Arrays.equals(payload, message.getPayload()));
             message.ack();
         }
         connectionSub.disconnect();
@@ -997,10 +1061,6 @@ public class MQTTTest extends AbstractMQTTTest {
                 connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
             }
 
-            mqttSub = createMQTTConnection();
-            mqttSub.setClientId("MQTT-Sub-Client");
-            mqttSub.setCleanSession(false);
-
             connectionSub = mqttSub.blockingConnection();
             connectionSub.connect();
             connectionSub.subscribe(topics);
@@ -1009,10 +1069,7 @@ public class MQTTTest extends AbstractMQTTTest {
                 Message message = connectionSub.receive(5, TimeUnit.SECONDS);
                 assertNotNull(message);
                 received++;
-                payload = message.getPayload();
-                String messageContent = new String(payload);
-                LOG.info("Received message from topic: " + message.getTopic() +
-                         " Message content: " + messageContent);
+                assertTrue(Arrays.equals(payload, message.getPayload()));
                 message.ack();
             }
             connectionSub.disconnect();


Mime
View raw message