activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1494222 - /activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Date Tue, 18 Jun 2013 17:37:01 GMT
Author: tabish
Date: Tue Jun 18 17:37:01 2013
New Revision: 1494222

URL: http://svn.apache.org/r1494222
Log:
test case for: https://issues.apache.org/jira/browse/AMQ-4585

Modified:
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1494222&r1=1494221&r2=1494222&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java	(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java	Tue Jun 18 17:37:01 2013
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.transport.mqtt;
 
+import java.util.concurrent.TimeUnit;
+
 import org.apache.activemq.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
@@ -72,6 +74,7 @@ public class MQTTTest extends AbstractMQ
         connection.disconnect();
     }
 
+    @Test(timeout=300000)
     public void testSubscribeMultipleTopics() throws Exception {
         byte[] payload = new byte[1024 * 32];
         for (int i = 0; i < payload.length; i++){
@@ -109,6 +112,65 @@ public class MQTTTest extends AbstractMQ
         assertEquals("Should have received " + topics.length + " messages", topics.length, received);
     }
 
+    @Test(timeout=300000)
+    public void testReceiveMessageSentWhileOffline() throws Exception {
+        addMQTTConnector();
+        brokerService.start();
+        final MQTTClientProvider publisher = getMQTTClientProvider();
+        initializeConnection(publisher);
+
+        MQTT mqtt = createMQTTConnection();
+        mqtt.setClientId("MQTT-Client");
+        mqtt.setCleanSession(false);
+
+        {
+            final BlockingConnection subscriber = mqtt.blockingConnection();
+            subscriber.connect();
+            Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
+            subscriber.subscribe(topic);
+
+            for (int i = 0; i < numberOfMessages; i++) {
+                String payload = "Test Message: " + i;
+                publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+            }
+
+            for (int i = 0; i < numberOfMessages / 2; i++) {
+                Message message = subscriber.receive(5, TimeUnit.SECONDS);
+                assertNotNull(message);
+                byte[] payload = message.getPayload();
+                String messageContent = new String(payload);
+                if (i % 100 == 0) {
+                    LOG.debug("Received message from topic: " + message.getTopic() +
+                              " Message content: " + messageContent);
+                }
+                message.ack();
+            }
+
+            subscriber.disconnect();
+        }
+
+        publisher.disconnect();
+
+        final BlockingConnection subscriber = mqtt.blockingConnection();
+        subscriber.connect();
+        Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
+        subscriber.subscribe(topic);
+
+        for (int i = 0; i < numberOfMessages / 2; i++) {
+            Message message = subscriber.receive(5, TimeUnit.SECONDS);
+            assertNotNull(message);
+            byte[] payload = message.getPayload();
+            String messageContent = new String(payload);
+            if (i % 100 == 0) {
+                LOG.debug("Received message from topic: " + message.getTopic() +
+                          " Message content: " + messageContent);
+            }
+            message.ack();
+        }
+
+        subscriber.disconnect();
+    }
+
     @Test(timeout=30000)
     public void testDefaultKeepAliveWhenClientSpecifiesZero() throws Exception {
         // default keep alive in milliseconds
@@ -166,10 +228,18 @@ public class MQTTTest extends AbstractMQ
     }
 
     protected MQTT createMQTTConnection() throws Exception {
+        return createMQTTConnection(null, false);
+    }
+
+    protected MQTT createMQTTConnection(String clientId, boolean clean) throws Exception {
         MQTT mqtt = new MQTT();
         mqtt.setConnectAttemptsMax(1);
         mqtt.setReconnectAttemptsMax(0);
         mqtt.setTracer(createTracer());
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
         mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
         // shut off connect retry
         return mqtt;



Mime
View raw message