activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From npakudin <>
Subject Lost persistent message sent before keepAliveInterval exceed
Date Thu, 27 Nov 2014 02:43:46 GMT

I have the problem below.

What I do

- There is a MQTT client - Android application. It connects to server and
creates topic with name ServiceTracker.Inbound.userName (may be, the name is
necessary go log analysis).
- Sent there message with all default settings and payload  {"name":"1st
message”} by ActiveMQ console. Message is successfully delivered. If open
Active Subscribers, enqueued = 1, and dequeued = 1
- Turn on Airplane mode at Android, so there is no connection really, but
ActiveMQ still shows it in Active Subscribers.
- Send message WITH checkbox Persistent delivery and payload
{"name":"persistent”} If open Active Subscribers, enqueued = 2, and dequeued
= 2
- Send message WITHOUT checkbox Persistent delivery and payload {"name”:”not
persistent”} If open Active Subscribers, enqueued = 3, and dequeued = 2
- Turn off Airplane mode at Android, connection appears and only one message
is delivered - not persistent.

What I tried

- Restart ActiveMQ.
- Send a few persistent and a few not persistent messages - not persistent
are delivered, persistent aren't
- Delete ActiveMQ and extract again.
- Rebuild Android application and clean all data at phone.

Messages, sent after keepAliveInterval exceed are delivered successfully.
I reproduced it at least 20 times.
I found
, but it seems, that I have the other problem.


ActiveMQ version: apache-activemq-5.10.0

$ java -version
java version "1.6.0_65"
Java(TM) SE Runtime Environment (build 1.6.0_65-b14-462-11M4609)
Java HotSpot(TM) 64-Bit Server VM (build 20.65-b04-462, mixed mode)

OS: Maс OS X 10.9.5

conf/activemq.xml - default, with change 1 line (twice added “+nio")
<transportConnector name="mqtt+nio"
data/activemq.log attached

My code for Android:
(if it’s not enough, let me know)

public boolean connectIfNecessary() throws MqttException, IOException {

    synchronized (synchLock) {

        MqttConnectOptions connectionOptions = new MqttConnectOptions();

        if (mqttClient == null) {

            stash = new MessageStash(applicationRoot + "/" + userName);

            mqttClient = new MqttAsyncClient(
                    new MqttDefaultFilePersistence(applicationRoot + "/" +
            Log.d(TAG, "Broker URL: " + brokerUrl);
            Log.d(TAG, "Connection clientId: " + userName);
            Log.d(TAG, "Application path: " + applicationRoot);

            mqttClient.setCallback(new MqttCallback() {
                public void connectionLost(Throwable cause) {
                    Log.e(TAG, "Connection lost. Cause: " +

                public void messageArrived(String topic, MqttMessage
message) throws Exception {
                    ConnectionBinder recipient =
                    if (recipient != null)
                    Log.d(TAG, "Message " + message + " received");

                public void deliveryComplete(IMqttDeliveryToken token) {
                    Log.d(TAG, "Message delivery complete");

        if(mqttClient.isConnected()) // connection is already active - we
can subscribe to the topic synchronously (see connect method)
            return true;

        if (connecting) // connecting was earlier initiated from a different
thread - just let things take their course
            return false;

        connecting = true;

        mqttClient.connect(connectionOptions, null, new
IMqttActionListener() {
            public void onSuccess(IMqttToken asyncActionToken) {
                connecting = false;
                Log.d(TAG, "connected");

                for (MessageStash.Message message : stash.get()) {
                    try {
                        send(message.topic(), message.body());
                    } catch (IOException e) {
                        // we can safely ignore it here because this code is
executed after the connection is restored
                        // so there will be no need to stash the message,
but even the connection will be lost while
                        // resubmitting messages here there will be no need
to worry - the message will remain stashed
                        // because message.commit will not be executed

                for (Map.Entry<String, ConnectionBinder> binder :


            public void onFailure(IMqttToken asyncActionToken, Throwable
exception) {
                connecting = false;
                // todo: onConnectionLost only on recoverable exceptions
                Log.e(TAG, "Failed to connect :" + exception.toString());
        return false;

private void subscribe(String topic) {

    final String inboundTopic = getInboundTopic(topic);

    try {

        mqttClient.subscribe(inboundTopic, QoS_EXACLY_ONCE,
                new IMqttActionListener() {
                    public void onSuccess(IMqttToken iMqttToken) {
                        Log.d(TAG, "Successfully subscribed to " +

                    public void onFailure(IMqttToken iMqttToken, Throwable
throwable) {
                        Log.e(TAG, "Subscribe to " + inboundTopic + "
failed: " + throwable.toString());
    } catch (MqttException e) {
        logger.log(String.format("Exception subscribing to %s",
inboundTopic), e);

public void unregisterSubscriber(String topic) {

    String inboundTopic = getInboundTopic(topic);

    if (mqttClient.isConnected())
        try {
        } catch (MqttException e) {
            logger.log(String.format("Exception unsubscribing from %s",
inboundTopic), e);

public void send(String topic, String message) throws IOException {

    String outboundTopic = getOutboundTopic(topic);

    try {
        mqttClient.publish(outboundTopic, message.getBytes(),
        Log.d(TAG, "published to " + outboundTopic + " :" + message);
    } catch (MqttException e) {
        switch (e.getReasonCode()) {
            // todo: double check this is the only recoverable failure
            // it seems likely that REASON_CODE_CLIENT_DISCONNECTING should
also be here
            // I am not 100% sure, but I've seen a message 'Publish of blah
failed ' with this reason code
            case MqttException.REASON_CODE_CLIENT_DISCONNECTING:
            case MqttException.REASON_CODE_CLIENT_NOT_CONNECTED:
                stash.put(topic, message);   // stash it for when the
connection comes back online;
                logger.log(String.format("Exception publishing to %s",
outboundTopic), e);

public static final String TAG = "MQTT Connection";
private static final int QoS_EXACLY_ONCE = 2;

private final Service service;
private final Notification notification;
private final Object synchLock = new Object();
private final String applicationRoot;
private final ILogger logger;

private MqttAsyncClient mqttClient;
private HashMap<String, ConnectionBinder> recipients = new HashMap<String,
private MessageStash stash;
private boolean connecting;
private String brokerUrl = null;
private String userName;
private String password;

View this message in context:
Sent from the ActiveMQ - User mailing list archive at

View raw message