activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From npakudin <npaku...@hill30.com>
Subject Lost persistent message sent before keepAliveInterval exceed
Date Thu, 27 Nov 2014 02:43:46 GMT
Hello,

I have the problem below.


What I do

- There is a MQTT client - Android application. It connects to server and
creates topic with name ServiceTracker.Inbound.userName (may be, the name is
necessary go log analysis).
- Sent there message with all default settings and payload  {"name":"1st
message”} by ActiveMQ console. Message is successfully delivered. If open
Active Subscribers, enqueued = 1, and dequeued = 1
- Turn on Airplane mode at Android, so there is no connection really, but
ActiveMQ still shows it in Active Subscribers.
- Send message WITH checkbox Persistent delivery and payload
{"name":"persistent”} If open Active Subscribers, enqueued = 2, and dequeued
= 2
- Send message WITHOUT checkbox Persistent delivery and payload {"name”:”not
persistent”} If open Active Subscribers, enqueued = 3, and dequeued = 2
- Turn off Airplane mode at Android, connection appears and only one message
is delivered - not persistent.


What I tried

- Restart ActiveMQ.
- Send a few persistent and a few not persistent messages - not persistent
are delivered, persistent aren't
- Delete ActiveMQ and extract again.
- Rebuild Android application and clean all data at phone.

Messages, sent after keepAliveInterval exceed are delivered successfully.
I reproduced it at least 20 times.
I found
http://activemq.2283324.n4.nabble.com/Random-persistent-messages-lost-tt3826331.html
, but it seems, that I have the other problem.


Configuration

ActiveMQ version: apache-activemq-5.10.0

$ java -version
java version "1.6.0_65"
Java(TM) SE Runtime Environment (build 1.6.0_65-b14-462-11M4609)
Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-462, mixed mode)

OS: Maс OS X 10.9.5

conf/activemq.xml - default, with change 1 line (twice added “+nio")
<transportConnector name="mqtt+nio"
uri="mqtt+nio://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
data/activemq.log attached


My code for Android:
(if it’s not enough, let me know)

public boolean connectIfNecessary() throws MqttException, IOException {

    synchronized (synchLock) {

        MqttConnectOptions connectionOptions = new MqttConnectOptions();
        connectionOptions.setCleanSession(false);
        connectionOptions.setUserName(userName);
        connectionOptions.setPassword(password.toCharArray());

        if (mqttClient == null) {

            stash = new MessageStash(applicationRoot + "/" + userName);

            mqttClient = new MqttAsyncClient(
                    brokerUrl,
                    userName,
                    new MqttDefaultFilePersistence(applicationRoot + "/" +
userName)
            );
            Log.d(TAG, "Broker URL: " + brokerUrl);
            Log.d(TAG, "Connection clientId: " + userName);
            Log.d(TAG, "Application path: " + applicationRoot);

            mqttClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.e(TAG, "Connection lost. Cause: " +
cause.toString());
                    service.onConnectionLost();
                   
notification.updateStatus(Notification.STATUS_DISCONNECTED);
                }

                @Override
                public void messageArrived(String topic, MqttMessage
message) throws Exception {
                    ConnectionBinder recipient =
recipients.get(getTopicFromInbound(topic));
                    if (recipient != null)
                        recipient.onMessageReceived(message.toString());
                    Log.d(TAG, "Message " + message + " received");
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    Log.d(TAG, "Message delivery complete");
                }
            });
        }

        if(mqttClient.isConnected()) // connection is already active - we
can subscribe to the topic synchronously (see connect method)
            return true;

        if (connecting) // connecting was earlier initiated from a different
thread - just let things take their course
            return false;

        connecting = true;

        notification.updateStatus(Notification.STATUS_CONNECTING);
        mqttClient.connect(connectionOptions, null, new
IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                connecting = false;
                Log.d(TAG, "connected");

                for (MessageStash.Message message : stash.get()) {
                    try {
                        send(message.topic(), message.body());
                        message.commit();
                    } catch (IOException e) {
                        // we can safely ignore it here because this code is
executed after the connection is restored
                        // so there will be no need to stash the message,
but even the connection will be lost while
                        // resubmitting messages here there will be no need
to worry - the message will remain stashed
                        // because message.commit will not be executed
                    }
                }

                for (Map.Entry<String, ConnectionBinder> binder :
recipients.entrySet())
                    subscribe(binder.getKey());

                notification.updateStatus(Notification.STATUS_CONNECTED);
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable
exception) {
                connecting = false;
                // todo: onConnectionLost only on recoverable exceptions
                Log.e(TAG, "Failed to connect :" + exception.toString());
                service.onConnectionLost();
                notification.updateStatus(Notification.STATUS_DISCONNECTED);
            }
        });
        return false;
    }
}

private void subscribe(String topic) {

    final String inboundTopic = getInboundTopic(topic);

    try {

        mqttClient.subscribe(inboundTopic, QoS_EXACLY_ONCE,
                null,
                new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken iMqttToken) {
                        Log.d(TAG, "Successfully subscribed to " +
inboundTopic);
                    }

                    @Override
                    public void onFailure(IMqttToken iMqttToken, Throwable
throwable) {
                        Log.e(TAG, "Subscribe to " + inboundTopic + "
failed: " + throwable.toString());
                    }
                });
    } catch (MqttException e) {
        logger.log(String.format("Exception subscribing to %s",
inboundTopic), e);
    }
}

public void unregisterSubscriber(String topic) {

    String inboundTopic = getInboundTopic(topic);

    if (mqttClient.isConnected())
        try {
            mqttClient.unsubscribe(inboundTopic);
        } catch (MqttException e) {
            logger.log(String.format("Exception unsubscribing from %s",
inboundTopic), e);
        }
    recipients.remove(topic);
}

public void send(String topic, String message) throws IOException {

    String outboundTopic = getOutboundTopic(topic);

    try {
        mqttClient.publish(outboundTopic, message.getBytes(),
QoS_EXACLY_ONCE, true);
        Log.d(TAG, "published to " + outboundTopic + " :" + message);
    } catch (MqttException e) {
        switch (e.getReasonCode()) {
            // todo: double check this is the only recoverable failure
            // it seems likely that REASON_CODE_CLIENT_DISCONNECTING should
also be here
            // I am not 100% sure, but I've seen a message 'Publish of blah
failed ' with this reason code
            case MqttException.REASON_CODE_CLIENT_DISCONNECTING:
            case MqttException.REASON_CODE_CLIENT_NOT_CONNECTED:
                stash.put(topic, message);   // stash it for when the
connection comes back online;
                break;
            default:
                logger.log(String.format("Exception publishing to %s",
outboundTopic), e);
                break;
        }
    }
}

public static final String TAG = "MQTT Connection";
private static final int QoS_EXACLY_ONCE = 2;

private final Service service;
private final Notification notification;
private final Object synchLock = new Object();
private final String applicationRoot;
private final ILogger logger;

private MqttAsyncClient mqttClient;
private HashMap<String, ConnectionBinder> recipients = new HashMap<String,
ConnectionBinder>();
private MessageStash stash;
private boolean connecting;
private String brokerUrl = null;
private String userName;
private String password;




--
View this message in context: http://activemq.2283324.n4.nabble.com/Lost-persistent-message-sent-before-keepAliveInterval-exceed-tp4688049.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message