activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Oleg Dulin <oleg.du...@gmail.com>
Subject Re: Broker federation and messages not getting forwarded
Date Wed, 14 Aug 2013 19:03:36 GMT
One more note -- here is a kicker.

If I change the consumers to producer's IP, comment out hte producer, 
and restart the program -- they get all their messages!

This is driving me nuts!

On 2013-08-14 18:59:55 +0000, Oleg Dulin said:

> ----------------17595334911828795876
> 
> Content-Type: text/plain; charset=iso-8859-1; format=flowed
> 
> Content-Transfer-Encoding: 8bit
> 
> 
> 
> Dear Distinguished Colleagues:
> 
> 
> 
> I have the following configuration.
> 
> 
> 
> I have broker1 (7.107) set up as follows:
> 
> 
> 
> <transportConnectors>
> 
>            <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
> 
> </transportConnectors>
> 
> 
> 
> My second broker (7.106) is setup like this:
> 
> 
> 
>  <transportConnectors>
> 
>              <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
> 
> </transportConnectors>
> 
> <networkConnectors>
> 
>             <networkConnector name="connector106.107"
> 
>                                 uri="static:(tcp://192.168.7.107:3200)" 
> 
> duplex="true" />
> 
>  </networkConnectors>
> 
> 
> 
> Once in awhile, but consistently, the federation gets into a funny 
> 
> state where if consumers are on 106, but producer is on 107, 106 
> 
> doesn't get any messages.
> 
> 
> 
> Same happens if roducer is on 106, and consumers are on 107.
> 
> 
> 
> If they are all on the same broker, either one, all is well.
> 
> 
> 
> Once I restart the brokers, everything's fine for awhile, until it is 
> 
> not. Usually a moderate to heavy volume of messages is enough to get 
> 
> the system into this state.
> 
> 
> 
> I am convinced it is either a misconfiguration on my part, or a bug in 
> 
> ActiveMQ. Hopefully, this is just a misconfiguration.
> 
> 
> 
> Here is the test code. I change the IPs for both consumers and producer 
> 
> to test it out.
> 
> 
> 
> public class FederationTest
> 
> {
> 
> 	public static void main(String args[]) throws JMSException, 
> 
> InterruptedException
> 
> 	{
> 
> 		ActiveMQConnectionFactory factory1 = new 
> 
> ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
> 
> 		factory1.setUseAsyncSend(true);
> 
> 		initConsumer("c1",factory1);
> 
> 		
> 
> 		
> 
> 		ActiveMQConnectionFactory factory2 = new 
> 
> ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");
> 
> 		factory2.setUseAsyncSend(true);
> 
> 		initConsumer("c2",factory2);
> 
> 		
> 
> 		
> 
> 		ActiveMQConnectionFactory factory3 = new 
> 
> ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
> 
> 		factory3.setUseAsyncSend(true);
> 
> 		initProducer(factory3);
> 
> 		while(true)
> 
> 		{
> 
> 			Thread.sleep(1000);
> 
> 		}
> 
> 		
> 
> 	}
> 
> 
> 
> 	private static void initProducer(ActiveMQConnectionFactory factory) 
> 
> throws JMSException
> 
> 	{
> 
> 		QueueConnection queueConnection1=factory.createQueueConnection();
> 
> 		queueConnection1.start();
> 
> 		final QueueSession qs1=queueConnection1.createQueueSession(false, 
> 
> Session.AUTO_ACKNOWLEDGE);
> 
> 		Queue q1=qs1.createQueue("TEST");
> 
> 		final QueueSender qSender=qs1.createSender(q1);
> 
> 		new Thread(new Runnable() {
> 
> 
> 
> 			@Override
> 
> 			public void run()
> 
> 			{
> 
> 				int counter=0;
> 
> 				while(true)
> 
> 				{
> 
> 					try
> 
> 					{
> 
> 						TextMessage msg=qs1.createTextMessage();
> 
> 						msg.setText("counter="+counter);
> 
> 						System.out.println("p:"+counter);
> 
> 						counter++;
> 
> 						qSender.send(msg);
> 
> 						
> 
> 						Thread.sleep(1000);
> 
> 					}
> 
> 					catch(Exception exp)
> 
> 					{
> 
> 						exp.printStackTrace();
> 
> 					}
> 
> 					
> 
> 				}
> 
> 			}
> 
> 			
> 
> 		}).start();
> 
> 		
> 
> 		
> 
> 	}
> 
> 
> 
> 	public static void initConsumer(final String 
> 
> cname,ActiveMQConnectionFactory factory) throws JMSException
> 
> 	{
> 
> 		QueueConnection queueConnection1=factory.createQueueConnection();
> 
> 		queueConnection1.start();
> 
> 		QueueSession qs1=queueConnection1.createQueueSession(false, 
> 
> Session.AUTO_ACKNOWLEDGE);
> 
> 		Queue q1=qs1.createQueue("TEST");
> 
> 		MessageConsumer c1=qs1.createConsumer(q1);
> 
> 		c1.setMessageListener(new MessageListener() {
> 
> 			@Override
> 
> 			public void onMessage(Message msg)
> 
> 			{
> 
> 				try
> 
> 				{
> 
> 					TextMessage txt=(TextMessage) msg;
> 
> 					System.out.println(cname+":"+txt.getText());
> 
> 					Thread.sleep((long) (1000*Math.random()));
> 
> 				}
> 
> 				catch(Exception exp)
> 
> 				{
> 
> 					exp.printStackTrace();
> 
> 				}
> 
> 			}
> 
> 			
> 
> 		});
> 
> 	}
> 
> }
> 
> 
> 
> 
> 
> 
> 
> 
> 
> -- 
> 
> Regards,
> 
> Oleg Dulin
> 
> http://www.olegdulin.com
> 
> ----------------17595334911828795876--
> 
> 
> 
> 
> 
> 
> 


-- 
Regards,
Oleg Dulin
http://www.olegdulin.com



Mime
View raw message