Date: Wed, 19 Mar 2014 02:49:42 +0000 (UTC)
From: "Arthur Naseef (JIRA)"
To: dev@activemq.apache.org
Subject: [jira] [Commented] (AMQ-5107) In-flight queue message redelivered to multiple listeners upon broker shutdown

    [ https://issues.apache.org/jira/browse/AMQ-5107?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13940096#comment-13940096 ]

Arthur Naseef commented on AMQ-5107:
------------------------------------

Looking at this more closely, I'm leaning toward this being a non-problem.

The JMS specification states this for AUTO_ACKNOWLEDGE:

{quote}
AUTO_ACKNOWLEDGE - With this option, the session automatically acknowledges a client’s receipt of a message when it has either successfully returned from a call to receive or the MessageListener it has called to process the message successfully returns.
{quote}

This means that MessageListener *must* be prepared to handle duplicates coming into the {{onMessage}} call because (a) the message does not get acknowledged until the handler returns, and (b) there's always the chance that the client application will fail between the {{onMessage}} call and the message ACK (leading to redeliveries).

Really, all client apps should be prepared to handle duplicates, although transacted applications are best positioned to avoid them.

While it may be possible to improve this one case, such a change is not trivial, and it would not remove the need for the client to handle the same condition.

> In-flight queue message redelivered to multiple listeners upon broker shutdown
> ------------------------------------------------------------------------------
>
>                 Key: AMQ-5107
>                 URL: https://issues.apache.org/jira/browse/AMQ-5107
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Transport
>    Affects Versions: 5.9.0
>         Environment: Windows 7 64Bit - Java "1.6.0_20"
> CentOS 6.0 - Java "1.7.0_09-icedtea"
>            Reporter: Greg Garlak
>            Assignee: Arthur Naseef
>             Fix For: NEEDS_REVIEW
>
>
> To reproduce:
> 1) Start 3 or more listener processes (see listener code below)
> 2) Run producer to push one message on queue (see producer code below)
> 3) One of the listeners will pick-up the message and sleep for one minute before auto acknowledging the message
> 4) Start a shutdown sequence of the broker within the 60 second window (Ctrl-C or issue Terminate jvm(int) command from Hawtio console)
> 5) All other idle listeners should get the same message redelivered simultaneously, each one having deliveryCount incremented
> Listener code:
> --------------
> package com.test;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestListener {
>     public static void main(String[] args) {
>         try {
>             ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
>             Connection connection = connectionFactory.createConnection();
>             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>             Destination destination = session.createQueue("TEST.QUEUE");
>             MessageConsumer consumer = session.createConsumer(destination);
>
>             consumer.setMessageListener(new MessageListener() {
>                 public void onMessage(Message message) {
>                     try {
>                         TextMessage textMessage = (TextMessage) message;
>                         System.out.print("\nReceived " + textMessage.getText());
>                         System.out.print(", Redelivery: " + message.getJMSRedelivered());
>                         System.out.print(", Count: " + message.getLongProperty("JMSXDeliveryCount"));
>                         Thread.sleep(60000);
>                         System.out.print("... finished after sleep");
>                     } catch (Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>             });
>
>             connection.start();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>     public TestListener() {
>         super();
>     }
> }
> Producer code:
> --------------
> package com.test;
> import java.util.Date;
> import javax.jms.Connection;
> import javax.jms.Destination;
> import javax.jms.MessageProducer;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import org.apache.activemq.ActiveMQConnectionFactory;
> public class TestProducer {
>     public static void main(String[] args) {
>         try {
>             thread(new HelloWorldProducer(), false);
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>
>     public static class HelloWorldProducer implements Runnable {
>         public void run() {
>             try {
>                 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
>                 Connection connection = connectionFactory.createConnection();
>                 connection.start();
>                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>                 Destination destination = session.createQueue("TEST.QUEUE");
>                 MessageProducer producer = session.createProducer(destination);
>                 String text = "test message created on " + new Date();
>                 TextMessage message = session.createTextMessage(text);
>                 System.out.println("Sent " + text);
>                 producer.send(message);
>                 session.close();
>                 connection.close();
>             }
>             catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         public HelloWorldProducer() {}
>     }
>     public static void thread(Runnable runnable, boolean daemon) {
>         Thread brokerThread = new Thread(runnable);
>         brokerThread.setDaemon(daemon);
>         brokerThread.start();
>     }
>
>     public TestProducer() {
>         super();
>     }
> }

--
This message was sent by Atlassian JIRA
(v6.2#6252)
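
As an illustration of the duplicate handling the comment asks of every {{onMessage}} implementation, here is a minimal sketch of a duplicate filter keyed on the message ID. The class name DuplicateFilter and its method firstDelivery are hypothetical, not part of JMS or ActiveMQ, and the filter only remembers IDs inside a single JVM. Listeners running as separate processes, as in the reproduction steps, would presumably need a shared store (for example a database unique key) or transacted processing instead.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical helper (not a JMS or ActiveMQ class): remembers the most
 * recently seen message IDs so a listener can skip redelivered duplicates.
 */
final class DuplicateFilter {
    private final Set<String> seen;

    DuplicateFilter(final int capacity) {
        // Insertion-ordered map that drops its oldest entry once it holds
        // more than `capacity` IDs, so memory use stays bounded.
        this.seen = Collections.newSetFromMap(new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        });
    }

    /**
     * Returns true the first time an ID is offered and false when the same
     * ID is offered again while it is still within the remembered window.
     */
    synchronized boolean firstDelivery(String messageId) {
        return seen.add(messageId);
    }
}
```

Inside a listener one would call filter.firstDelivery(message.getJMSMessageID()) at the top of onMessage and return without side effects when it yields false. Under AUTO_ACKNOWLEDGE that normal return acknowledges the duplicate.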