activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jlindwall <jlindw...@yahoo.com>
Subject Re: Cluster: Slave not forwarding messages?
Date Fri, 17 Jul 2015 19:57:05 GMT
Below I have included my Java code (producer and consumer) as well as the
activemq.xml files for my masters and slaves.

The consumer is called ActiveMQFailOverDurableMessageListener.java, and I
run it using a discovery transport url, like this:


> /usr/bin/java -cp .
> com.xifin.jms.tools.activemq.ActiveMQFailOverDurableMessageListener
> 'discovery:(multicast://default)?maxReconnectAttempts=10' id

The producer is called ActiveMQFailOverMessageSender.java, and I run it like
this:


> /usr/bin/java -cp .
> com.xifin.jms.tools.activemq.ActiveMQFailOverMessageSender
> tcp://brokerhost:61612 58

Here is the code for the consumer:


ActiveMQFailOverDurableMessageListener.java wrote
> package com.xifin.jms.tools.activemq;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.log4j.Appender;
> import org.apache.log4j.BasicConfigurator;
> import org.apache.log4j.Level;
> import org.apache.log4j.Logger;
> 
> import javax.jms.*;
> 
> import static java.lang.String.format;
> 
> /**
>  * Listens for & dumps JMS messages until interrupted with Ctrl-C
>  */
> public class ActiveMQFailOverDurableMessageListener  implements
> MessageListener {
> 
>     private static final Logger log =
> Logger.getLogger(ActiveMQFailOverDurableMessageListener.class);
> 
>     public static final String TOPICNAME = "failover";
>     public static final int SLEEP_IN_MS = 1000;
> 
>     private final String brokerUrl;
>     private final String clientId;
> 
>     private boolean closed = false;
> 
>     public ActiveMQFailOverDurableMessageListener(String publisherUrl,
> String clientId) {
>         this.brokerUrl = publisherUrl;
>         this.clientId = clientId;
>     }
> 
>     public static void main(String[] args) throws JMSException {
>         BasicConfigurator.configure();
>         final Logger rootLogger = Logger.getRootLogger();
> 
>         rootLogger.setLevel(Level.INFO);
>         Appender appender = (Appender)
> rootLogger.getAllAppenders().nextElement();
>         rootLogger.removeAppender(appender);
> 
>         String clientId = "clientId";
> 
>         String publisherUrl = args[0];
>         if( args.length > 1 ) {
>             clientId = args[1];
>         }
> 
>         ActiveMQFailOverDurableMessageListener listener = new
> ActiveMQFailOverDurableMessageListener(publisherUrl, clientId);
>         listener.listen();
>     }
> 
>     private void listen() throws JMSException {
>         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(brokerUrl);
> 
>         final Connection connection =
> connectionFactory.createConnection();
>         connection.setClientID(clientId);
> 
>         final Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 
>         // Create the destination (Topic or Queue)
>         Topic destination = session.createTopic(TOPICNAME);
> 
>         final TopicSubscriber subscriber =
> session.createDurableSubscriber(destination, clientId);
> 
>         boolean runForever = true;
>         subscriber.setMessageListener(this);
>         connection.start();
> 
>         Runtime.getRuntime().addShutdownHook(new Thread() {
>             public void run() {
>                 closeAll(subscriber, session, connection);
>             }
>         });
> 
>         log.info(format("DURABLY Listening for messages on broker %s",
> brokerUrl));
> 
>         // The onMessage method will be called once for each message
>         while (true) {
>             sleep(60000);
>         }
>     }
> 
>     private void closeAll(TopicSubscriber subscriber, Session session,
> Connection connection) {
>         if( closed ) {
>             return;
>         }
>         try {
>             subscriber.close();
>             session.unsubscribe(clientId);
>             session.close();
>             connection.stop();
>             connection.close();
>             closed = true;
>             log.info("Stopped connection and unsubscribed to topic");
>         }catch(Exception e) {
>             log.error("Failed to disconnect and unsubscribe!", e);
>         }
>     }
> 
>     @Override
>     public void onMessage(Message message) {
>         dumpMessage(message);
>     }
> 
>     private void dumpMessage(Message message) {
>         if( message != null ) {
>             try {
>                 int setId = message.getIntProperty("Set-Id");
>                 int sequenceId = message.getIntProperty("Sequence-Id");
>                 boolean redelivered =
> message.getBooleanProperty("JMSRedelivered");
> 
>                 log.info(format("Sequence #%d (in set %d) received%s",
> sequenceId, setId, (redelivered ? " REDELIVERED" : "")));
>                 sleep(SLEEP_IN_MS);
>                 log.info(format("         #%d is done getting processed
> (in set %d)", sequenceId, setId));
>             } catch (JMSException e) {
>                 log.error("Error accessing message data", e);
>             }
>         }
>     }
> 
>     private void sleep(int ms) {
>         try {
>             Thread.sleep(ms);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>         }
>     }
> }

Here is the code for the producer:


ActiveMQFailOverMessageSender.java wrote
> package com.xifin.jms.tools.activemq;
> 
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.log4j.Appender;
> import org.apache.log4j.BasicConfigurator;
> import org.apache.log4j.Level;
> import org.apache.log4j.Logger;
> 
> import javax.jms.*;
> 
> import static java.lang.String.format;
> 
> /**
>  * Send a set of 10 messages to a topic, each with a constant "set id" and
> an incrementing "sequence id"
>  */
> public class ActiveMQFailOverMessageSender {
>     private static final Logger log =
> Logger.getLogger(ActiveMQFailOverMessageSender.class);
> 
>     public static final String TOPICNAME = "failover";
> 
>     private final String publisherUrl;
> 
>     public ActiveMQFailOverMessageSender(String publisherUrl) {
>         this.publisherUrl = publisherUrl;
>     }
> 
>     public static void main(String[] args) throws JMSException {
>         BasicConfigurator.configure();
> 
>         final Logger rootLogger = Logger.getRootLogger();
> 
>         rootLogger.setLevel(Level.INFO);
>         Appender appender = (Appender)
> rootLogger.getAllAppenders().nextElement();
>         rootLogger.removeAppender(appender);
> 
>         String publisherUrl = args[0];
>         int setId = Integer.valueOf(args[1]);
> 
>         ActiveMQFailOverMessageSender sender = new
> ActiveMQFailOverMessageSender(publisherUrl);
>         sender.publishMessageSet(setId);
> 
>         log.info("Done");
>     }
> 
>     private void publishMessageSet(int setId) throws JMSException {
>         ActiveMQConnectionFactory connectionFactory = new
> ActiveMQConnectionFactory(publisherUrl);
> 
>         Connection connection = connectionFactory.createConnection();
>         connection.start();
> 
>         Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 
>         // Create the destination (Topic or Queue)
>         Destination destination = session.createTopic(TOPICNAME);
> 
>         MessageProducer producer = session.createProducer(destination);
>         producer.setDeliveryMode(DeliveryMode.PERSISTENT);
> 
>         long start = System.currentTimeMillis();
>         try {
> 
>             for (int i = 1; i <= 10; i++) {
>                 ObjectMessage msg = session.createObjectMessage();
> 
>                 msg.setIntProperty("Set-Id", setId);
>                 msg.setIntProperty("Sequence-Id", i);
> 
>                 long msgStart = System.currentTimeMillis();
>                 producer.send(msg);
>                 long msgEnd = System.currentTimeMillis();
>                 log.info(format("  Time elapsed for msg %02d: %d ms", i,
> (msgEnd - msgStart)));
>             }
>         } finally {
>             long end = System.currentTimeMillis();
> 
>             log.info(format("Total elapsed time: %d ms", (end - start)));
>             session.close();
>             connection.close();
>         }
>     }
> }

Here are the activemq.xml files for master1, slave1, master2, and slave2:


master1 activemq.xml wrote
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd  
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>     

>     
> <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         
> <property name="locations">
>             
> <value>
> file:${activemq.conf}/credentials.properties
> </value>
>         
> </property>
>     
> </bean>
>    

>     
> <bean class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> destroy-method="stop" id="logQuery" init-method="start" lazy-init="false"
> scope="singleton">
>     
> </bean>
>     

>     
> <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="master1" dataDirectory="${activemq.data}">
>         
> <destinationPolicy>
>             
> <policyMap>
>               
> <policyEntries>
>                 
> <policyEntry topic="&gt;">
>                     

>                   
> <pendingMessageLimitStrategy>
>                     
> <constantPendingMessageLimitStrategy limit="1000"/>
>                   
> </pendingMessageLimitStrategy>
>                 
> </policyEntry>
>               
> </policyEntries>
>             
> </policyMap>
>         
> </destinationPolicy>
> 
>         

>         
> <managementContext>
>             
> <managementContext connectorPort="1911" createConnector="true"/>
>         
> </managementContext>
>         

>         
> <persistenceAdapter>
>             
> <kahaDB directory="/home/ops/activemq-cluster/master1/data/kahadb"/>
>         
> </persistenceAdapter>
> 
>           

>           
> <systemUsage>
>             
> <systemUsage>
>                 
> <memoryUsage>
>                     
> <memoryUsage percentOfJvmHeap="70"/>
>                 
> </memoryUsage>
>                 
> <storeUsage>
>                     
> <storeUsage limit="100 gb"/>
>                 
> </storeUsage>
>                 
> <tempUsage>
>                     
> <tempUsage limit="50 gb"/>
>                 
> </tempUsage>
>             
> </systemUsage>
>         
> </systemUsage>
>         

>         
> <transportConnectors>
>             

>             
> <transportConnector name="openwire" uri="tcp://172.10.10.10:61611"
> discoveryUri="multicast://default??wireFormat.maxFrameSize=104857600"/>
>         
> </transportConnectors>
>         
> <networkConnectors>
>           
> <networkConnector uri="multicast://default" />
>         
> </networkConnectors>
>         

>         
> <shutdownHooks>
>             
> <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook"/>
>         
> </shutdownHooks>
>     
> </broker>
>     

>     
> <import resource="jetty.xml"/>
> 
> </beans>


slave1 activemq.xml wrote
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd  
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>     

>     
> <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         
> <property name="locations">
>             
> <value>
> file:${activemq.conf}/credentials.properties
> </value>
>         
> </property>
>     
> </bean>
>    

>     
> <bean class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> destroy-method="stop" id="logQuery" init-method="start" lazy-init="false"
> scope="singleton">
>     
> </bean>
>     

>     
> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="slave1"
> dataDirectory="${activemq.data}">
>         
> <destinationPolicy>
>             
> <policyMap>
>               
> <policyEntries>
>                 
> <policyEntry topic="&gt;">
>                     

>                   
> <pendingMessageLimitStrategy>
>                     
> <constantPendingMessageLimitStrategy limit="1000"/>
>                   
> </pendingMessageLimitStrategy>
>                 
> </policyEntry>
>               
> </policyEntries>
>             
> </policyMap>
>         
> </destinationPolicy>
> 
>         

>         
> <managementContext>
>             
> <managementContext connectorPort="1913" createConnector="true"/>
>         
> </managementContext>
>         

>         
> <persistenceAdapter>
>             
> <kahaDB directory="/home/ops/activemq-cluster/master1/data/kahadb"/>
>         
> </persistenceAdapter>
> 
>           

>           
> <systemUsage>
>             
> <systemUsage>
>                 
> <memoryUsage>
>                     
> <memoryUsage percentOfJvmHeap="70"/>
>                 
> </memoryUsage>
>                 
> <storeUsage>
>                     
> <storeUsage limit="100 gb"/>
>                 
> </storeUsage>
>                 
> <tempUsage>
>                     
> <tempUsage limit="50 gb"/>
>                 
> </tempUsage>
>             
> </systemUsage>
>         
> </systemUsage>
>         

>         
> <transportConnectors>
>             

>             
> <transportConnector name="openwire" uri="tcp://172.10.10.10:61613"
> discoveryUri="multicast://default??wireFormat.maxFrameSize=104857600"/>
>         
> </transportConnectors>
>         
> <networkConnectors>
>           
> <networkConnector uri="multicast://default" />
>         
> </networkConnectors>
>         

>         
> <shutdownHooks>
>             
> <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook"/>
>         
> </shutdownHooks>
>     
> </broker>
>     

>     
> <import resource="jetty.xml"/>
> 
> </beans>


master2 activemq.xml wrote
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd  
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>     

>     
> <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         
> <property name="locations">
>             
> <value>
> file:${activemq.conf}/credentials.properties
> </value>
>         
> </property>
>     
> </bean>
>    

>     
> <bean class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> destroy-method="stop" id="logQuery" init-method="start" lazy-init="false"
> scope="singleton">
>     
> </bean>
>     

>     
> <broker xmlns="http://activemq.apache.org/schema/core"
> brokerName="master2" dataDirectory="${activemq.data}">
>         
> <destinationPolicy>
>             
> <policyMap>
>               
> <policyEntries>
>                 
> <policyEntry topic="&gt;">
>                     

>                   
> <pendingMessageLimitStrategy>
>                     
> <constantPendingMessageLimitStrategy limit="1000"/>
>                   
> </pendingMessageLimitStrategy>
>                 
> </policyEntry>
>               
> </policyEntries>
>             
> </policyMap>
>         
> </destinationPolicy>
> 
>         

>         
> <managementContext>
>             
> <managementContext connectorPort="1912" createConnector="true"/>
>         
> </managementContext>
>         

>         
> <persistenceAdapter>
>             
> <kahaDB directory="/home/ops/activemq-cluster/master2/data/kahadb"/>
>         
> </persistenceAdapter>
> 
>           

>           
> <systemUsage>
>             
> <systemUsage>
>                 
> <memoryUsage>
>                     
> <memoryUsage percentOfJvmHeap="70"/>
>                 
> </memoryUsage>
>                 
> <storeUsage>
>                     
> <storeUsage limit="100 gb"/>
>                 
> </storeUsage>
>                 
> <tempUsage>
>                     
> <tempUsage limit="50 gb"/>
>                 
> </tempUsage>
>             
> </systemUsage>
>         
> </systemUsage>
>         

>         
> <transportConnectors>
>             

>             
> <transportConnector name="openwire" uri="tcp://172.10.10.10:61612"
> discoveryUri="multicast://default??wireFormat.maxFrameSize=104857600"/>
>         
> </transportConnectors>
>         
> <networkConnectors>
>           
> <networkConnector uri="multicast://default" />
>         
> </networkConnectors>
>         

>         
> <shutdownHooks>
>             
> <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook"/>
>         
> </shutdownHooks>
>     
> </broker>
>     

>     
> <import resource="jetty.xml"/>
> 
> </beans>


slave2 activemq.xml wrote
> <?xml version="1.0" encoding="UTF-8"?>
> <beans xmlns="http://www.springframework.org/schema/beans"
> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
> xsi:schemaLocation="http://www.springframework.org/schema/beans
> http://www.springframework.org/schema/beans/spring-beans.xsd  
> http://activemq.apache.org/schema/core
> http://activemq.apache.org/schema/core/activemq-core.xsd">
>     

>     
> <bean
> class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
>         
> <property name="locations">
>             
> <value>
> file:${activemq.conf}/credentials.properties
> </value>
>         
> </property>
>     
> </bean>
>    

>     
> <bean class="io.fabric8.insight.log.log4j.Log4jLogQuery"
> destroy-method="stop" id="logQuery" init-method="start" lazy-init="false"
> scope="singleton">
>     
> </bean>
>     

>     
> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="slave2"
> dataDirectory="${activemq.data}">
>         
> <destinationPolicy>
>             
> <policyMap>
>               
> <policyEntries>
>                 
> <policyEntry topic="&gt;">
>                     

>                   
> <pendingMessageLimitStrategy>
>                     
> <constantPendingMessageLimitStrategy limit="1000"/>
>                   
> </pendingMessageLimitStrategy>
>                 
> </policyEntry>
>               
> </policyEntries>
>             
> </policyMap>
>         
> </destinationPolicy>
> 
>         

>         
> <managementContext>
>             
> <managementContext connectorPort="1914" createConnector="true"/>
>         
> </managementContext>
>         

>         
> <persistenceAdapter>
>             
> <kahaDB directory="/home/ops/activemq-cluster/master2/data/kahadb"/>
>         
> </persistenceAdapter>
> 
>           

>           
> <systemUsage>
>             
> <systemUsage>
>                 
> <memoryUsage>
>                     
> <memoryUsage percentOfJvmHeap="70"/>
>                 
> </memoryUsage>
>                 
> <storeUsage>
>                     
> <storeUsage limit="100 gb"/>
>                 
> </storeUsage>
>                 
> <tempUsage>
>                     
> <tempUsage limit="50 gb"/>
>                 
> </tempUsage>
>             
> </systemUsage>
>         
> </systemUsage>
>         

>         
> <transportConnectors>
>             

>             
> <transportConnector name="openwire" uri="tcp://172.10.10.10:61614"
> discoveryUri="multicast://default??wireFormat.maxFrameSize=104857600"/>
>         
> </transportConnectors>
>         
> <networkConnectors>
>           
> <networkConnector uri="multicast://default" />
>         
> </networkConnectors>
>         

>         
> <shutdownHooks>
>             
> <bean xmlns="http://www.springframework.org/schema/beans"
> class="org.apache.activemq.hooks.SpringContextHook"/>
>         
> </shutdownHooks>
>     
> </broker>
>     

>     
> <import resource="jetty.xml"/>
> 
> </beans>







--
View this message in context: http://activemq.2283324.n4.nabble.com/Cluster-Slave-not-forwarding-messages-tp4699365p4699456.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message