activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1494283 - /activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Date Tue, 18 Jun 2013 20:22:51 GMT
Author: tabish
Date: Tue Jun 18 20:22:51 2013
New Revision: 1494283

URL: http://svn.apache.org/r1494283
Log:
test case for: https://issues.apache.org/jira/browse/AMQ-4585

Modified:
    activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java

Modified: activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java?rev=1494283&r1=1494282&r2=1494283&view=diff
==============================================================================
--- activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
(original)
+++ activemq/trunk/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
Tue Jun 18 20:22:51 2013
@@ -114,61 +114,76 @@ public class MQTTTest extends AbstractMQ
 
     @Test(timeout=300000)
     public void testReceiveMessageSentWhileOffline() throws Exception {
-        addMQTTConnector();
-        brokerService.start();
-        final MQTTClientProvider publisher = getMQTTClientProvider();
-        initializeConnection(publisher);
+        byte[] payload = new byte[1024 * 32];
+        for (int i = 0; i < payload.length; i++){
+            payload[i] = '2';
+        }
 
-        MQTT mqtt = createMQTTConnection();
-        mqtt.setClientId("MQTT-Client");
-        mqtt.setCleanSession(false);
+        int numberOfRuns = 100;
+        int messagesPerRun = 2;
 
-        {
-            final BlockingConnection subscriber = mqtt.blockingConnection();
-            subscriber.connect();
-            Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
-            subscriber.subscribe(topic);
-
-            for (int i = 0; i < numberOfMessages; i++) {
-                String payload = "Test Message: " + i;
-                publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
-            }
+        addMQTTConnector("trace=true");
+        brokerService.start();
+        MQTT mqttPub = createMQTTConnection();
+        mqttPub.setClientId("MQTT-Pub-Client");
 
-            for (int i = 0; i < numberOfMessages / 2; i++) {
-                Message message = subscriber.receive(5, TimeUnit.SECONDS);
-                assertNotNull(message);
-                byte[] payload = message.getPayload();
-                String messageContent = new String(payload);
-                if (i % 100 == 0) {
-                    LOG.debug("Received message from topic: " + message.getTopic() +
-                              " Message content: " + messageContent);
-                }
-                message.ack();
-            }
+        MQTT mqttSub = createMQTTConnection();
+        mqttSub.setClientId("MQTT-Sub-Client");
+        mqttSub.setCleanSession(false);
 
-            subscriber.disconnect();
-        }
+        final BlockingConnection connectionPub = mqttPub.blockingConnection();
+        connectionPub.connect();
+
+        BlockingConnection connectionSub = mqttSub.blockingConnection();
+        connectionSub.connect();
 
-        publisher.disconnect();
+        Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
+        connectionSub.subscribe(topics);
 
-        final BlockingConnection subscriber = mqtt.blockingConnection();
-        subscriber.connect();
-        Topic[] topic = {new Topic("foo", QoS.EXACTLY_ONCE)};
-        subscriber.subscribe(topic);
+        for (int i = 0; i < messagesPerRun; ++i) {
+            connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE,
false);
+        }
 
-        for (int i = 0; i < numberOfMessages / 2; i++) {
-            Message message = subscriber.receive(5, TimeUnit.SECONDS);
+        int received = 0;
+        for (int i = 0; i < messagesPerRun; ++i) {
+            Message message = connectionSub.receive(5, TimeUnit.SECONDS);
             assertNotNull(message);
-            byte[] payload = message.getPayload();
+            received++;
+            payload = message.getPayload();
             String messageContent = new String(payload);
-            if (i % 100 == 0) {
-                LOG.debug("Received message from topic: " + message.getTopic() +
-                          " Message content: " + messageContent);
-            }
+            LOG.info("Received message from topic: " + message.getTopic() +
+                     " Message content: " + messageContent);
             message.ack();
         }
+        connectionSub.disconnect();
+
+        for(int j = 0; j < numberOfRuns; j++) {
+
+            for (int i = 0; i < messagesPerRun; ++i) {
+                connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE,
false);
+            }
+
+            mqttSub = createMQTTConnection();
+            mqttSub.setClientId("MQTT-Sub-Client");
+            mqttSub.setCleanSession(false);
+
+            connectionSub = mqttSub.blockingConnection();
+            connectionSub.connect();
+            connectionSub.subscribe(topics);
 
-        subscriber.disconnect();
+            for (int i = 0; i < messagesPerRun; ++i) {
+                Message message = connectionSub.receive(5, TimeUnit.SECONDS);
+                assertNotNull(message);
+                received++;
+                payload = message.getPayload();
+                String messageContent = new String(payload);
+                LOG.info("Received message from topic: " + message.getTopic() +
+                         " Message content: " + messageContent);
+                message.ack();
+            }
+            connectionSub.disconnect();
+        }
+        assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + "
messages", (messagesPerRun * (numberOfRuns + 1)), received);
     }
 
     @Test(timeout=30000)
@@ -219,7 +234,7 @@ public class MQTTTest extends AbstractMQ
 
     @Override
     protected void addMQTTConnector() throws Exception {
-        addMQTTConnector("");
+        addMQTTConnector();
     }
 
     @Override
@@ -249,12 +264,12 @@ public class MQTTTest extends AbstractMQ
         return new Tracer(){
             @Override
             public void onReceive(MQTTFrame frame) {
-//                LOG.info("Client Received:\n"+frame);
+                LOG.info("Client Received:\n"+frame);
             }
 
             @Override
             public void onSend(MQTTFrame frame) {
-//                LOG.info("Client Sent:\n" + frame);
+                LOG.info("Client Sent:\n" + frame);
             }
 
             @Override



Mime
View raw message