activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5127
Date Mon, 31 Mar 2014 16:42:51 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk f5ddcf06d -> f4a8b117c


https://issues.apache.org/jira/browse/AMQ-5127

Add test case to show this is now fixed in 5.10

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f4a8b117
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f4a8b117
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f4a8b117

Branch: refs/heads/trunk
Commit: f4a8b117cefced1a0eeadf65efd60c96f99ff144
Parents: f5ddcf0
Author: Timothy Bish <tabish121@gmai.com>
Authored: Mon Mar 31 12:42:44 2014 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Mon Mar 31 12:42:44 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTTest.java       | 271 +++++++++++--------
 1 file changed, 155 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f4a8b117/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index 31198ba..17ad1e4 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotEquals;
+
 import java.net.ProtocolException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -26,6 +29,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Destination;
@@ -34,8 +38,6 @@ import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertNotEquals;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerPlugin;
@@ -65,22 +67,21 @@ public class MQTTTest extends AbstractMQTTTest {
 
     private static final Logger LOG = LoggerFactory.getLogger(MQTTTest.class);
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAndReceiveMQTT() throws Exception {
         addMQTTConnector();
         brokerService.start();
         final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
         initializeConnection(subscriptionProvider);
 
-
-        subscriptionProvider.subscribe("foo/bah",AT_MOST_ONCE);
+        subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
 
         final CountDownLatch latch = new CountDownLatch(numberOfMessages);
 
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
-                for (int i = 0; i < numberOfMessages; i++){
+                for (int i = 0; i < numberOfMessages; i++) {
                     try {
                         byte[] payload = subscriptionProvider.receive(10000);
                         assertNotNull("Should get a message", payload);
@@ -98,9 +99,9 @@ public class MQTTTest extends AbstractMQTTTest {
         final MQTTClientProvider publishProvider = getMQTTClientProvider();
         initializeConnection(publishProvider);
 
-        for (int i = 0; i < numberOfMessages; i++){
+        for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Message " + i;
-            publishProvider.publish("foo/bah",payload.getBytes(),AT_LEAST_ONCE);
+            publishProvider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
         }
 
         latch.await(10, TimeUnit.SECONDS);
@@ -109,7 +110,7 @@ public class MQTTTest extends AbstractMQTTTest {
         publishProvider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testUnsubscribeMQTT() throws Exception {
         addMQTTConnector();
         brokerService.start();
@@ -120,12 +121,12 @@ public class MQTTTest extends AbstractMQTTTest {
 
         subscriptionProvider.subscribe(topic, AT_MOST_ONCE);
 
-        final CountDownLatch latch = new CountDownLatch(numberOfMessages/2);
+        final CountDownLatch latch = new CountDownLatch(numberOfMessages / 2);
 
         Thread thread = new Thread(new Runnable() {
             @Override
             public void run() {
-                for (int i = 0; i < numberOfMessages; i++){
+                for (int i = 0; i < numberOfMessages; i++) {
                     try {
                         byte[] payload = subscriptionProvider.receive(10000);
                         assertNotNull("Should get a message", payload);
@@ -143,12 +144,12 @@ public class MQTTTest extends AbstractMQTTTest {
         final MQTTClientProvider publishProvider = getMQTTClientProvider();
         initializeConnection(publishProvider);
 
-        for (int i = 0; i < numberOfMessages; i++){
+        for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Message " + i;
-            if (i == numberOfMessages/2){
+            if (i == numberOfMessages / 2) {
                 subscriptionProvider.unsubscribe(topic);
             }
-            publishProvider.publish(topic,payload.getBytes(),AT_LEAST_ONCE);
+            publishProvider.publish(topic, payload.getBytes(), AT_LEAST_ONCE);
         }
 
         latch.await(10, TimeUnit.SECONDS);
@@ -157,19 +158,19 @@ public class MQTTTest extends AbstractMQTTTest {
         publishProvider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAtMostOnceReceiveExactlyOnce() throws Exception {
         /**
          * Although subscribing with EXACTLY ONCE, the message gets published
-         * with AT_MOST_ONCE - in MQTT the QoS is always determined by the message
-         * as published - not the wish of the subscriber
+         * with AT_MOST_ONCE - in MQTT the QoS is always determined by the
+         * message as published - not the wish of the subscriber
          */
         addMQTTConnector();
         brokerService.start();
 
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
-        provider.subscribe("foo",EXACTLY_ONCE);
+        provider.subscribe("foo", EXACTLY_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
@@ -180,14 +181,14 @@ public class MQTTTest extends AbstractMQTTTest {
         provider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAtLeastOnceReceiveExactlyOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
 
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
-        provider.subscribe("foo",EXACTLY_ONCE);
+        provider.subscribe("foo", EXACTLY_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
@@ -198,14 +199,14 @@ public class MQTTTest extends AbstractMQTTTest {
         provider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAtLeastOnceReceiveAtMostOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
 
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
-        provider.subscribe("foo",AT_MOST_ONCE);
+        provider.subscribe("foo", AT_MOST_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
@@ -216,14 +217,14 @@ public class MQTTTest extends AbstractMQTTTest {
         provider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAndReceiveAtMostOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
 
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
-        provider.subscribe("foo",AT_MOST_ONCE);
+        provider.subscribe("foo", AT_MOST_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_MOST_ONCE);
@@ -234,14 +235,14 @@ public class MQTTTest extends AbstractMQTTTest {
         provider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAndReceiveAtLeastOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
 
         final MQTTClientProvider provider = getMQTTClientProvider();
         initializeConnection(provider);
-        provider.subscribe("foo",AT_LEAST_ONCE);
+        provider.subscribe("foo", AT_LEAST_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             provider.publish("foo", payload.getBytes(), AT_LEAST_ONCE);
@@ -252,7 +253,7 @@ public class MQTTTest extends AbstractMQTTTest {
         provider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAndReceiveExactlyOnce() throws Exception {
         addMQTTConnector();
         brokerService.start();
@@ -262,22 +263,22 @@ public class MQTTTest extends AbstractMQTTTest {
         final MQTTClientProvider subscriber = getMQTTClientProvider();
         initializeConnection(subscriber);
 
-        subscriber.subscribe("foo",EXACTLY_ONCE);
+        subscriber.subscribe("foo", EXACTLY_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "Test Message: " + i;
             publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
             byte[] message = subscriber.receive(5000);
-            assertNotNull("Should get a message + ["+ i + "]", message);
+            assertNotNull("Should get a message + [" + i + "]", message);
             assertEquals(payload, new String(message));
         }
         subscriber.disconnect();
         publisher.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAndReceiveLargeMessages() throws Exception {
         byte[] payload = new byte[1024 * 32];
-        for (int i = 0; i < payload.length; i++){
+        for (int i = 0; i < payload.length; i++) {
             payload[i] = '2';
         }
         addMQTTConnector();
@@ -289,7 +290,7 @@ public class MQTTTest extends AbstractMQTTTest {
         final MQTTClientProvider subscriber = getMQTTClientProvider();
         initializeConnection(subscriber);
 
-        subscriber.subscribe("foo",AT_LEAST_ONCE);
+        subscriber.subscribe("foo", AT_LEAST_ONCE);
         for (int i = 0; i < 10; i++) {
             publisher.publish("foo", payload, AT_LEAST_ONCE);
             byte[] message = subscriber.receive(5000);
@@ -301,7 +302,7 @@ public class MQTTTest extends AbstractMQTTTest {
         publisher.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendAndReceiveRetainedMessages() throws Exception {
 
         addMQTTConnector();
@@ -317,20 +318,20 @@ public class MQTTTest extends AbstractMQTTTest {
         publisher.publish("foo", RETAINED.getBytes(), AT_LEAST_ONCE, true);
 
         List<String> messages = new ArrayList<String>();
-        for (int i = 0; i < 10; i++){
+        for (int i = 0; i < 10; i++) {
             messages.add("TEST MESSAGE:" + i);
         }
 
-        subscriber.subscribe("foo",AT_LEAST_ONCE);
+        subscriber.subscribe("foo", AT_LEAST_ONCE);
 
         for (int i = 0; i < 10; i++) {
             publisher.publish("foo", messages.get(i).getBytes(), AT_LEAST_ONCE);
         }
         byte[] msg = subscriber.receive(5000);
         assertNotNull(msg);
-        assertEquals(RETAINED,new String(msg));
+        assertEquals(RETAINED, new String(msg));
 
-        for (int i =0; i < 10; i++){
+        for (int i = 0; i < 10; i++) {
             msg = subscriber.receive(5000);
             assertNotNull(msg);
             assertEquals(messages.get(i), new String(msg));
@@ -339,7 +340,7 @@ public class MQTTTest extends AbstractMQTTTest {
         publisher.disconnect();
     }
 
-    @Test(timeout=30000)
+    @Test(timeout = 30 * 1000)
     public void testValidZeroLengthClientId() throws Exception {
         addMQTTConnector();
         brokerService.start();
@@ -366,12 +367,12 @@ public class MQTTTest extends AbstractMQTTTest {
         connection.connect();
 
         final String RETAINED = "RETAINED";
-        String[] topics = {"TopicA", "/TopicA", "/", "TopicA/", "//"};
+        String[] topics = { "TopicA", "/TopicA", "/", "TopicA/", "//" };
         for (String topic : topics) {
             // test retained message
             connection.publish(topic, (RETAINED + topic).getBytes(), QoS.AT_LEAST_ONCE, true);
 
-            connection.subscribe(new Topic[]{new Topic(topic, QoS.AT_LEAST_ONCE)});
+            connection.subscribe(new Topic[] { new Topic(topic, QoS.AT_LEAST_ONCE) });
             Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
             assertEquals(RETAINED + topic, new String(msg.getPayload()));
@@ -384,26 +385,25 @@ public class MQTTTest extends AbstractMQTTTest {
             assertEquals(topic, new String(msg.getPayload()));
             msg.ack();
 
-            connection.unsubscribe(new String[] {topic});
+            connection.unsubscribe(new String[] { topic });
         }
         connection.disconnect();
 
         // test wildcard patterns with above topics
-        String[] wildcards = {"#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+"};
+        String[] wildcards = { "#", "+", "+/#", "/+", "+/", "+/+", "+/+/", "+/+/+" };
         for (String wildcard : wildcards) {
             final Pattern pattern = Pattern.compile(wildcard.replaceAll("/?#", "(/?.*)*").replaceAll("\\+", "[^/]*"));
 
             connection = mqtt.blockingConnection();
             connection.connect();
-            connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+            connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) });
 
             // test retained messages
             Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
             do {
                 assertNotNull("RETAINED null " + wildcard, msg);
                 assertTrue("RETAINED prefix " + wildcard, new String(msg.getPayload()).startsWith(RETAINED));
-                assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(),
-                    pattern.matcher(msg.getTopic()).matches());
+                assertTrue("RETAINED matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
                 msg.ack();
                 msg = connection.receive(1000, TimeUnit.MILLISECONDS);
             } while (msg != null);
@@ -412,7 +412,7 @@ public class MQTTTest extends AbstractMQTTTest {
             connection.disconnect();
             connection = mqtt.blockingConnection();
             connection.connect();
-            connection.subscribe(new Topic[]{new Topic(wildcard, QoS.AT_LEAST_ONCE)});
+            connection.subscribe(new Topic[] { new Topic(wildcard, QoS.AT_LEAST_ONCE) });
 
             // test non-retained message
             for (String topic : topics) {
@@ -421,8 +421,7 @@ public class MQTTTest extends AbstractMQTTTest {
             msg = connection.receive(1000, TimeUnit.MILLISECONDS);
             do {
                 assertNotNull("Non-retained Null " + wildcard, msg);
-                assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(),
-                    pattern.matcher(msg.getTopic()).matches());
+                assertTrue("Non-retained matching " + wildcard + " " + msg.getTopic(), pattern.matcher(msg.getTopic()).matches());
                 msg.ack();
                 msg = connection.receive(1000, TimeUnit.MILLISECONDS);
             } while (msg != null);
@@ -443,9 +442,9 @@ public class MQTTTest extends AbstractMQTTTest {
 
             MQTT mqtt = createMQTTConnection();
             mqtt.setClientId("foo");
-            mqtt.setKeepAlive((short)2);
+            mqtt.setKeepAlive((short) 2);
 
-            final int[] actualQoS = {-1};
+            final int[] actualQoS = { -1 };
             mqtt.setTracer(new Tracer() {
                 @Override
                 public void onReceive(MQTTFrame frame) {
@@ -459,7 +458,7 @@ public class MQTTTest extends AbstractMQTTTest {
             final BlockingConnection connection = mqtt.blockingConnection();
             connection.connect();
             connection.publish(topic, topic.getBytes(), QoS.EXACTLY_ONCE, true);
-            connection.subscribe(new Topic[]{ new Topic(topic, QoS.valueOf(topic)) });
+            connection.subscribe(new Topic[] { new Topic(topic, QoS.valueOf(topic)) });
 
             final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
@@ -472,7 +471,7 @@ public class MQTTTest extends AbstractMQTTTest {
             assertEquals(i, actualQoS[0]);
             msg.ack();
 
-            connection.unsubscribe(new String[]{topic});
+            connection.unsubscribe(new String[] { topic });
             connection.disconnect();
         }
 
@@ -485,9 +484,9 @@ public class MQTTTest extends AbstractMQTTTest {
 
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
-        mqtt.setKeepAlive((short)2);
+        mqtt.setKeepAlive((short) 2);
 
-        final int[] actualQoS = {-1};
+        final int[] actualQoS = { -1 };
         mqtt.setTracer(new Tracer() {
             @Override
             public void onReceive(MQTTFrame frame) {
@@ -506,7 +505,7 @@ public class MQTTTest extends AbstractMQTTTest {
 
         QoS[] qoss = { QoS.AT_MOST_ONCE, QoS.AT_MOST_ONCE, QoS.AT_LEAST_ONCE, QoS.EXACTLY_ONCE };
         for (QoS qos : qoss) {
-            connection.subscribe(new Topic[]{ new Topic("TopicA", qos) });
+            connection.subscribe(new Topic[] { new Topic("TopicA", qos) });
 
             final Message msg = connection.receive(5000, TimeUnit.MILLISECONDS);
             assertNotNull(msg);
@@ -520,7 +519,7 @@ public class MQTTTest extends AbstractMQTTTest {
             assertEquals(qos.ordinal(), actualQoS[0]);
         }
 
-        connection.unsubscribe(new String[]{"TopicA"});
+        connection.unsubscribe(new String[] { "TopicA" });
         connection.disconnect();
 
     }
@@ -539,7 +538,7 @@ public class MQTTTest extends AbstractMQTTTest {
         map.put(new ActiveMQTopic(ANONYMOUS), new GroupPrincipal(ANONYMOUS));
         final AuthorizationPlugin authorizationPlugin = new AuthorizationPlugin(new SimpleAuthorizationMap(map, map, map));
 
-        brokerService.setPlugins(new BrokerPlugin[] {authorizationPlugin, authenticationPlugin});
+        brokerService.setPlugins(new BrokerPlugin[] { authorizationPlugin, authenticationPlugin });
         brokerService.start();
 
         MQTT mqtt = createMQTTConnection();
@@ -550,10 +549,9 @@ public class MQTTTest extends AbstractMQTTTest {
         connection.connect();
 
         final String NAMED = "named";
-        byte[] qos = connection.subscribe(new Topic[] {
-            new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE)});
-        assertEquals((byte)0x80, qos[0]);
-        assertEquals((byte)QoS.EXACTLY_ONCE.ordinal(), qos[1]);
+        byte[] qos = connection.subscribe(new Topic[] { new Topic(NAMED, QoS.AT_MOST_ONCE), new Topic(ANONYMOUS, QoS.EXACTLY_ONCE) });
+        assertEquals((byte) 0x80, qos[0]);
+        assertEquals((byte) QoS.EXACTLY_ONCE.ordinal(), qos[1]);
 
         // validate the subscription by sending a retained message
         connection.publish(ANONYMOUS, ANONYMOUS.getBytes(), QoS.AT_MOST_ONCE, true);
@@ -562,9 +560,9 @@ public class MQTTTest extends AbstractMQTTTest {
         assertEquals(ANONYMOUS, new String(msg.getPayload()));
         msg.ack();
 
-        connection.unsubscribe(new String[]{ANONYMOUS});
-        qos = connection.subscribe(new Topic[]{new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE)});
-        assertEquals((byte)QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
+        connection.unsubscribe(new String[] { ANONYMOUS });
+        qos = connection.subscribe(new Topic[] { new Topic(ANONYMOUS, QoS.AT_LEAST_ONCE) });
+        assertEquals((byte) QoS.AT_LEAST_ONCE.ordinal(), qos[0]);
 
         msg = connection.receive(1000, TimeUnit.MILLISECONDS);
         assertNotNull(msg);
@@ -616,9 +614,9 @@ public class MQTTTest extends AbstractMQTTTest {
         // publish retained message
         connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, true);
 
-        String[] subs = {TOPIC, "TopicA/#", "TopicA/+"};
+        String[] subs = { TOPIC, "TopicA/#", "TopicA/+" };
         for (int i = 0; i < qoss.length; i++) {
-            connection.subscribe(new Topic[]{ new Topic(subs[i], qoss[i]) });
+            connection.subscribe(new Topic[] { new Topic(subs[i], qoss[i]) });
         }
 
         // publish non-retained message
@@ -639,7 +637,8 @@ public class MQTTTest extends AbstractMQTTTest {
         } while (msg != null && received++ < subs.length * 2);
         assertEquals("Unexpected number of messages", subs.length * 2, received + 1);
 
-        // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for AT_MOST_ONCE
+        // make sure we received distinct ids for QoS != AT_MOST_ONCE, and 0 for
+        // AT_MOST_ONCE
         for (int i = 0; i < publishList.size(); i++) {
             for (int j = i + 1; j < publishList.size(); j++) {
                 final PUBLISH publish1 = publishList.get(i);
@@ -696,8 +695,8 @@ public class MQTTTest extends AbstractMQTTTest {
         BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
         final String TOPIC = "TopicA/";
-        final String[] topics = new String[] {TOPIC, "TopicA/+"};
-        connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)});
+        final String[] topics = new String[] { TOPIC, "TopicA/+" };
+        connection.subscribe(new Topic[] { new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE) });
 
         // publish non-retained message
         connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
@@ -724,10 +723,8 @@ public class MQTTTest extends AbstractMQTTTest {
         assertEquals(4, publishList.size());
 
         // make sure we received duplicate message ids
-        assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() ||
-            publishList.get(0).messageId() == publishList.get(3).messageId());
-        assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() ||
-            publishList.get(1).messageId() == publishList.get(2).messageId());
+        assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() || publishList.get(0).messageId() == publishList.get(3).messageId());
+        assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() || publishList.get(1).messageId() == publishList.get(2).messageId());
         assertTrue(publishList.get(2).dup() && publishList.get(3).dup());
 
         connection.unsubscribe(topics);
@@ -771,7 +768,7 @@ public class MQTTTest extends AbstractMQTTTest {
         BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
         final String TOPIC = "TopicA/";
-        connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        connection.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) });
 
         // publish non-retained messages
         final int TOTAL_MESSAGES = 10;
@@ -815,7 +812,7 @@ public class MQTTTest extends AbstractMQTTTest {
         addMQTTConnector("trace=true");
         brokerService.start();
 
-        final String[] cleanClientIds = new String[] { "", "clean-packetid", null};
+        final String[] cleanClientIds = new String[] { "", "clean-packetid", null };
         final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
         MQTT[] mqtts = new MQTT[cleanClientIds.length];
         for (int i = 0; i < cleanClientIds.length; i++) {
@@ -853,7 +850,7 @@ public class MQTTTest extends AbstractMQTTTest {
             BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection();
             connection.connect();
             final String TOPIC = "TopicA/";
-            connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+            connection.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) });
 
             // publish non-retained message
             connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
@@ -888,7 +885,7 @@ public class MQTTTest extends AbstractMQTTTest {
         });
 
         final String TOPIC = "TopicA";
-        final byte[] qos = connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        final byte[] qos = connection.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) });
         assertEquals(QoS.EXACTLY_ONCE.ordinal(), qos[0]);
         connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
         // kill transport
@@ -921,7 +918,7 @@ public class MQTTTest extends AbstractMQTTTest {
         BlockingConnection notClean = mqttNotClean.blockingConnection();
         final String TOPIC = "TopicA";
         notClean.connect();
-        notClean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        notClean.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) });
         notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
         notClean.disconnect();
 
@@ -941,7 +938,7 @@ public class MQTTTest extends AbstractMQTTTest {
         clean.connect();
         msg = clean.receive(10000, TimeUnit.MILLISECONDS);
         assertNull(msg);
-        clean.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+        clean.subscribe(new Topic[] { new Topic(TOPIC, QoS.EXACTLY_ONCE) });
         clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
         clean.disconnect();
 
@@ -953,7 +950,7 @@ public class MQTTTest extends AbstractMQTTTest {
         notClean.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendMQTTReceiveJMS() throws Exception {
         addMQTTConnector();
         TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
@@ -982,7 +979,7 @@ public class MQTTTest extends AbstractMQTTTest {
         provider.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testSendJMSReceiveMQTT() throws Exception {
         addMQTTConnector();
         TransportConnector openwireTransport = brokerService.addConnector("tcp://localhost:0");
@@ -996,7 +993,7 @@ public class MQTTTest extends AbstractMQTTTest {
         javax.jms.Topic jmsTopic = s.createTopic("foo.far");
         MessageProducer producer = s.createProducer(jmsTopic);
 
-        provider.subscribe("foo/+",AT_MOST_ONCE);
+        provider.subscribe("foo/+", AT_MOST_ONCE);
         for (int i = 0; i < numberOfMessages; i++) {
             String payload = "This is Test Message: " + i;
             TextMessage sendMessage = s.createTextMessage(payload);
@@ -1010,13 +1007,13 @@ public class MQTTTest extends AbstractMQTTTest {
         activeMQConnection.close();
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testPingKeepsInactivityMonitorAlive() throws Exception {
         addMQTTConnector();
         brokerService.start();
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
-        mqtt.setKeepAlive((short)2);
+        mqtt.setKeepAlive((short) 2);
         final BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
 
@@ -1031,13 +1028,13 @@ public class MQTTTest extends AbstractMQTTTest {
         connection.disconnect();
     }
 
-    @Test(timeout=60 * 1000)
-    public void testTurnOffInactivityMonitor()throws Exception{
+    @Test(timeout = 60 * 1000)
+    public void testTurnOffInactivityMonitor() throws Exception {
         addMQTTConnector("transport.useInactivityMonitor=false");
         brokerService.start();
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo3");
-        mqtt.setKeepAlive((short)2);
+        mqtt.setKeepAlive((short) 2);
         final BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
 
@@ -1052,7 +1049,7 @@ public class MQTTTest extends AbstractMQTTTest {
         connection.disconnect();
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 30 * 10000)
     public void testJmsMapping() throws Exception {
         addMQTTConnector();
         addOpenwireConnector();
@@ -1069,7 +1066,7 @@ public class MQTTTest extends AbstractMQTTTest {
         // set up mqtt producer
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo3");
-        mqtt.setKeepAlive((short)2);
+        mqtt.setKeepAlive((short) 2);
         final BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
 
@@ -1099,7 +1096,7 @@ public class MQTTTest extends AbstractMQTTTest {
 
     }
 
-    @Test(timeout = 300000)
+    @Test(timeout = 30 * 10000)
     public void testSubscribeMultipleTopics() throws Exception {
 
         byte[] payload = new byte[1024 * 32];
@@ -1116,8 +1113,8 @@ public class MQTTTest extends AbstractMQTTTest {
         final BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
 
-        Topic[] topics = {new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE)};
-        Topic[] wildcardTopic = {new Topic("Topic/#", QoS.AT_LEAST_ONCE)};
+        Topic[] topics = { new Topic("Topic/A", QoS.EXACTLY_ONCE), new Topic("Topic/B", QoS.EXACTLY_ONCE) };
+        Topic[] wildcardTopic = { new Topic("Topic/#", QoS.AT_LEAST_ONCE) };
         connection.subscribe(wildcardTopic);
 
         for (Topic topic : topics) {
@@ -1131,18 +1128,17 @@ public class MQTTTest extends AbstractMQTTTest {
             received++;
             payload = message.getPayload();
             String messageContent = new String(payload);
-            LOG.info("Received message from topic: " + message.getTopic() +
-                " Message content: " + messageContent);
+            LOG.info("Received message from topic: " + message.getTopic() + " Message content: " + messageContent);
             message.ack();
         }
 
         assertEquals("Should have received " + topics.length + " messages", topics.length, received);
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testReceiveMessageSentWhileOffline() throws Exception {
         final byte[] payload = new byte[1024 * 32];
-        for (int i = 0; i < payload.length; i++){
+        for (int i = 0; i < payload.length; i++) {
             payload[i] = '2';
         }
 
@@ -1161,7 +1157,7 @@ public class MQTTTest extends AbstractMQTTTest {
         BlockingConnection connectionSub = mqttSub.blockingConnection();
         connectionSub.connect();
 
-        Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+        Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
         connectionSub.subscribe(topics);
 
         for (int i = 0; i < messagesPerRun; ++i) {
@@ -1178,7 +1174,7 @@ public class MQTTTest extends AbstractMQTTTest {
         }
         connectionSub.disconnect();
 
-        for(int j = 0; j < numberOfRuns; j++) {
+        for (int j = 0; j < numberOfRuns; j++) {
 
             for (int i = 0; i < messagesPerRun; ++i) {
                 connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
@@ -1200,14 +1196,14 @@ public class MQTTTest extends AbstractMQTTTest {
         assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages", (messagesPerRun * (numberOfRuns + 1)), received);
     }
 
-    @Test(timeout=30000)
+    @Test(timeout = 30 * 1000)
     public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
         // default keep alive in milliseconds
         addMQTTConnector("transport.defaultKeepAlive=2000");
         brokerService.start();
         MQTT mqtt = createMQTTConnection();
         mqtt.setClientId("foo");
-        mqtt.setKeepAlive((short)0);
+        mqtt.setKeepAlive((short) 0);
         final BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
 
@@ -1220,7 +1216,7 @@ public class MQTTTest extends AbstractMQTTTest {
         }));
     }
 
-    @Test(timeout=60 * 1000)
+    @Test(timeout = 60 * 1000)
     public void testReuseConnection() throws Exception {
         addMQTTConnector();
         brokerService.start();
@@ -1242,6 +1238,62 @@ public class MQTTTest extends AbstractMQTTTest {
         }
     }
 
+    @Test(timeout = 60 * 1000)
+    public void testNoMessageReceivedAfterUnsubscribeMQTT() throws Exception {
+        addMQTTConnector();
+        brokerService.setPersistent(true);
+        brokerService.start();
+        Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
+
+        MQTT mqttPub = createMQTTConnection("MQTTPub-Client", true);
+        // mqttPub.setVersion("3.1.1");
+
+        MQTT mqttSub = createMQTTConnection("MQTTSub-Client", false);
+        // mqttSub.setVersion("3.1.1");
+
+        BlockingConnection connectionPub = mqttPub.blockingConnection();
+        connectionPub.connect();
+
+        BlockingConnection connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+        connectionSub.subscribe(topics);
+        connectionSub.unsubscribe(new String[] { "TopicA" });
+        connectionSub.disconnect();
+
+        for (int i = 0; i < 10; i++) {
+            String payload = "Message " + i;
+            connectionPub.publish(topics[0].name().toString(), payload.getBytes(), QoS.EXACTLY_ONCE, false);
+        }
+
+        connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
+
+        int received = 0;
+        for (int i = 0; i < 10; ++i) {
+            Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            LOG.info("Message is " + new String(message.getPayload()));
+            received++;
+            message.ack();
+        }
+        assertEquals(10, received);
+
+        connectionSub.disconnect();
+        connectionPub.disconnect();
+    }
+
+    @Test(timeout = 60 * 1000)
+    public void testMQTT311Connection() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("foo");
+        mqtt.setVersion("3.1.1");
+        final BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        connection.disconnect();
+    }
+
     @Override
     protected String getProtocolScheme() {
         return "mqtt";
@@ -1270,10 +1322,10 @@ public class MQTTTest extends AbstractMQTTTest {
     }
 
     protected Tracer createTracer() {
-        return new Tracer(){
+        return new Tracer() {
             @Override
             public void onReceive(MQTTFrame frame) {
-                LOG.info("Client Received:\n"+frame);
+                LOG.info("Client Received:\n" + frame);
             }
 
             @Override
@@ -1287,17 +1339,4 @@ public class MQTTTest extends AbstractMQTTTest {
             }
         };
     }
-
-    @Test(timeout=60 * 1000)
-    public void testMQTT311Connection()throws Exception{
-        addMQTTConnector();
-        brokerService.start();
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId("foo");
-        mqtt.setVersion("3.1.1");
-        final BlockingConnection connection = mqtt.blockingConnection();
-        connection.connect();
-        connection.disconnect();
-    }
-
 }


Mime
View raw message