activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Timothy Bish (JIRA)" <j...@apache.org>
Subject [jira] [Closed] (AMQ-5150) ActiveMQ failover seems not to work in 5.9.1 on MacOSX
Date Mon, 28 Apr 2014 16:14:16 GMT

     [ https://issues.apache.org/jira/browse/AMQ-5150?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Timothy Bish closed AMQ-5150.
-----------------------------

    Resolution: Not a Problem

This is expected behaviour with failover.  You need to ensure your application keeps its main
thread open.

> ActiveMQ failover seems not to work in 5.9.1 on MacOSX
> ------------------------------------------------------
>
>                 Key: AMQ-5150
>                 URL: https://issues.apache.org/jira/browse/AMQ-5150
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Connector
>    Affects Versions: 5.9.1
>         Environment: MacOSx
>            Reporter: Ihor Mochurad
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> I have a super simple scenario: one broker and one consumer with a durable subscription.
> This is the code of my consumer app: 
>     package test;
>     
>     import javax.jms.Connection;
>     import javax.jms.ConnectionFactory;
>     import javax.jms.Destination;
>     import javax.jms.JMSException;
>     import javax.jms.Message;
>     import javax.jms.MessageConsumer;
>     import javax.jms.MessageListener;
>     import javax.jms.Session;
>     import javax.jms.TextMessage;
>     import javax.jms.Topic;
>     
>     import org.apache.activemq.ActiveMQConnectionFactory;
>     
>     import pojo.Event;
>     import pojo.StockUpdate;
>     
>     public class Consumer
>     {
>     
>         private static transient ConnectionFactory factory;
>         private transient Connection connection;
>         private transient Session session;
>         public static int counter = 0;
>     
>         public Consumer(String brokerURL) throws JMSException
>         {
>             factory = new ActiveMQConnectionFactory(brokerURL);
>             connection = factory.createConnection();
>             connection.setClientID("CLUSTER_CLIENT_1");
>             connection.start();
>             session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         }
>     
>         public void close() throws JMSException
>         {
>             if (connection != null)
>             {
>                 connection.close();
>             }
>         }
>     
>         public static void main(String[] args) throws JMSException
>         {
>     
>             try
>             {
>                 // extract topics from the rest of arguments
>                 String[] topics = new String[2];
>                 topics[0] = "CSCO";
>                 topics[1] = "ORCL";
>     
>                 // define connection URI
>                 Consumer consumer = new Consumer("failover:(tcp://localhost:61616)?maxReconnectAttempts=-1&useExponentialBackOff=true");
>     
>                 for (String stock : topics)
>                 {
>                     try
>                     {
>                         Destination destination = consumer.getSession().createTopic("STOCKS."
+ stock);
>                         // consumer.getSession().
>                         MessageConsumer messageConsumer = consumer.getSession().createDurableSubscriber((Topic)
destination, "STOCKS_DURABLE_CONSUMER_" + stock);
>                         messageConsumer.setMessageListener(new Listener());
>                     }
>                     catch (JMSException e)
>                     {
>                         e.printStackTrace();
>                     }
>                 }
>             }
>             catch (Throwable t)
>             {
>                 t.printStackTrace();
>             }
>     
>         }
>     
>         public Session getSession()
>         {
>             return session;
>         }
>     
>     }
>     
>     class Listener implements MessageListener
>     {
>     
>         public void onMessage(Message message)
>         {
>             try
>             {
>                 TextMessage textMessage = (TextMessage) message;
>                 String json = textMessage.getText();
>                 Event event = StockUpdate.fromJSON(json, StockUpdate.class);
>                 System.out.println("Consumed message #:" + ++Consumer.counter + "\n"
+ event);
>             }
>             catch (Exception e)
>             {
>                 e.printStackTrace();
>             }
>         }
>     
>     }
> Here is my activemq.xml
>     <beans
>       xmlns="http://www.springframework.org/schema/beans"
>       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
>       http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
>     
>         <!-- Allows us to use system properties as variables in this configuration
file -->
>         <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>             <property name="locations">
>                 <value>file:${activemq.conf}/credentials.properties</value>
>             </property>
>         </bean>
>     
>         <!--
>             The <broker> element is used to configure the ActiveMQ broker.
>         -->
>         <broker xmlns="http://activemq.apache.org/schema/core" brokerName="R6_cluster_broker1"
persistent="true">
>         
>     		<networkConnectors>
>     			<networkConnector uri="static:(failover:(tcp://remote_master:61616,tcp://remote_slave:61617))"/>
>     		</networkConnectors>
>     
>             <destinationPolicy>
>                 <policyMap>
>                   <policyEntries>
>                     <policyEntry topic=">" >
>                         <!-- The constantPendingMessageLimitStrategy is used to prevent
>                              slow topic consumers to block producers and affect other
consumers
>                              by limiting the number of messages that are retained
>                              For more information, see:
>     
>                              http://activemq.apache.org/slow-consumer-handling.html
>     
>                         -->
>                       <pendingMessageLimitStrategy>
>                         <constantPendingMessageLimitStrategy limit="1000"/>
>                       </pendingMessageLimitStrategy>
>                     </policyEntry>
>                   </policyEntries>
>                 </policyMap>
>             </destinationPolicy>
>     
>     
>             <!--
>                 The managementContext is used to configure how ActiveMQ is exposed in
>                 JMX. By default, ActiveMQ uses the MBean server that is started by
>                 the JVM. For more information, see:
>     
>                 http://activemq.apache.org/jmx.html
>             -->
>             <managementContext>
>                 <managementContext createConnector="false"/>
>             </managementContext>
>     
>             <!--
>                 Configure message persistence for the broker. The default persistence
>                 mechanism is the KahaDB store (identified by the kahaDB tag).
>                 For more information, see:
>     
>                 http://activemq.apache.org/persistence.html
>             -->
>             <persistenceAdapter>
>                 <kahaDB directory="/work/temp/kahadb"/>
>             </persistenceAdapter>
>     
>     
>               <!--
>                 The systemUsage controls the maximum amount of space the broker will
>                 use before disabling caching and/or slowing down producers. For more
information, see:
>                 http://activemq.apache.org/producer-flow-control.html
>               -->
>               <systemUsage>
>                 <systemUsage>
>                     <memoryUsage>
>                         <memoryUsage percentOfJvmHeap="70" />
>                     </memoryUsage>
>                     <storeUsage>
>                         <storeUsage limit="100 gb"/>
>                     </storeUsage>
>                     <tempUsage>
>                         <tempUsage limit="50 gb"/>
>                     </tempUsage>
>                 </systemUsage>
>             </systemUsage>
>     
>             <!--
>                 The transport connectors expose ActiveMQ over a given protocol to
>                 clients and other brokers. For more information, see:
>     
>                 http://activemq.apache.org/configuring-transports.html
>             -->
>             <transportConnectors>
>                 <!-- DOS protection, limit concurrent connections to 1000 and frame
size to 100MB -->
>                 <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>                 <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>                 <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>                 <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
>                 <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
-->
>             </transportConnectors>
>     
>             <!-- destroy the spring context on shutdown to stop jetty -->
>             <shutdownHooks>
>                 <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"
/>
>             </shutdownHooks>
>     
>         </broker>
>     
>         <!--
>             Enable web consoles, REST and Ajax APIs and demos
>             The web consoles requires by default login, you can disable this in the jetty.xml
file
>     
>             Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
>         -->
>         <import resource="jetty.xml"/>
>     
>     </beans>
> When I have both the broker and the consumer running and then stop the broker, my consumer exits
a few moments after. As far as I understood, it must attempt to reconnect, but that is not the case.
What am I doing wrong? Please advise.
> !NOTE! I launch my consumer in Eclipse; I do not build a standalone jar for this task.
  
> I have updated my broker to the latest 5.9.1 and did the same to my consumer. The result
is the same - after I stop the broker, my consumer dies a few seconds after. It works fine if
the broker is up and running.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message