activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5092
Date Mon, 24 Mar 2014 14:16:25 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk afddc1a83 -> 45c0dfb2b


https://issues.apache.org/jira/browse/AMQ-5092

Add patch that fixes the missing test case.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/45c0dfb2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/45c0dfb2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/45c0dfb2

Branch: refs/heads/trunk
Commit: 45c0dfb2bc2134d790d6af5b19a328f990a0a27e
Parents: afddc1a
Author: Timothy Bish <tabish121@gmai.com>
Authored: Mon Mar 24 10:16:12 2014 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Mon Mar 24 10:16:12 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/mqtt/MQTTTest.java       | 184 +++++++++++++++++--
 1 file changed, 164 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/45c0dfb2/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
----------------------------------------------------------------------
diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
index d8788d3..31198ba 100644
--- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
+++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java
@@ -693,41 +693,185 @@ public class MQTTTest extends AbstractMQTTTest {
             }
         });
 
-        final BlockingConnection connection = mqtt.blockingConnection();
+        BlockingConnection connection = mqtt.blockingConnection();
         connection.connect();
-
-        // create overlapping subscriptions with different QoSs
         final String TOPIC = "TopicA/";
-        final Topic[] topics = {new Topic(TOPIC, QoS.AT_LEAST_ONCE)};
-        connection.subscribe(topics);
+        final String[] topics = new String[] {TOPIC, "TopicA/+"};
+        connection.subscribe(new Topic[]{new Topic(topics[0], QoS.AT_LEAST_ONCE), new Topic(topics[1], QoS.EXACTLY_ONCE)});
 
         // publish non-retained message
         connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
 
-        Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
-        assertNotNull(msg);
-        assertEquals(TOPIC, new String(msg.getPayload()));
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return publishList.size() == 2;
+            }
+        }, 5000);
+        assertEquals(2, publishList.size());
 
-        // drop subs without acknowledging messages, then subscribe and receive again
-        connection.unsubscribe(new String[]{ TOPIC });
-        Thread.sleep(1000);
-        connection.subscribe(topics);
-        Thread.sleep(1000);
+        connection.disconnect();
 
-        msg = connection.receive(30000, TimeUnit.MILLISECONDS);
-        assertNotNull(msg);
-        assertEquals(TOPIC, new String(msg.getPayload()));
-        msg.ack();
+        connection = mqtt.blockingConnection();
+        connection.connect();
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return publishList.size() == 4;
+            }
+        }, 5000);
+        assertEquals(4, publishList.size());
 
         // make sure we received duplicate message ids
