activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Arthur Naseef (JIRA)" <j...@apache.org>
Subject [jira] [Resolved] (AMQ-5107) In-flight queue message redelivered to multiple listeners upon broker shutdown
Date Thu, 27 Mar 2014 15:17:15 GMT

     [ https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arthur Naseef resolved AMQ-5107.
--------------------------------

       Resolution: Fixed
    Fix Version/s:     (was: NEEDS_REVIEW)
                   5.10.0

Fixed Queue handling to avoid re-dispatch of unacked messages on removal of a consumer when
the broker is shutting down.

Commit: https://git-wip-us.apache.org/repos/asf?p=activemq.git;a=commit;h=29f34f4dab68ceb3138a6194617fa8f13f4d3875

Includes a unit test.  There is a race condition in the test, apparently between the removal
of all the queue's consumers and the resend logic on the Queue, which can cause false
positives (i.e. the test passes even though the problem is present), so the test runs up to
3 iterations in an attempt to increase reliability.

> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
>                 Key: AMQ-5107
>                 URL: https://issues.apache.org/jira/browse/AMQ-5107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.9.0
>         Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea" 
>            Reporter: Greg Garlak
>            Assignee: Arthur Naseef
>             Fix For: 5.10.0
>
>
> To reproduce: 
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run producer to push one message on queue (see producer code below)
> 3) One of the listeners will pick up the message and sleep for one minute before auto-acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60 second window (Ctrl-C or issue the Terminate JVM(int) command from the Hawtio console)
> 5) All other idle listeners should get the same message redelivered simultaneously, each one having deliveryCount incremented
> Listener code:
> --------------
> package com.test;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestListener {
> 	public static void main(String[] args) {
> 		try {	
> 			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> 			Connection connection = connectionFactory.createConnection();
> 			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 			Destination destination = session.createQueue("TEST.QUEUE");
> 			MessageConsumer consumer = session.createConsumer(destination);
> 			
> 			consumer.setMessageListener(new MessageListener() {
> 				public void onMessage(Message message) {
> 					try	{
> 						TextMessage textMessage = (TextMessage) message;
> 						System.out.print("\nReceived " + textMessage.getText());
> 						System.out.print(", Redelivery: " + message.getJMSRedelivered());
> 						System.out.print(", Count: " + message.getLongProperty("JMSXDeliveryCount"));
> 						Thread.sleep(60000);			
> 						System.out.print("... finished after sleep");
> 					} catch (Exception e) {
> 						e.printStackTrace();
> 					}
> 				}
> 			});
> 			
> 			connection.start();
> 		} catch (Exception e) {
> 			e.printStackTrace();
> 		}
> 	}
> 	public TestListener() {
> 		super();
> 	}
> }
> Producer code:
> --------------
> package com.test;
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestProducer {
> 	public static void main(String[] args) {
> 		try {
> 			thread(new HelloWorldProducer(), false);
> 		} catch (Exception e) {
> 			e.printStackTrace();
> 		}
> 	}
>  
> 	public static class HelloWorldProducer implements Runnable {
> 		public void run() {
> 			try {
> 				ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
> 				Connection connection = connectionFactory.createConnection();
> 				connection.start();
> 				Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> 				Destination destination = session.createQueue("TEST.QUEUE");
> 				MessageProducer producer = session.createProducer(destination);
> 				String text = "test message created on " + new Date();
> 				TextMessage message = session.createTextMessage(text);
> 				System.out.println("Sent " + text);
> 				producer.send(message);
> 				session.close();
> 				connection.close();
> 			}
> 			catch (Exception e) {
> 				e.printStackTrace();
> 			}
> 		}
> 		public HelloWorldProducer() {}
> 	}
> 	public static void thread(Runnable runnable, boolean daemon) {
> 		Thread brokerThread = new Thread(runnable);
> 		brokerThread.setDaemon(daemon);
> 		brokerThread.start();
> 	}
>     
> 	public TestProducer() {
> 		super();
> 	}
> }



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Mime
View raw message