activemq-users mailing list archives

From Oleg Dulin <oleg.du...@gmail.com>
Subject Broker federation and messages not getting forwarded
Date Wed, 14 Aug 2013 18:59:55 GMT
Dear Distinguished Colleagues:

I have the following configuration.

I have broker1 (7.107) set up as follows:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>

My second broker (7.106) is set up like this:

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:3200"/>
</transportConnectors>
<networkConnectors>
    <networkConnector name="connector106.107"
                      uri="static:(tcp://192.168.7.107:3200)"
                      duplex="true" />
</networkConnectors>

Once in a while, but consistently, the federation gets into a funny 
state: if the consumers are on 106 but the producer is on 107, 106 
doesn't get any messages.

The same happens if the producer is on 106 and the consumers are on 107.

If they are all on the same broker, either one, all is well.

Once I restart the brokers, everything is fine for a while, until it is 
not. Usually a moderate to heavy volume of messages is enough to get 
the system into this state.

I am convinced it is either a misconfiguration on my part, or a bug in 
ActiveMQ. Hopefully, this is just a misconfiguration.

Here is the test code. I change the IPs for both consumers and producer 
to test it out.

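import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class FederationTest
{
	public static void main(String args[]) throws JMSException, InterruptedException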
	{
		ActiveMQConnectionFactory factory1 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
		factory1.setUseAsyncSend(true);
		initConsumer("c1",factory1);
		
		
		ActiveMQConnectionFactory factory2 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.106:3200)");
		factory2.setUseAsyncSend(true);
		initConsumer("c2",factory2);
		
		
		ActiveMQConnectionFactory factory3 = new ActiveMQConnectionFactory("failover:(tcp://192.168.7.107:3200)");
		factory3.setUseAsyncSend(true);
		initProducer(factory3);
		while(true)
		{
			Thread.sleep(1000);
		}
		
	}

	private static void initProducer(ActiveMQConnectionFactory factory) throws JMSException
	{
		QueueConnection queueConnection1=factory.createQueueConnection();
		queueConnection1.start();
		final QueueSession qs1=queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue q1=qs1.createQueue("TEST");
		final QueueSender qSender=qs1.createSender(q1);
		new Thread(new Runnable() {

			@Override
			public void run()
			{
				int counter=0;
				while(true)
				{
					try
					{
						TextMessage msg=qs1.createTextMessage();
						msg.setText("counter="+counter);
						System.out.println("p:"+counter);
						counter++;
						qSender.send(msg);
						
						Thread.sleep(1000);
					}
					catch(Exception exp)
					{
						exp.printStackTrace();
					}
					
				}
			}
			
		}).start();
		
		
	}

	public static void initConsumer(final String cname, ActiveMQConnectionFactory factory) throws JMSException
	{
		QueueConnection queueConnection1=factory.createQueueConnection();
		queueConnection1.start();
		QueueSession qs1=queueConnection1.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
		Queue q1=qs1.createQueue("TEST");
		MessageConsumer c1=qs1.createConsumer(q1);
		c1.setMessageListener(new MessageListener() {
			@Override
			public void onMessage(Message msg)
			{
				try
				{
					TextMessage txt=(TextMessage) msg;
					System.out.println(cname+":"+txt.getText());
					Thread.sleep((long) (1000*Math.random()));
				}
				catch(Exception exp)
				{
					exp.printStackTrace();
				}
			}
			
		});
	}
}
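
To make the stalled state easier to spot than by reading the console output, the test could record when each consumer last received a message. Below is a minimal sketch under stated assumptions: the class ConsumerStallMonitor and its methods are hypothetical helpers written for this test, not part of ActiveMQ or JMS, and the silence threshold is an arbitrary choice. It uses only the JDK and takes the current time as a parameter so it can be checked without a broker.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: tracks the last time each named consumer saw a message.
public class ConsumerStallMonitor
{
	private final Map<String, Long> lastSeenMillis = new ConcurrentHashMap<>();

	// Register a consumer up front so it is reported even if it never receives anything.
	public void register(String consumerName, long nowMillis)
	{
		lastSeenMillis.putIfAbsent(consumerName, nowMillis);
	}

	// Intended to be called from onMessage() with the consumer name (cname in the test).
	public void received(String consumerName, long nowMillis)
	{
		lastSeenMillis.put(consumerName, nowMillis);
	}

	// Returns the sorted names of consumers silent for longer than maxSilenceMillis.
	public List<String> stalled(long nowMillis, long maxSilenceMillis)
	{
		List<String> result = new ArrayList<>();
		for (Map.Entry<String, Long> e : lastSeenMillis.entrySet())
		{
			if (nowMillis - e.getValue() > maxSilenceMillis)
			{
				result.add(e.getKey());
			}
		}
		Collections.sort(result);
		return result;
	}
}
```

In the test above, initConsumer would call register(cname, System.currentTimeMillis()) once and received(cname, System.currentTimeMillis()) inside onMessage, and the idle loop in main could print stalled(...) every second. Since both consumers compete for the same queue, a generous threshold is advisable: one consumer can legitimately go a few seconds without a message.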




-- 
Regards,
Oleg Dulin
http://www.olegdulin.com