activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Oleg Dulin <oleg.du...@gmail.com>
Subject Re: Broker federation and messages not getting forwarded
Date Wed, 14 Aug 2013 19:31:03 GMT
Another factoid -- if I don't use "duplex" then all seems to be well.

On 2013-08-14 19:03:36 +0000, Oleg Dulin said:

> One more note -- here is a kicker.
> 
> If I change the consumers to producer's IP, comment out hte producer, 
> and restart the program -- they get all their messages!
> 
> This is driving me nuts!
> 
> On 2013-08-14 18:59:55 +0000, Oleg Dulin said:
> 
>> ----------------17595334911828795876
>> 
>> Content-Type: text/plain; charset=iso-8859-1; format=flowed
>> 
>> Content-Transfer-Encoding: 8bit
>> 
>> 
>> 
>> Dear Distinguished Colleagues:
>> 
>> 
>> 
>> I have the following configuration.
>> 
>> 
>> 
>> I have broker1 (7.107) set up as follows:
>> 
>> 
>> 
>> <transportConnectors>
>> 
>> <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
>> 
>> </transportConnectors>
>> 
>> 
>> 
>> My second broker (7.106) is setup like this:
>> 
>> 
>> 
>> <transportConnectors>
>> 
>> <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
>> 
>> </transportConnectors>
>> 
>> <networkConnectors>
>> 
>> <networkConnector name="connector106.107"
>> 
>> uri="static:(tcp://192.168.7.107:3200)"
>> duplex="true" />
>> 
>> </networkConnectors>
>> 
>> 
>> 
>> Once in awhile, but consistently, the federation gets into a funny
>> state where if consumers are on 106, but producer is on 107, 106
>> doesn't get any messages.
>> 
>> 
>> 
>> Same happens if roducer is on 106, and consumers are on 107.
>> 
>> 
>> 
>> If they are all on the same broker, either one, all is well.
>> 
>> 
>> 
>> Once I restart the brokers, everything's fine for awhile, until it is
>> not. Usually a moderate to heavy volume of messages is enough to get
>> the system into this state.
>> 
>> 
>> 
>> I am convinced it is either a misconfiguration on my part, or a bug in
>> ActiveMQ. Hopefully, this is just a misconfiguration.
>> 
>> 
>> 
>> Here is the test code. I change the IPs for both consumers and producer
>> to test it out.
>> 
>> 
>> 
>> public class FederationTest
>> 
>> {
>> 
>> 	public static void main(String args[]) throws JMSException,
>> InterruptedException
>> 
>> 	{
>> 
>> 		ActiveMQConnectionFactory factory1 = new
>> ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
>> 
>> 		factory1.setUseAsyncSend(true);
>> 
>> 		initConsumer("c1",factory1);
>> 
>> 		
>> 
>> 		
>> 
>> 		ActiveMQConnectionFactory factory2 = new
>> ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");
>> 
>> 		factory2.setUseAsyncSend(true);
>> 
>> 		initConsumer("c2",factory2);
>> 
>> 		
>> 
>> 		
>> 
>> 		ActiveMQConnectionFactory factory3 = new
>> ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
>> 
>> 		factory3.setUseAsyncSend(true);
>> 
>> 		initProducer(factory3);
>> 
>> 		while(true)
>> 
>> 		{
>> 
>> 			Thread.sleep(1000);
>> 
>> 		}
>> 
>> 		
>> 
>> 	}
>> 
>> 
>> 
>> 	private static void initProducer(ActiveMQConnectionFactory factory)
>> throws JMSException
>> 
>> 	{
>> 
>> 		QueueConnection queueConnection1=factory.createQueueConnection();
>> 
>> 		queueConnection1.start();
>> 
>> 		final QueueSession qs1=queueConnection1.createQueueSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>> 
>> 		Queue q1=qs1.createQueue("TEST");
>> 
>> 		final QueueSender qSender=qs1.createSender(q1);
>> 
>> 		new Thread(new Runnable() {
>> 
>> 
>> 
>> 			@Override
>> 
>> 			public void run()
>> 
>> 			{
>> 
>> 				int counter=0;
>> 
>> 				while(true)
>> 
>> 				{
>> 
>> 					try
>> 
>> 					{
>> 
>> 						TextMessage msg=qs1.createTextMessage();
>> 
>> 						msg.setText("counter="+counter);
>> 
>> 						System.out.println("p:"+counter);
>> 
>> 						counter++;
>> 
>> 						qSender.send(msg);
>> 
>> 						
>> 
>> 						Thread.sleep(1000);
>> 
>> 					}
>> 
>> 					catch(Exception exp)
>> 
>> 					{
>> 
>> 						exp.printStackTrace();
>> 
>> 					}
>> 
>> 					
>> 
>> 				}
>> 
>> 			}
>> 
>> 			
>> 
>> 		}).start();
>> 
>> 		
>> 
>> 		
>> 
>> 	}
>> 
>> 
>> 
>> 	public static void initConsumer(final String
>> cname,ActiveMQConnectionFactory factory) throws JMSException
>> 
>> 	{
>> 
>> 		QueueConnection queueConnection1=factory.createQueueConnection();
>> 
>> 		queueConnection1.start();
>> 
>> 		QueueSession qs1=queueConnection1.createQueueSession(false,
>> Session.AUTO_ACKNOWLEDGE);
>> 
>> 		Queue q1=qs1.createQueue("TEST");
>> 
>> 		MessageConsumer c1=qs1.createConsumer(q1);
>> 
>> 		c1.setMessageListener(new MessageListener() {
>> 
>> 			@Override
>> 
>> 			public void onMessage(Message msg)
>> 
>> 			{
>> 
>> 				try
>> 
>> 				{
>> 
>> 					TextMessage txt=(TextMessage) msg;
>> 
>> 					System.out.println(cname+":"+txt.getText());
>> 
>> 					Thread.sleep((long) (1000*Math.random()));
>> 
>> 				}
>> 
>> 				catch(Exception exp)
>> 
>> 				{
>> 
>> 					exp.printStackTrace();
>> 
>> 				}
>> 
>> 			}
>> 
>> 			
>> 
>> 		});
>> 
>> 	}
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> --
>> 
>> Regards,
>> 
>> Oleg Dulin
>> 
>> http://www.olegdulin.com
>> 
>> ----------------17595334911828795876--


-- 
Regards,
Oleg Dulin
http://www.olegdulin.com



Mime
View raw message