activemq-users mailing list archives

From baratox <nando...@gmail.com>
Subject Failover Transport + MessageListener stops receiving messages after recovering
Date Thu, 05 Apr 2012 01:20:52 GMT
Hello!

I'm using the Failover Transport to connect to an embedded broker that is
not created automatically (the VM transport URI sets create=false).
I created an ActiveMQConnectionFactory with the URI
"failover://(vm://testBroker?create=false)" and get two connections from it:
one for a MessageProducer and one for a MessageConsumer that uses a
MessageListener to receive messages.

After I restart the broker, the producer continues sending messages
without a problem, but the consumer's MessageListener stops receiving them.
The consumer works fine if I call consumer.receive() to retrieve the
messages instead.

This is the code I'm using:

public void testConsumerFailover() throws Exception {
	BrokerService broker = new SslBrokerService();
	broker.setBrokerName("testBroker");
	broker.start();
	broker.waitUntilStarted();
	
	ActiveMQConnectionFactory factory =
		new ActiveMQConnectionFactory("failover://(vm://testBroker?create=false)");

	Connection conn = factory.createConnection();
	conn.start();
	Session senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
	MessageProducer producer =
		senderSession.createProducer(senderSession.createQueue("Q"));
	producer.send(senderSession.createMessage());
	
	Connection conn2 = factory.createConnection();
	conn2.start();
	Session consumerSession = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
	MessageConsumer consumer =
		consumerSession.createConsumer(consumerSession.createQueue("Q"));
	TestMessageListener listener = new TestMessageListener();
	consumer.setMessageListener(listener);
	assertNotNull(listener.waitForMessage(1000));
//		assertNotNull(consumer.receive(1000));
	
	System.out.println("Restarting broker");
	broker.stop();
	broker.waitUntilStopped();
	
	broker = new BrokerService();
	broker.setBrokerName("testBroker");
	broker.start();
	broker.waitUntilStarted();
	
	System.out.println("Sending after failover");
	producer.send(senderSession.createMessage());
	
	System.out.println("Consuming after failover");
//		assertNotNull(consumer.receive(1000));
	assertNotNull(listener.waitForMessage(5 * 1000));
}

private class TestMessageListener implements MessageListener {
	private Message msg = null;
	
	public void onMessage(Message msg) {
		this.msg = msg;
	}
	
	private Message waitForMessage(long timeout) {
		try {
			Thread.sleep(timeout);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		Message msg = this.msg;
		this.msg = null;
		return msg;
	}
}
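
As a side note, the listener above always sleeps for the full timeout and
shares the msg field between threads without synchronization. Below is a
minimal sketch of a helper that returns as soon as a message arrives. It is
not part of ActiveMQ or JMS: the class name MessageBox and its methods are
hypothetical, and it assumes onMessage(Message msg) would simply call
deliver(msg). Unlike the original listener, it queues every message instead
of keeping only the latest one.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not an ActiveMQ or JMS class): hands messages from
// the thread that calls onMessage() over to the waiting test thread.
class MessageBox<T> {
    // Thread-safe queue, so no extra synchronization is needed.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    // Assumed to be called from MessageListener.onMessage(Message msg).
    public void deliver(T msg) {
        queue.offer(msg);
    }

    // Returns the next message as soon as one is available,
    // or null if none arrives within timeoutMillis.
    public T waitForMessage(long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report "no message".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With this, TestMessageListener.onMessage could delegate to
box.deliver(msg), and the test could call box.waitForMessage(5 * 1000)
without always paying the full five seconds.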


Should this work? That is, should the MessageListener keep receiving
messages after the broker has been restarted?

Thanks!

--
View this message in context: http://activemq.2283324.n4.nabble.com/Failover-Transport-MessageListener-stops-receiving-messages-after-recovering-tp4533657p4533657.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.
