activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Gary Tully (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (AMQ-2648) Interrupting Consumer.close() thread puts queue into unusable state
Date Fri, 01 Apr 2011 11:22:12 GMT

     [ https://issues.apache.org/jira/browse/AMQ-2648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Gary Tully updated AMQ-2648:
----------------------------

    Fix Version/s:     (was: 5.5.0)
                   5.6.0

> Interrupting Consumer.close() thread puts queue into unusable state
> -------------------------------------------------------------------
>
>                 Key: AMQ-2648
>                 URL: https://issues.apache.org/jira/browse/AMQ-2648
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JMS client
>    Affects Versions: 5.2.0
>            Reporter: Parasoft Corporation
>             Fix For: 5.6.0
>
>
> We have built a client program for sending and receiving JMS messages. Each send/receive
operation is performed in a thread, so that we can handle timeouts properly. However, if the
thread which is performing the receive() gets interrupted, the queue no longer responds to
receive() requests, even from another client with a separate JVM.
> To reproduce, use two separate programs:
> -----------[QueueSendReceiveActiveMQInterrupt.java]--------
> import java.util.*;
> import javax.jms.*;
> import javax.naming.*;
> public class QueueSendReceiveActiveMQInterrupt implements MessageListener {
>     public static void main(String[] args) throws Exception {
>         useConnectionFactory();
>     }
>     private static void useConnectionFactory() throws Exception, JMSException {
>         ConnectionFactory factory = getConnectionFactoryUsingJNDI();
>         Connection connect = null;
>         Session session = null;
>         connect = factory.createConnection(/*"Admin", "Admin"*/);
>         session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Destination entryDest = session.createQueue("soatest.demo.queue");
>         Destination exitDest = entryDest;
>         MessageProducer producer = session.createProducer(entryDest);
>         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         MessageConsumer consumer = session.createConsumer(exitDest);
>         connect.start();
>         TextMessage txtMessage = session.createTextMessage();
>         txtMessage.setJMSReplyTo(exitDest);
>         txtMessage.setText("Hello 1 from standalone program!");
>         producer.send(txtMessage);
>         System.out.println("message 1 sent");
>         Message msg;
>         
>         // threaded receive with a kill
>         ReceiverRunner runner = (new QueueSendReceiveActiveMQInterrupt()).new ReceiverRunner(consumer);
>         Thread t = new Thread(runner);
>         t.setDaemon(true);
>         t.start();
>         t.join(1000);
>         t.interrupt();
>         msg = runner.getMessage();
>         if (msg != null) {
>             System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
>         } else {
>             System.out.println("got no message 1");
>         }
>         
>         producer.close();
> //        consumer.close();
> //        session.close();
> //        connect.close();
>     }
>     private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception
{
>         Object ret = null;
>         Properties props = new Properties();
>         props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
>         props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>         InitialContext ictx = new javax.naming.InitialContext(props);
>         Object obj = ictx.lookup("QueueConnectionFactory");
>         if (obj instanceof Reference) {
>             Reference ref = (Reference)obj;
>             String className = ref.getClassName();
>             System.out.println("Connection factory class name: " + className);
>             Class cls = Class.forName(className);
>             ret = cls.newInstance();
>         } else {
>             ret = obj;
>         }
>         ictx.close();
>         return (ConnectionFactory)ret;
>     }
>     public void onMessage(Message msg) {
>         if (msg != null) {
>             try {
>                 System.out.println("msg = " + ((TextMessage)msg).getText());
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             }
>         } else {
>             System.out.println("got nothing");
>         }
>     }
>     public class ReceiverRunner implements Runnable {
>         private MessageConsumer consumer;
>         private Message msg;
>         
>         public ReceiverRunner(MessageConsumer consumer) {
>             this.consumer = consumer;
>         }
>         public void run() {
>             try {
>                 msg = consumer.receive(500);
>                 // change the following to a very small amount like 500 and notice how
everything works
>                 consumer.receive(10000); // another receive just so it blocks and get
the thread to stop
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             } finally {
>                 try {
>                     consumer.close();
>                 } catch (JMSException e) {
>                     e.printStackTrace();
>                 }
>             }
>         }
>         public Message getMessage() {
>             return msg;
>         }
>     }
> }
> --
> -----------[QueueSendReceiveActiveMQ.java]--------
> import java.util.*;
> import javax.jms.*;
> import javax.naming.*;
> public class QueueSendReceiveActiveMQ implements MessageListener {
>     public static void main(String[] args) throws Exception {
>         useConnectionFactory();
>     }
>     private static void useConnectionFactory() throws Exception, JMSException {
>         ConnectionFactory factory = getConnectionFactoryUsingJNDI();
>         Connection connect = null;
>         Session session = null;
>         connect = factory.createConnection(/*"Admin", "Admin"*/);
>         session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Destination entryDest = session.createQueue("soatest.demo.queue");
>         Destination exitDest = entryDest;
>         MessageProducer producer = session.createProducer(entryDest);
>         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
>         MessageConsumer consumer = session.createConsumer(exitDest);
>         connect.start();
>         TextMessage txtMessage = session.createTextMessage();
>         txtMessage.setJMSReplyTo(exitDest);
>         txtMessage.setText("without thread interrupt: Hello 1 from standalone program!");
>         producer.send(txtMessage);
>         System.out.println("without thread interrupt: message 1 sent");
>         Message msg;
>         // regular receive
>         msg = consumer.receive(2000);
>         
>         if (msg != null) {
>             System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
>         } else {
>             System.out.println("without thread interrupt: got no message 1");
>         }
>         
>         producer.close();
>         consumer.close();
>         session.close();
>         connect.close();
>         
>     }
>     private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception
{
>         Object ret = null;
>         Properties props = new Properties();
>         props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
>         props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
>         InitialContext ictx = new javax.naming.InitialContext(props);
>         Object obj = ictx.lookup("QueueConnectionFactory");
>         if (obj instanceof Reference) {
>             Reference ref = (Reference)obj;
>             String className = ref.getClassName();
>             System.out.println("Connection factory class name: " + className);
>             Class cls = Class.forName(className);
>             ret = cls.newInstance();
>         } else {
>             ret = obj;
>         }
>         ictx.close();
>         return (ConnectionFactory)ret;
>     }
>     public void onMessage(Message msg) {
>         if (msg != null) {
>             try {
>                 System.out.println("msg = " + ((TextMessage)msg).getText());
>             } catch (JMSException e) {
>                 e.printStackTrace();
>             }
>         } else {
>             System.out.println("got nothing");
>         }
>     }
> }
> --
> 1) Run QueueSendReceiveActiveMQ alone, notice how it works in sending receiving messages
from the queue.
> 2) Run QueueSendReceiveActiveMQInterrupt will result in the program halting (due to some
non-daemon thread created by ActiveMQ), then while it is running run QueueSendReceiveActiveMQ
and notice how it fails to retrieve messages from the queue. If JMS Consumer.close() is excuted
in a thread that is interrupted, it fails and throws an exception and leaves the consumer
in some bad state.
> Note that the same code does not exhibit this behavior when using other vendors' MQ solutions.

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

Mime
View raw message