activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Ihor Mochurad (JIRA)" <j...@apache.org>
Subject [jira] [Created] (AMQ-5150) ActiveMQ failover seems not to work in 5.9.1 on MacOSX
Date Wed, 16 Apr 2014 15:26:15 GMT
Ihor Mochurad created AMQ-5150:
----------------------------------

             Summary: ActiveMQ failover seems not to work in 5.9.1 on MacOSX
                 Key: AMQ-5150
                 URL: https://issues.apache.org/jira/browse/AMQ-5150
             Project: ActiveMQ
          Issue Type: Bug
          Components: Connector
    Affects Versions: 5.9.1
         Environment: MacOSx
            Reporter: Ihor Mochurad


I have super simple scenario: one broker and one consumer with durable subscription.
This is the code of my consumer app: 

    package test;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import javax.jms.Topic;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import pojo.Event;
    import pojo.StockUpdate;
    
    public class Consumer
    {
    
        private static transient ConnectionFactory factory;
        private transient Connection connection;
        private transient Session session;
        public static int counter = 0;
    
        public Consumer(String brokerURL) throws JMSException
        {
            factory = new ActiveMQConnectionFactory(brokerURL);
            connection = factory.createConnection();
            connection.setClientID("CLUSTER_CLIENT_1");
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
    
        public void close() throws JMSException
        {
            if (connection != null)
            {
                connection.close();
            }
        }
    
        public static void main(String[] args) throws JMSException
        {
    
            try
            {
                // extract topics from the rest of arguments
                String[] topics = new String[2];
                topics[0] = "CSCO";
                topics[1] = "ORCL";
    
                // define connection URI
                Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true");
    
                for (String stock : topics)
                {
                    try
                    {
                        Destination destination = consumer.getSession().createTopic("STOCKS."
+ stock);
                        // consumer.getSession().
                        MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic)
destination, "STOCKS_DURABLE_CONSUMER_" + stock);
                        messageConsumer.setMessageListener(new Listener());
                    }
                    catch (JMSException e)
                    {
                        e.printStackTrace();
                    }
                }
            }
            catch (Throwable t)
            {
                t.printStackTrace();
            }
    
        }
    
        public Session getSession()
        {
            return session;
        }
    
    }
    
    class Listener implements MessageListener
    {
    
        public void onMessage(Message message)
        {
            try
            {
                TextMessage textMessage = (TextMessage) message;
                String json = textMessage.getText();
                Event event = StockUpdate.fromJSON(json, StockUpdate.class);
                System.out.println("Consumed message #:" + ++Consumer.counter + "\n" + event);
            }
            catch (Exception e)
            {
                e.printStackTrace();
            }
        }
    
    }

Here is my activemq.xml

    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- Allows us to use system properties as variables in this configuration file
-->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="R6_cluster_broker1"
persistent="true">
        
    		<networkConnectors>
    			<networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/>
    		</networkConnectors>
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
    
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
            <!--
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
    
                http://activemq.apache.org/persistence.html
            -->
            <persistenceAdapter>
                <kahaDB directory="/work/temp/kahadb"/>
            </persistenceAdapter>
    
    
              <!--
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information,
see:
                http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
    
                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size
to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
-->
            </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"
/>
            </shutdownHooks>
    
        </broker>
    
        <!--
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml
file
    
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>
    
    </beans>


When I have both broker and consumer running and then stop the broker my consumer exits few
moments after. As far I understood it must attempt to reconnect, but it is not the case. What
am I doing wrong, please advise.

!NOTE! I launch my consumer in Eclipse, i do not build a standalone jar for this task.   

I have updated my broker to the latest 5.9.1 and did the same to my consumer. Result is the
same - after I stop the broker my consumer dies few seconds after. It works fine if broker
is up and running. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message