activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Parasoft Corporation (JIRA)" <j...@apache.org>
Subject [jira] Created: (AMQ-2648) Interrupting Consumer.close() thread puts queue into unusable state
Date Fri, 12 Mar 2010 18:01:44 GMT
Interrupting Consumer.close() thread puts queue into unusable state
-------------------------------------------------------------------

                 Key: AMQ-2648
                 URL: https://issues.apache.org/activemq/browse/AMQ-2648
             Project: ActiveMQ
          Issue Type: Bug
          Components: JMS client
    Affects Versions: 5.2.0
            Reporter: Parasoft Corporation


We have built a client program for sending and receiving JMS messages. Each send/receive operation
is performed in a thread, so that we can handle timeouts properly. However, if the thread
which is performing the receive() gets interrupted, the queue no longer responds to receive()
requests, even from another client with a separate JVM.

To reproduce, use two separate programs:
-----------[QueueSendReceiveActiveMQInterrupt.java]--------
import java.util.*;
import javax.jms.*;
import javax.naming.*;

public class QueueSendReceiveActiveMQInterrupt implements MessageListener {

    public static void main(String[] args) throws Exception {
        useConnectionFactory();
    }
    private static void useConnectionFactory() throws Exception, JMSException {
        ConnectionFactory factory = getConnectionFactoryUsingJNDI();
        Connection connect = null;
        Session session = null;
        connect = factory.createConnection(/*"Admin", "Admin"*/);
        session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination entryDest = session.createQueue("soatest.demo.queue");
        Destination exitDest = entryDest;
        MessageProducer producer = session.createProducer(entryDest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        MessageConsumer consumer = session.createConsumer(exitDest);
        connect.start();
        TextMessage txtMessage = session.createTextMessage();
        txtMessage.setJMSReplyTo(exitDest);
        txtMessage.setText("Hello 1 from standalone program!");
        producer.send(txtMessage);
        System.out.println("message 1 sent");
        Message msg;
        
        // threaded receive with a kill
        ReceiverRunner runner = (new QueueSendReceiveActiveMQInterrupt()).new ReceiverRunner(consumer);
        Thread t = new Thread(runner);
        t.setDaemon(true);
        t.start();
        t.join(1000);
        t.interrupt();
        msg = runner.getMessage();
        if (msg != null) {
            System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
        } else {
            System.out.println("got no message 1");
        }
        
        producer.close();
//        consumer.close();
//        session.close();
//        connect.close();
    }
    private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception {
        Object ret = null;
        Properties props = new Properties();
        props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
        props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        InitialContext ictx = new javax.naming.InitialContext(props);
        Object obj = ictx.lookup("QueueConnectionFactory");
        if (obj instanceof Reference) {
            Reference ref = (Reference)obj;
            String className = ref.getClassName();
            System.out.println("Connection factory class name: " + className);
            Class cls = Class.forName(className);
            ret = cls.newInstance();
        } else {
            ret = obj;
        }
        ictx.close();
        return (ConnectionFactory)ret;
    }
    public void onMessage(Message msg) {
        if (msg != null) {
            try {
                System.out.println("msg = " + ((TextMessage)msg).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("got nothing");
        }
    }
    public class ReceiverRunner implements Runnable {
        private MessageConsumer consumer;
        private Message msg;
        
        public ReceiverRunner(MessageConsumer consumer) {
            this.consumer = consumer;
        }
        public void run() {
            try {
                msg = consumer.receive(500);
                // change the following to a very small amount like 500 and notice how everything
works
                consumer.receive(10000); // another receive just so it blocks and get the
thread to stop
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
        public Message getMessage() {
            return msg;
        }
    }
}
--
-----------[QueueSendReceiveActiveMQ.java]--------
import java.util.*;
import javax.jms.*;
import javax.naming.*;

public class QueueSendReceiveActiveMQ implements MessageListener {

    public static void main(String[] args) throws Exception {
        useConnectionFactory();
    }
    private static void useConnectionFactory() throws Exception, JMSException {
        ConnectionFactory factory = getConnectionFactoryUsingJNDI();
        Connection connect = null;
        Session session = null;
        connect = factory.createConnection(/*"Admin", "Admin"*/);
        session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination entryDest = session.createQueue("soatest.demo.queue");
        Destination exitDest = entryDest;
        MessageProducer producer = session.createProducer(entryDest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        MessageConsumer consumer = session.createConsumer(exitDest);
        connect.start();
        TextMessage txtMessage = session.createTextMessage();
        txtMessage.setJMSReplyTo(exitDest);
        txtMessage.setText("without thread interrupt: Hello 1 from standalone program!");
        producer.send(txtMessage);
        System.out.println("without thread interrupt: message 1 sent");
        Message msg;
        // regular receive
        msg = consumer.receive(2000);
        
        if (msg != null) {
            System.out.println("msg 1 received: " + ((TextMessage)msg).getText());
        } else {
            System.out.println("without thread interrupt: got no message 1");
        }
        
        producer.close();
        consumer.close();
        session.close();
        connect.close();
        
    }
    private static ConnectionFactory getConnectionFactoryUsingJNDI() throws Exception {
        Object ret = null;
        Properties props = new Properties();
        props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
        props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        InitialContext ictx = new javax.naming.InitialContext(props);
        Object obj = ictx.lookup("QueueConnectionFactory");
        if (obj instanceof Reference) {
            Reference ref = (Reference)obj;
            String className = ref.getClassName();
            System.out.println("Connection factory class name: " + className);
            Class cls = Class.forName(className);
            ret = cls.newInstance();
        } else {
            ret = obj;
        }
        ictx.close();
        return (ConnectionFactory)ret;
    }

    public void onMessage(Message msg) {
        if (msg != null) {
            try {
                System.out.println("msg = " + ((TextMessage)msg).getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("got nothing");
        }
    }
}
--

1) Run QueueSendReceiveActiveMQ alone, notice how it works in sending receiving messages from
the queue.
2) Run QueueSendReceiveActiveMQInterrupt will result in the program halting (due to some non-daemon
thread created by ActiveMQ), then while it is running run QueueSendReceiveActiveMQ and notice
how it fails to retrieve messages from the queue. If JMS Consumer.close() is excuted in a
thread that is interrupted, it fails and throws an exception and leaves the consumer in some
bad state.

Note that the same code does not exhibit this behavior when using other vendors' MQ solutions.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.


Mime
View raw message