-        assertEquals(2, publishList.size());
-        assertEquals(publishList.get(0).messageId(), publishList.get(1).messageId());
-        assertTrue(publishList.get(0).dup() || publishList.get(1).dup());
+        assertTrue(publishList.get(0).messageId() == publishList.get(2).messageId() ||
+            publishList.get(0).messageId() == publishList.get(3).messageId());
+        assertTrue(publishList.get(1).messageId() == publishList.get(3).messageId() ||
+            publishList.get(1).messageId() == publishList.get(2).messageId());
+        assertTrue(publishList.get(2).dup() && publishList.get(3).dup());
+
+        connection.unsubscribe(topics);
+        connection.disconnect();
+    }
+
+    @Test(timeout = 90 * 1000)
+    public void testPacketIdGeneratorNonCleanSession() throws Exception {
+        addMQTTConnector("trace=true");
+        brokerService.start();
+
+        final MQTT mqtt = createMQTTConnection("nonclean-packetid", false);
+        mqtt.setKeepAlive((short) 15);
+
+        final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
+        mqtt.setTracer(new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client received:\n" + frame);
+                if (frame.messageType() == PUBLISH.TYPE) {
+                    PUBLISH publish = new PUBLISH();
+                    try {
+                        publish.decode(frame);
+                        LOG.info("PUBLISH " + publish);
+                    } catch (ProtocolException e) {
+                        fail("Error decoding publish " + e.getMessage());
+                    }
+                    if (publishMap.get(publish.messageId()) != null) {
+                        assertTrue(publish.dup());
+                    }
+                    publishMap.put(publish.messageId(), publish);
+                }
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client sent:\n" + frame);
+            }
+        });
+
+        BlockingConnection connection = mqtt.blockingConnection();
+        connection.connect();
+        final String TOPIC = "TopicA/";
+        connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+
+        // publish non-retained messages
+        final int TOTAL_MESSAGES = 10;
+        for (int i = 0; i < TOTAL_MESSAGES; i++) {
+            connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+        }
+
+        // receive half the messages in this session
+        for (int i = 0; i < TOTAL_MESSAGES / 2; i++) {
+            final Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            assertEquals(TOPIC, new String(msg.getPayload()));
+            msg.ack();
+        }
+
+        connection.disconnect();
+        // resume session
+        connection = mqtt.blockingConnection();
+        connection.connect();
+        // receive rest of the messages
+        Message msg = null;
+        do {
+            msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            if (msg != null) {
+                assertEquals(TOPIC, new String(msg.getPayload()));
+                msg.ack();
+            }
+        } while (msg != null);
+
+        // make sure we received all message ids
+        for (short id = 1; id <= TOTAL_MESSAGES; id++) {
+            assertNotNull("No message for id " + id, publishMap.get(id));
+        }
 
         connection.unsubscribe(new String[] { TOPIC });
         connection.disconnect();
     }
 
+    @Test(timeout = 90 * 1000)
+    public void testPacketIdGeneratorCleanSession() throws Exception {
+        addMQTTConnector("trace=true");
+        brokerService.start();
+
+        final String[] cleanClientIds = new String[] { "", "clean-packetid", null};
+        final Map<Short, PUBLISH> publishMap = new ConcurrentHashMap<Short, PUBLISH>();
+        MQTT[] mqtts = new MQTT[cleanClientIds.length];
+        for (int i = 0; i < cleanClientIds.length; i++) {
+            mqtts[i] = createMQTTConnection("", true);
+            mqtts[i].setKeepAlive((short) 15);
+
+            mqtts[i].setTracer(new Tracer() {
+                @Override
+                public void onReceive(MQTTFrame frame) {
+                    LOG.info("Client received:\n" + frame);
+                    if (frame.messageType() == PUBLISH.TYPE) {
+                        PUBLISH publish = new PUBLISH();
+                        try {
+                            publish.decode(frame);
+                            LOG.info("PUBLISH " + publish);
+                        } catch (ProtocolException e) {
+                            fail("Error decoding publish " + e.getMessage());
+                        }
+                        if (publishMap.get(publish.messageId()) != null) {
+                            assertTrue(publish.dup());
+                        }
+                        publishMap.put(publish.messageId(), publish);
+                    }
+                }
+
+                @Override
+                public void onSend(MQTTFrame frame) {
+                    LOG.info("Client sent:\n" + frame);
+                }
+            });
+        }
+
+        final Random random = new Random();
+        for (short i = 0; i < 10; i++) {
+            BlockingConnection connection = mqtts[random.nextInt(cleanClientIds.length)].blockingConnection();
+            connection.connect();
+            final String TOPIC = "TopicA/";
+            connection.subscribe(new Topic[]{new Topic(TOPIC, QoS.EXACTLY_ONCE)});
+
+            // publish non-retained message
+            connection.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
+            Message msg = connection.receive(1000, TimeUnit.MILLISECONDS);
+            assertNotNull(msg);
+            assertEquals(TOPIC, new String(msg.getPayload()));
+            msg.ack();
+
+            assertEquals(1, publishMap.size());
+            final short id = (short) (i + 1);
+            assertNotNull("No message for id " + id, publishMap.get(id));
+            publishMap.clear();
+
+            connection.disconnect();
+        }
+
+    }
+
     @Test(timeout = 60 * 1000)
     public void testClientConnectionFailure() throws Exception {
         addMQTTConnector();


Mime
View raw message