activemq-dev mailing list archives

From "Ihor Mochurad (JIRA)" <>
Subject [jira] [Created] (AMQ-5150) ActiveMQ failover seems not to work in 5.9.1 on MacOSX
Date Wed, 16 Apr 2014 15:26:15 GMT
Ihor Mochurad created AMQ-5150:

             Summary: ActiveMQ failover seems not to work in 5.9.1 on MacOSX
                 Key: AMQ-5150
             Project: ActiveMQ
          Issue Type: Bug
          Components: Connector
    Affects Versions: 5.9.1
         Environment: MacOSx
            Reporter: Ihor Mochurad

I have a very simple scenario: one broker and one consumer with a durable subscription.
This is the code of my consumer app:

    package test;
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import pojo.Event;
    import pojo.StockUpdate;

    public class Consumer
    {
        private static transient ConnectionFactory factory;
        private transient Connection connection;
        private transient Session session;
        public static int counter = 0;

        public Consumer(String brokerURL) throws JMSException
        {
            factory = new ActiveMQConnectionFactory(brokerURL);
            connection = factory.createConnection();
            // Assumed: the archived text shows neither call below, but a durable
            // subscriber needs a client ID and delivery needs a started connection.
            connection.setClientID("STOCKS_CONSUMER"); // hypothetical ID
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }

        public void close() throws JMSException
        {
            if (connection != null)
            {
                connection.close();
            }
        }

        public static void main(String[] args) throws JMSException
        {
            try
            {
                // extract topics from the rest of arguments
                String[] topics = new String[2];
                topics[0] = "CSCO";
                topics[1] = "ORCL";
                // define connection URI
                Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true");
                for (String stock : topics)
                {
                    try
                    {
                        Destination destination = consumer.getSession().createTopic("STOCKS." + stock);
                        // consumer.getSession().
                        MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic) destination, "STOCKS_DURABLE_CONSUMER_" + stock);
                        messageConsumer.setMessageListener(new Listener());
                    }
                    catch (JMSException e)
                    {
                        // handler body not preserved in the archived message
                    }
                }
            }
            catch (Throwable t)
            {
                // handler body not preserved in the archived message
            }
        }

        public Session getSession()
        {
            return session;
        }
    }

    class Listener implements MessageListener
    {
        public void onMessage(Message message)
        {
            try
            {
                TextMessage textMessage = (TextMessage) message;
                String json = textMessage.getText();
                Event event = StockUpdate.fromJSON(json, StockUpdate.class);
                System.out.println("Consumed message #:" + ++Consumer.counter + "\n" + event);
            }
            catch (Exception e)
            {
                // handler body not preserved in the archived message
            }
        }
    }
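
For context on the broker URL used above: the failover: options maxReconnectAttempts=-1 and useExponentialBackOff=true ask the client to retry indefinitely, waiting longer between attempts. The following is a minimal, self-contained sketch of such a retry schedule, not ActiveMQ's actual implementation. The class and method names are hypothetical, and the values used in main (10 ms initial delay, multiplier 2.0, 30000 ms cap) are assumptions chosen to mirror the defaults described in the ActiveMQ failover transport documentation.

```java
class ReconnectBackoffSketch {

    // Delay in milliseconds to wait before the given 1-based reconnect attempt.
    // The delay starts at initialDelayMs and is multiplied after each failed
    // attempt until it reaches maxDelayMs.
    static long delayMillis(int attempt, long initialDelayMs, double multiplier, long maxDelayMs) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        double delay = initialDelayMs;
        for (int i = 1; i < attempt && delay < maxDelayMs; i++) {
            delay *= multiplier;
        }
        return (long) Math.min(delay, maxDelayMs);
    }

    // A negative maxAttempts means "retry forever", as -1 does in the URL above.
    static boolean shouldRetry(int attemptsSoFar, int maxAttempts) {
        return maxAttempts < 0 || attemptsSoFar < maxAttempts;
    }

    public static void main(String[] args) {
        // Assumed parameters, see the note above the code.
        long initialDelayMs = 10;
        double multiplier = 2.0;
        long maxDelayMs = 30_000;
        int maxAttempts = -1;

        for (int attempt = 1; attempt <= 15 && shouldRetry(attempt - 1, maxAttempts); attempt++) {
            System.out.println("attempt " + attempt + " waits "
                    + delayMillis(attempt, initialDelayMs, multiplier, maxDelayMs) + " ms");
        }
    }
}
```

Under these assumed settings a client that keeps retrying never gives up by itself, so an early exit would have to come from somewhere other than the retry limit.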

Here is my activemq.xml

        <!-- Allows us to use system properties as variables in this configuration file
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
            The <broker> element is used to configure the ActiveMQ broker.
        <broker xmlns="" brokerName="R6_cluster_broker1"
                <networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
                        <constantPendingMessageLimitStrategy limit="1000"/>
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
                <managementContext createConnector="false"/>
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
                <kahaDB directory="/work/temp/kahadb"/>
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information,
                        <memoryUsage percentOfJvmHeap="70" />
                        <storeUsage limit="100 gb"/>
                        <tempUsage limit="50 gb"/>
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://;wireFormat.maxFrameSize=104857600"/>
                <!-- <transportConnector name="amqp" uri="amqp://;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://;wireFormat.maxFrameSize=104857600"/>
            <!-- destroy the spring context on shutdown to stop jetty -->
                <bean xmlns="" class="org.apache.activemq.hooks.SpringContextHook"
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        <import resource="jetty.xml"/>

When both the broker and the consumer are running and I then stop the broker, my consumer
exits a few moments later. As far as I understand, it should attempt to reconnect, but it
does not. What am I doing wrong? Please advise.

NOTE: I launch my consumer from Eclipse; I do not build a standalone jar for this task.

I have updated my broker to the latest 5.9.1 and did the same for my consumer. The result is
the same: after I stop the broker, my consumer dies a few seconds later. It works fine while
the broker is up and running.
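
One possible explanation, which this report does not confirm, is that the JVM exits because main() has already returned and only daemon threads remain once the connection to the broker is lost. A JVM stops when no non-daemon threads are running. The sketch below uses only the JDK to illustrate that behaviour and one way to keep a process alive by blocking a non-daemon thread on a latch. All names are hypothetical and no ActiveMQ classes are involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class KeepAliveSketch {

    // Hypothetical stand-in for a client library's background worker.
    // Daemon threads do not prevent the JVM from exiting.
    static Thread startDaemonWorker(Runnable work) {
        Thread worker = new Thread(work, "background-worker");
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Blocks the calling thread until the latch is released or the timeout
    // elapses. Returns true if the latch was released in time.
    static boolean awaitShutdown(CountDownLatch shutdown, long timeoutMillis) {
        try {
            return shutdown.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch shutdown = new CountDownLatch(1);

        // The worker pretends to do some work, then signals shutdown.
        startDaemonWorker(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            shutdown.countDown();
        });

        // Without this wait, main would return at once and the JVM could exit
        // while the daemon worker is still sleeping.
        boolean released = awaitShutdown(shutdown, 5_000);
        System.out.println("released=" + released);
    }
}
```

A long-running consumer would typically wait without a timeout and release the latch from a shutdown hook or an explicit stop signal.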

