activemq-dev mailing list archives

From "Carl Allain (JIRA)" <j...@apache.org>
Subject [jira] Commented: (AMQ-2446) Client hangs on receive call with timeout value > 0 when activemq is shutdown (CTRL-C)
Date Fri, 09 Oct 2009 16:22:52 GMT

    [ https://issues.apache.org/activemq/browse/AMQ-2446?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=54701#action_54701 ]

Carl Allain commented on AMQ-2446:
----------------------------------

Good news! I wrote down the simplest test case I could come up with:

import java.util.Properties;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;

public class TestActiveMQShutdown {

    /**
     * Start ActiveMQ on the default port.
     * Run this main program; it calls receive with a timeout of 30 seconds.
     * During that time, shut down ActiveMQ.
     * The System.out.println below will never execute, and you will need to kill the java process.
     */
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        // Please note the prefetch policy here
        props.put(Context.PROVIDER_URL, "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0");
        props.put("queue.test1", "test1.q");

        InitialContext initialContext = new javax.naming.InitialContext(props);

        ConnectionFactory connectionFactory = (ConnectionFactory) initialContext.lookup("ConnectionFactory");

        Connection connection = (Connection) connectionFactory.createConnection();

        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        Destination destination = (Destination) initialContext.lookup("test1");

        MessageConsumer messageConsumer = session.createConsumer(destination);

        messageConsumer.receive(30000);

        System.out.println("Will never get there if ActiveMQ is shutdown while we were in the receive call above");

        // I don't care about closing all resources in that test and about the message received

    }

}

And with that, I can reproduce the problem in 5.2 (using activemq-all-5.2.0.jar), but not
in 5.3 (using activemq-all-5.3.0.jar). I used jms-1.1.jar in both cases.

I must have made a mistake with my full application when I tried to modify it. If you guys
can confirm that my test case works for you too, reproducing the problem in 5.2 and seeing
that 5.3 fixes it, then we could all vote for 5.3 RC4!
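
For anyone rerunning the test case above, one way to avoid having to kill the java process by hand is to bound the blocking call with a watchdog. The sketch below is not part of the attached test: it assumes Java 8 or later, the names ReceiveWatchdog and completesWithin are made up for illustration, and the Thread.sleep calls only stand in for messageConsumer.receive(30000).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReceiveWatchdog {

    /**
     * Runs the blocking call on a daemon thread and waits at most limitMillis.
     * Returns true if the call finished in time, false if it appears to hang.
     */
    public static boolean completesWithin(Callable<?> blockingCall, long limitMillis)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "receive-under-test");
            t.setDaemon(true); // a hung call must not keep the JVM alive
            return t;
        });
        try {
            Future<?> result = executor.submit(blockingCall);
            result.get(limitMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } finally {
            executor.shutdownNow(); // interrupts the worker if it is still blocked
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical stand-in for a receive call that returns promptly.
        boolean quick = completesWithin(() -> { Thread.sleep(50); return null; }, 2000);
        // Hypothetical stand-in for the hang: blocks far longer than the limit.
        boolean slow = completesWithin(() -> { Thread.sleep(60_000); return null; }, 200);
        System.out.println("quick call finished in time: " + quick);
        System.out.println("slow call finished in time: " + slow);
    }
}
```

In a real run, the lambda would wrap messageConsumer.receive(30000) and the limit would be somewhat larger than 30 seconds, so that a false result points at the hang described in this issue rather than at an ordinary timeout.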


> Client hangs on receive call with timeout value > 0 when activemq is shutdown (CTRL-C)
> --------------------------------------------------------------------------------------
>
>                 Key: AMQ-2446
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2446
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.2.0
>         Environment: Windows XP SP3, ActiveMQ 5.2, prefetchSize=0 (java.naming.provider.url = tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0)
>            Reporter: Carl Allain
>         Attachments: AMQ-2446-Test.zip
>
>
> Start ActiveMQ
> create a message consumer and make a receive(30000) call for example.
> stop ActiveMQ (CTRL-C)
> There is an error reported on the console:
> 2009-10-08 19:23:54,978 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616] DEBUG org.apache.activemq.ActiveMQConnection - Async exception with no exception listener: java.io.EOFException
> java.io.EOFException
> 	at java.io.DataInputStream.readInt(DataInputStream.java:358)
> 	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:269)
> 	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:210)
> 	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:202)
> 	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
> 	at java.lang.Thread.run(Thread.java:595)
> 2009-10-08 19:23:54,978 [ActiveMQ Connection Worker: tcp://localhost/127.0.0.1:61616] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616
> The thread making the receive call is waiting on a mutex that (I guess) is held by the ActiveMQ Connection Worker thread that wrote the log line above:
> class MessageDispatchChannel
>     public MessageDispatch dequeue(long timeout) throws InterruptedException {
>         synchronized (mutex) {
>             // Wait until the consumer is ready to deliver messages.
>             while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
>                 if (timeout == -1) {
>                     mutex.wait(); <-----------------------------------------------------------
>                 } else {
>                     mutex.wait(timeout);
>                     break;
>                 }
>             }
>             if (closed || !running || list.isEmpty()) {
>                 return null;
>             }
>             return list.removeFirst();
>         }
>     }
> What is also strange to me is that my timeout of 30000 ms is silently converted to an infinite timeout by this code when the prefetch size is 0. Why?
> class ActiveMQMessageConsumer
> ...
>             MessageDispatch md;
>             if (info.getPrefetchSize() == 0) {
>                 md = dequeue(-1); // We let the broker let us know when we
>                 // timeout.
>             } else {
>                 md = dequeue(timeout);
>             }
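
The interaction between the two quoted snippets can be tried outside ActiveMQ with a small model. This is only a sketch under assumptions: DequeueModel, Channel and startConsumer are hypothetical names, the dequeue loop is copied from the snippet quoted above, and the close() method is my guess at what a shutdown path would have to do (set closed and notify waiters), not the actual ActiveMQ implementation.

```java
import java.util.LinkedList;

public class DequeueModel {

    /** Simplified, hypothetical stand-in for the quoted MessageDispatchChannel. */
    public static class Channel {
        private final Object mutex = new Object();
        private final LinkedList<String> list = new LinkedList<>();
        private boolean closed;
        private boolean running = true;

        public String dequeue(long timeout) throws InterruptedException {
            synchronized (mutex) {
                while (timeout != 0 && !closed && (list.isEmpty() || !running)) {
                    if (timeout == -1) {
                        mutex.wait();        // no deadline: only a notify ends this wait
                    } else {
                        mutex.wait(timeout); // bounded wait, then leave the loop
                        break;
                    }
                }
                if (closed || !running || list.isEmpty()) {
                    return null;
                }
                return list.removeFirst();
            }
        }

        /** Assumed shutdown path: mark the channel closed and wake all waiters. */
        public void close() {
            synchronized (mutex) {
                closed = true;
                mutex.notifyAll();
            }
        }
    }

    /** Starts a daemon thread that performs a single dequeue with the given timeout. */
    public static Thread startConsumer(Channel channel, long timeout) {
        Thread consumer = new Thread(() -> {
            try {
                channel.dequeue(timeout);
            } catch (InterruptedException ignored) {
                // the model simply lets the thread end when interrupted
            }
        });
        consumer.setDaemon(true);
        consumer.start();
        return consumer;
    }

    public static void main(String[] args) throws InterruptedException {
        // Case 1: dequeue(-1), as used when the prefetch size is 0.
        Channel unbounded = new Channel();
        Thread first = startConsumer(unbounded, -1);
        first.join(300);
        System.out.println("dequeue(-1) blocked before close: " + first.isAlive());
        unbounded.close();
        first.join(2000);
        System.out.println("dequeue(-1) blocked after close: " + first.isAlive());

        // Case 2: dequeue(200) returns by itself even if nobody calls close().
        Channel bounded = new Channel();
        Thread second = startConsumer(bounded, 200);
        second.join(2000);
        System.out.println("dequeue(200) blocked without close: " + second.isAlive());
    }
}
```

In this model the dequeue(-1) caller should stay blocked until close() runs, while the bounded call returns without any help. That matches the symptom in this report: once the timeout has been replaced by -1, the client depends entirely on something notifying the mutex when the broker goes away.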

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

