activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (AMQ-5390) MQTT pending durable subscriber messages are not delievered after broker restart
Date Tue, 21 Oct 2014 22:01:35 GMT

     [ https://issues.apache.org/jira/browse/AMQ-5390?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Timothy Bish closed AMQ-5390.
-----------------------------
    Resolution: Not a Problem

Added new test to the test suite to show that things are working as expected, messages are
recovered after broker restart. 

> MQTT pending durable subscriber messages are not delievered after broker restart
> --------------------------------------------------------------------------------
>
>                 Key: AMQ-5390
>                 URL: https://issues.apache.org/jira/browse/AMQ-5390
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 5.11.0
>            Reporter: AR
>
> If there are pending messages to be delivered to a subscriber and if the broker is restarted
at this point, the pending messages are not delivered to the subscriber when it connects after
broker restart.
> I modified existing test case testReceiveMessageSentWhileOffline() and added test case
testReceiveMessageSentWhileOfflineAndBrokerRestart() shown below:
> changes:
> * use standalone broker as I was not sure if embedded broker persists messages on permanent
store.
> * manually need to restart when test prompts to restart broker
> {noformat}
> @Test(timeout = 60 * 1000)
>     public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception
{
>         final byte[] payload = new byte[1024 * 32];
>         for (int i = 0; i < payload.length; i++) {
>             payload[i] = '2';
>         }
>         int numberOfRuns = 100;
>         int messagesPerRun = 2;
>         final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
>         final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
>         mqttPub.setHost("tcp://localhost:1883");
>         mqttSub.setHost("tcp://localhost:1883");
>         final BlockingConnection connectionPub = mqttPub.blockingConnection();
>         connectionPub.connect();
>         BlockingConnection connectionSub = mqttSub.blockingConnection();
>         connectionSub.connect();
>         Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
>         connectionSub.subscribe(topics);
>         for (int i = 0; i < messagesPerRun; ++i) {
>             connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE,
false);
>         }
>         int received = 0;
>         for (int i = 0; i < messagesPerRun; ++i) {
>             Message message = connectionSub.receive(5, TimeUnit.SECONDS);
>             assertNotNull(message);
>             received++;
>             assertTrue(Arrays.equals(payload, message.getPayload()));
>             message.ack();
>         }
>         connectionSub.disconnect();
>         for (int j = 0; j < numberOfRuns; j++) {
>             for (int i = 0; i < messagesPerRun; ++i) {
>                 connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE,
false);
>             }
>             
>             System.out.println("Restart broker here.....");
>             Thread.sleep(30000);
>             
>             connectionSub = mqttSub.blockingConnection();
>             connectionSub.connect();
>             connectionSub.subscribe(topics);
>             for (int i = 0; i < messagesPerRun; ++i) {
>                 Message message = connectionSub.receive(5, TimeUnit.SECONDS);
>                 assertNotNull(message);
>                 received++;
>                 assertTrue(Arrays.equals(payload, message.getPayload()));
>                 message.ack();
>             }
>             connectionSub.disconnect();
>         }
>         assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1))
+ " messages", (messagesPerRun * (numberOfRuns + 1)), received);
>     }
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message