activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From frbo <f...@imc.nl>
Subject onMessage() called by multiple threads - synchronization needed?
Date Fri, 09 Apr 2010 13:30:35 GMT

I'm sorry if this question has been answered before, but it is not clear to
me yet what the safe way is to update class fields from the 'onMessage()'
method of an asynchronous consumer.

I am using ActiveMQ 5.3.0 with the default settings (Java 1.6). The
following example code shows that 'onMessage()' is called from more than one
thread, but not at the same time. The code seems to work ok without any
additional synchronization: the 'receivedMessages' list contains all the
messages I'm sending to the queue. But maybe I'm just being lucky...

If I understand the Java memory model correctly, then you must use
synchronization or 'volatile' variables to make sure that changes made to a
field by one thread are always visible by the other threads. 
So I wonder: should I make use a 'synchronized' version for the collections
that are accessed in the 'onMessage' threads? 

Thanks!

-- Frank 

--------

import org.apache.activemq.*;

import javax.jms.*;
import javax.jms.Message;
import javax.jms.Queue;
import java.util.*;
import java.util.concurrent.locks.*;

public class ThreadSafeConsumer
{
    private static final String theURL =
"vm:(broker:(tcp://localhost:61616)?persistent=true)";

    // QUESTION: should I use a 'synchronized' version of these
collections???
    private final Set<Integer> theThreads = new HashSet<Integer>();
    private final List<String> theReceivedMessages = new
ArrayList<String>();

    private final Lock theLock = new ReentrantLock();

    public static void main(String[] anArgs) throws JMSException
    {
        new ThreadSafeConsumer().run();
    }

    public void run() throws JMSException
    {
        ConnectionFactory myFactory = new ActiveMQConnectionFactory(theURL);
        Connection myConnection = myFactory.createConnection();

        Session mySession = myConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);

        Queue myQueue = mySession.createQueue("MyTestInput");
        MessageConsumer myConsumer = mySession.createConsumer(myQueue);

        myConsumer.setMessageListener(new MessageListener()
        {
            @Override
            public void onMessage(Message message)
            {
                try
                {
                    handleIncomingMessage(message);
                }
                catch (JMSException e)
                {
                    e.printStackTrace();
                }
            }
        });

        myConnection.start();
    }

    private void handleIncomingMessage(Message aMessage) throws JMSException
    {
        if (theThreads.add(System.identityHashCode(Thread.currentThread()))
&& theThreads.size() > 1)
        {
            System.out.println("More than one thread is calling onMessage: "
+ theThreads);
        }

        if (!theLock.tryLock())
        {
            System.err.println("This would be really bad, but fortunately it
doesn't happen");
        }
        try
        {
            theReceivedMessages.add(((TextMessage)aMessage).getText());
        }
        finally
        {
            theLock.unlock();
        }
    }
}

-- 
View this message in context: http://old.nabble.com/onMessage%28%29-called-by-multiple-threads---synchronization-needed--tp28190843p28190843.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message