activemq-dev mailing list archives

From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AMQ-5382) MQTT messages published after unsubscribe on a durable topic are received
Date Sat, 01 Nov 2014 13:06:33 GMT

    [ https://issues.apache.org/jira/browse/AMQ-5382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14193143#comment-14193143 ]

Timothy Bish commented on AMQ-5382:
-----------------------------------

The root cause is related to the fact that MQTT subscriptions are created with the retroactive
flag turned on in order to meet other requirements of the MQTT spec. You are welcome to dig
into the code and see if you can come up with a workaround for this.
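
To illustrate the general idea of a retroactive subscription, here is a minimal toy model in
plain Java. It is only a sketch and not ActiveMQ code: the names RetroactiveSketch, ToyTopic
and resubscribe are hypothetical, and the model ignores acknowledgements, durable subscription
state and the retain flag. It only shows why a subscriber that asks for replay can see a
message that was published while it was not subscribed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only, NOT ActiveMQ code. All names here are hypothetical. */
public class RetroactiveSketch {

    /** A topic that remembers everything published, like a broker-side replay buffer. */
    static class ToyTopic {
        private final List<String> history = new ArrayList<>();
        private final List<List<String>> inboxes = new ArrayList<>();

        void publish(String msg) {
            history.add(msg);
            for (List<String> inbox : inboxes) {
                inbox.add(msg);
            }
        }

        /** A retroactive subscriber is first handed the messages it may have missed. */
        List<String> subscribe(boolean retroactive) {
            List<String> inbox = new ArrayList<>();
            if (retroactive) {
                inbox.addAll(history);
            }
            inboxes.add(inbox);
            return inbox;
        }

        void unsubscribe(List<String> inbox) {
            // Remove by identity; List.equals would compare contents instead.
            inboxes.removeIf(i -> i == inbox);
        }
    }

    /** Subscribe, unsubscribe, publish while unsubscribed, then subscribe again. */
    public static List<String> resubscribe(boolean retroactive) {
        ToyTopic topic = new ToyTopic();
        List<String> first = topic.subscribe(retroactive);
        topic.publish("m1");
        topic.unsubscribe(first);
        topic.publish("m2"); // published while nobody is subscribed
        return topic.subscribe(retroactive);
    }

    public static void main(String[] args) {
        System.out.println("retroactive=true  -> " + resubscribe(true));
        System.out.println("retroactive=false -> " + resubscribe(false));
    }
}
```

In this toy model the retroactive re-subscription is handed "m2" even though it was published
after the unsubscribe, while the non-retroactive one starts with an empty inbox. The real
broker is more involved, so treat this as an aid for reading the code rather than a
description of it.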

> MQTT messages published after unsubscribe on a durable topic are received
> -------------------------------------------------------------------------
>
>                 Key: AMQ-5382
>                 URL: https://issues.apache.org/jira/browse/AMQ-5382
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 5.11.0
>            Reporter: AR
>              Labels: MQTT
>
> Test procedure:
> Create a durable subscription to topic "durableunsubtest" with clientid "durableUnsub".
> Publish a message with client2 to topic "durableunsubtest". Make sure "retain" flag is not set.
> Verify that the message is received (confirming creation of durable subscription).
> Unsubscribe from topic "durableunsubtest" from clientid "durableUnsub". Disconnect client.
> Publish another message with client2.  Make sure "retain" flag is not set.
> Connect "durableUnsub" and subscribe to the topic again.
> Verify that no messages are received.
> JUnit Test Case code:
> {noformat}
>     /*
>      * Test unsubscribe on a durable subscription topic
>      */
>     @Test(timeout = 60 * 1000)
>     public void testDurableUnsubscribe() throws Exception {
>         // Create connection1
>         final String clientId = "durableUnsub";
>         MQTT mqtt1 = createMQTTConnection(clientId, false);
>         mqtt1.setKeepAlive((short) 60);
>         final BlockingConnection connection1 = mqtt1.blockingConnection();
>         connection1.connect();
>         
>         // create a durable subscription for client "durableUnsub"
>         final String TOPICNAME = "durableunsubtest";
>         final String payload = "durable unsub test";
>         Topic[] topic = { new Topic(TOPICNAME, QoS.AT_LEAST_ONCE) };
>         connection1.subscribe(topic);
>         // Create connection2 and publish a message on the topic
>         MQTT mqtt2 = createMQTTConnection(null, true);
>         mqtt2.setKeepAlive((short) 60);
>         final BlockingConnection connection2 = mqtt2.blockingConnection();
>         connection2.connect();
>         connection2.publish(TOPICNAME, payload.getBytes(), QoS.EXACTLY_ONCE, false);
>         // Verify that the durable subscription was created by receiving the message
>         // on connection1
>         Message message = connection1.receive(10, TimeUnit.SECONDS);
>         assertNotNull(message);
>         message.ack();
>         assertEquals("Unexpected String received", payload, new String(message.getPayload()));
>         // Unsubscribe the topic on connection1
>         connection1.unsubscribe(new String[]{TOPICNAME});
>         // Disconnect connection1
>         connection1.disconnect();
>         
>         // Publish another message on connection2 for the same topic
>         MQTT mqtt2a = createMQTTConnection(null, true);
>         mqtt2a.setKeepAlive((short) 60);
>         final BlockingConnection connection2a = mqtt2a.blockingConnection();
>         connection2a.connect();
>         connection2a.publish(TOPICNAME, payload.getBytes(), QoS.EXACTLY_ONCE, false);
>        
>         // Create connection3 with the same clientid "durableUnsub"
>         // and subscribe to the topic
>         MQTT mqtt3 = createMQTTConnection(clientId, false);
>         mqtt3.setKeepAlive((short) 60);
>         BlockingConnection connection3 = mqtt3.blockingConnection();
>         connection3.connect();
>         connection3.subscribe(topic);
>         message = connection3.receive(10, TimeUnit.SECONDS);
>         // Since we have unsubscribed before the publish, we should not receive
>         // any message (retain flag was also not set during publish)
>         assertNull(message);     
>     }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
