activemq-dev mailing list archives

From "AR (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AMQ-5390) MQTT pending durable subscriber messages are not delivered after broker restart
Date Sun, 12 Oct 2014 01:14:33 GMT
AR created AMQ-5390:
-----------------------

             Summary: MQTT pending durable subscriber messages are not delivered after broker restart
                 Key: AMQ-5390
                 URL: https://issues.apache.org/jira/browse/AMQ-5390
             Project: ActiveMQ
          Issue Type: Bug
          Components: MQTT
    Affects Versions: 5.11.0
            Reporter: AR


If a durable MQTT subscriber has pending messages and the broker is restarted before they
are delivered, those messages are not delivered when the subscriber reconnects after the
restart.

To reproduce this, I modified the existing test case testReceiveMessageSentWhileOffline() and added the test case
testReceiveMessageSentWhileOfflineAndBrokerRestart(), shown below.
Changes:
* The test uses a standalone broker, as I was not sure whether the embedded broker persists messages
to a permanent store.
* The broker must be restarted manually when the test prints the prompt to restart it.

{noformat}
    @Test(timeout = 60 * 1000)
    public void testReceiveMessageSentWhileOfflineAndBrokerRestart() throws Exception {
        final byte[] payload = new byte[1024 * 32];
        for (int i = 0; i < payload.length; i++) {
            payload[i] = '2';
        }

        int numberOfRuns = 100;
        int messagesPerRun = 2;

        final MQTT mqttPub = createMQTTConnection("MQTT-Pub-Client", true);
        final MQTT mqttSub = createMQTTConnection("MQTT-Sub-Client", false);
        mqttPub.setHost("tcp://localhost:1883");
        mqttSub.setHost("tcp://localhost:1883");

        final BlockingConnection connectionPub = mqttPub.blockingConnection();
        connectionPub.connect();

        BlockingConnection connectionSub = mqttSub.blockingConnection();
        connectionSub.connect();

        Topic[] topics = { new Topic("TopicA", QoS.EXACTLY_ONCE) };
        connectionSub.subscribe(topics);

        for (int i = 0; i < messagesPerRun; ++i) {
            connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
        }

        int received = 0;
        for (int i = 0; i < messagesPerRun; ++i) {
            Message message = connectionSub.receive(5, TimeUnit.SECONDS);
            assertNotNull(message);
            received++;
            assertTrue(Arrays.equals(payload, message.getPayload()));
            message.ack();
        }
        connectionSub.disconnect();

        for (int j = 0; j < numberOfRuns; j++) {

            for (int i = 0; i < messagesPerRun; ++i) {
                connectionPub.publish(topics[0].name().toString(), payload, QoS.AT_LEAST_ONCE, false);
            }
            
            System.out.println("Restart broker here.....");

            Thread.sleep(30000);
            
            connectionSub = mqttSub.blockingConnection();
            connectionSub.connect();
            connectionSub.subscribe(topics);

            for (int i = 0; i < messagesPerRun; ++i) {
                Message message = connectionSub.receive(5, TimeUnit.SECONDS);
                assertNotNull(message);
                received++;
                assertTrue(Arrays.equals(payload, message.getPayload()));
                message.ack();
            }
            connectionSub.disconnect();
        }
        assertEquals("Should have received " + (messagesPerRun * (numberOfRuns + 1)) + " messages",
                (messagesPerRun * (numberOfRuns + 1)), received);
    }
{noformat}
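
The test above relies on a fixed Thread.sleep(30000) while the broker is restarted by hand. As a possible refinement (not part of the original test), the wait could instead poll the broker's MQTT port until it goes down and comes back up. The sketch below uses only the JDK; the class name BrokerPortWaiter and its methods are hypothetical helpers, and localhost:1883 is taken from the test above. Note that an open port only shows the transport is listening again, so it is an approximation of "broker restarted", not a guarantee that the message store has finished recovering.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: polls a TCP port to detect a broker going down and coming back up.
class BrokerPortWaiter {

    // Returns true if a TCP connection to host:port succeeds within connectTimeoutMs.
    static boolean isPortOpen(String host, int port, int connectTimeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), connectTimeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Polls until the port is open (wantOpen == true) or closed (wantOpen == false),
    // or until timeoutMs has elapsed. Returns whether the wanted state was observed.
    static boolean waitForPortState(String host, int port, boolean wantOpen, long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (System.nanoTime() - deadline < 0) {
            if (isPortOpen(host, port, 500) == wantOpen) {
                return true;
            }
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
        return isPortOpen(host, port, 500) == wantOpen;
    }

    // Waits for the port to close and then reopen, each phase bounded by timeoutMs.
    static boolean waitForRestart(String host, int port, long timeoutMs) {
        return waitForPortState(host, port, false, timeoutMs)
                && waitForPortState(host, port, true, timeoutMs);
    }
}
```

In the test, the sleep could then be replaced by something like assertTrue(BrokerPortWaiter.waitForRestart("localhost", 1883, 30000)), keeping the manual restart prompt as it is.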



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
