activemq-users mailing list archives

From: Eugeny N Dzhurinsky <b...@redwerk.com>
Subject: CLIENT_ACKNOWLEDGE mode for a session misunderstanding?
Date: Tue, 28 Oct 2008 16:28:45 GMT

Hello!

I am facing a strange issue with the acknowledge mode in ActiveMQ. After
reading the specs, I understood that the broker will not deliver the next
message to a consumer until the consumer has acknowledged the previous one.
So I thought it would be enough not to send the acknowledgement from the
consumer's onMessage method, and to send it later from another thread that
does the actual work. In other words, I spawn a thread when onMessage is
called and return from the method without waiting for it.
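
A minimal sketch of the hand-off described above, using only the JDK so that it runs without a broker. The `Ackable` interface, the `DeferredAckSketch` class and the `run` helper are hypothetical names introduced for illustration; `Ackable` merely stands in for the `acknowledge()` part of `javax.jms.Message`. The sketch shows only that the listener returns before the acknowledgement is sent. It says nothing about how a real JMS session or the ActiveMQ broker reacts to that, nor about whether acknowledging from a different thread is safe for a given session.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DeferredAckSketch {

    /** Hypothetical stand-in for the acknowledge() part of javax.jms.Message. */
    interface Ackable {
        void acknowledge() throws Exception;
    }

    private final ExecutorService workers = Executors.newCachedThreadPool();

    /** Mirrors onMessage: hand the message to a worker and return immediately. */
    void onMessage(final Ackable message) {
        workers.submit(() -> {
            try {
                // ... the long-running processing would go here ...
                message.acknowledge(); // sent only after the work is done
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }

    /** Feeds the given number of fake messages through the listener and returns how many were acknowledged. */
    static int run(int messages) {
        DeferredAckSketch listener = new DeferredAckSketch();
        AtomicInteger acked = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            // Each fake message just counts its own acknowledgement.
            listener.onMessage(acked::incrementAndGet);
        }
        listener.workers.shutdown();
        try {
            // Wait for the workers so the count is final before it is read.
            listener.workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return acked.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3)); // prints 3
    }
}
```

In the real consumer, `onMessage` would receive a `javax.jms.Message`, and the worker would call `message.acknowledge()` on it once processing has finished.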

However, I've found that this is not the case. Below is my simple test case:
it starts a single consumer and several producers.

===================================================================================================================
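import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;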

public class TestAcknowledgeMode {

    private static final String DATA = "data";

    private static final String SERVER_QUEUE = "server_queue";

    private static final String CONNECTION_URL = "vm://localhost?broker.persistent=false&jms.prefetchPolicy.all=1";

    private static final int CLIENT_THREADS = 5;

    static ActiveMQConnectionFactory factory;

    Semaphore semaphore = new Semaphore(CLIENT_THREADS, true);

    AtomicInteger successCount = new AtomicInteger(0);

    @BeforeClass
    public static void initFactory() throws Exception {
        factory = new ActiveMQConnectionFactory(CONNECTION_URL);
        BasicConfigurator.configure(new ConsoleAppender(new PatternLayout(
                "%5p %d{hh:mm:ss} [%t] (%C{1}:%L) - %m%n")));
        Logger.getRootLogger().setLevel(Level.DEBUG);
    }

    @Test(timeout = 10000)
    public void testAckDelivery() throws Exception {
        final Connection clientConnection = factory.createConnection();
        clientConnection.start();
        final Connection serverConnection = factory.createConnection();
        serverConnection.start();

        final Session serverSession = serverConnection.createSession(false,
                Session.CLIENT_ACKNOWLEDGE);
        final Queue serverQueue = serverSession.createQueue(SERVER_QUEUE);
        final MessageConsumer consumer = serverSession
                .createConsumer(serverQueue);
        consumer.setMessageListener(new MessageListener() {

            public void onMessage(final Message message) {
                System.out.println("Got message");
                try {
                    System.out.println(message.getStringProperty(DATA));
                    // Deliberately not acknowledged, to see whether the
                    // next message is still delivered:
                    // message.acknowledge();
                    successCount.incrementAndGet();
                } catch (final Exception e) {
                    e.printStackTrace();
                }
            }

        });
        for (int i = 0; i < CLIENT_THREADS; i++) {
            final Thread t = new Thread(new Runnable() {

                public void run() {
                    try {
                        final Session clientSession = clientConnection
                                .createSession(false,
                                        Session.CLIENT_ACKNOWLEDGE);
                        final Queue clientQueue = clientSession
                                .createQueue(SERVER_QUEUE);
                        final MessageProducer producer = clientSession
                                .createProducer(clientQueue);
                        final Message msg = clientSession.createMessage();
                        msg.setStringProperty(DATA, Thread.currentThread()
                                .getName());
                        System.err.println("Sending data "
                                + Thread.currentThread().getName());
                        producer.send(msg);
                    } catch (final Exception e) {
                        e.printStackTrace();
                    }
                }
            });
            t.start();
        }
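        // Poll until every message has reached the listener; the JUnit
        // timeout above fails the test if that never happens.
        for (;;) {
            if (successCount.intValue() == CLIENT_THREADS)
                break;
            Thread.sleep(1000);
        }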
        Assert.assertEquals(CLIENT_THREADS, successCount.intValue());
    }
}
===================================================================================================================

I expected only one message to be processed by the consumer, with the
remaining messages never delivered to it. However, this test shows that all
the messages are delivered to the consumer and the onMessage method is called
exactly CLIENT_THREADS times, which seems wrong.

Does this mean that onMessage is invoked for the next message as soon as the
previous invocation returns, regardless of whether an acknowledgement was sent?

And what, then, is the purpose of the message.acknowledge() method?

Thank you in advance!

-- 
Eugene N Dzhurinsky
