activemq-users mailing list archives

From "James Strachan" <james.strac...@gmail.com>
Subject Re: Concurrent use of connections
Date Tue, 14 Oct 2008 08:56:14 GMT
Please see the FAQ entry...
http://activemq.apache.org/how-do-i-use-jms-efficiently.html

You are not meant to create a consumer or producer for each message;
create each one once and reuse it for every message.
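
As a rough illustration of that advice, here is a minimal sketch of the
quoted sample reworked so that each thread creates its session, consumer
and producer once, outside the loop. It assumes the JMS API and ActiveMQ
client jars are on the classpath and a broker is listening on the URL from
the sample. The class name is made up for this example, and it uses
ActiveMQConnectionFactory directly rather than the JNDI lookup.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

// Hypothetical class name; broker URL and queue name come from the quoted sample.
public class ReusingConsumerProducer {

    static final String PROVIDER_URL = "tcp://localhost:61616";
    static final String DESTINATION_NAME = "TEST.QUEUE";

    public static void main(String[] args) throws JMSException {
        // One connection shared by both threads.
        Connection connection =
                new ActiveMQConnectionFactory(PROVIDER_URL).createConnection();
        connection.start();

        new Thread(() -> consume(connection), "Consumer").start();
        new Thread(() -> produce(connection), "Producer").start();
    }

    static void consume(Connection connection) {
        try {
            // Each thread gets its own session; sessions are single-threaded.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(DESTINATION_NAME);
            // Created once and reused for every message.
            MessageConsumer consumer = session.createConsumer(queue);
            while (true) {
                Message msg = consumer.receive(); // blocks; null means the consumer was closed
                if (msg == null) {
                    break;
                }
                if (msg instanceof TextMessage) {
                    System.out.println(((TextMessage) msg).getText());
                } else {
                    System.out.println("Message received");
                }
            }
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }

    static void produce(Connection connection) {
        try {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(DESTINATION_NAME);
            // Created once and reused for every message.
            MessageProducer producer = session.createProducer(queue);
            int msgctr = 1;
            while (true) {
                producer.send(session.createTextMessage("Message " + msgctr++));
                Thread.sleep(10);
            }
        } catch (JMSException | InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}
```

The only structural change from the quoted sample is that createConsumer
and createProducer move out of the while loops; the shared connection and
the one-session-per-thread layout stay the same.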

If you are new to JMS and have not yet grokked how to use it
efficiently, try using the JmsTemplate / MessageListenerContainer
abstractions in Spring along with the JMS pool in ActiveMQ
http://activemq.apache.org/jmstemplate-gotchas.html
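
For anyone unsure what that combination looks like, the following is a
small sketch only, not a recommended configuration. It assumes a Spring
version that still uses the javax.jms API, the activemq-pool jar on the
classpath, and a broker on tcp://localhost:61616. The class name, queue
name and message count are illustrative.

```java
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;

// Hypothetical class name; URL and queue name are assumptions for this sketch.
public class SpringJmsSketch {

    public static void main(String[] args) throws InterruptedException {
        ActiveMQConnectionFactory amq =
                new ActiveMQConnectionFactory("tcp://localhost:61616");

        // JmsTemplate opens and closes a connection, session and producer per
        // send, so it is given a pooled factory that recycles those resources.
        PooledConnectionFactory pooled = new PooledConnectionFactory(amq);
        JmsTemplate template = new JmsTemplate(pooled);

        // The listener container keeps its consumer alive between messages.
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(amq);
        container.setDestinationName("TEST.QUEUE");
        container.setMessageListener(new MessageListener() {
            public void onMessage(Message msg) {
                try {
                    if (msg instanceof TextMessage) {
                        System.out.println(((TextMessage) msg).getText());
                    }
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        });
        container.afterPropertiesSet(); // required when not managed by a Spring context
        container.start();

        for (int i = 1; i <= 10; i++) {
            template.convertAndSend("TEST.QUEUE", "Message " + i);
        }

        Thread.sleep(2000); // give the listener time to drain the queue
        container.shutdown();
        pooled.stop();
    }
}
```

In a real application these objects would normally be declared as Spring
beans rather than wired by hand in main.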

By the way, your problem could well be producer flow control kicking in,
but first see how things behave when you use JMS correctly.

2008/10/14 Steven Van Loon <svanloon@invenso.com>:
> Hi,
>
> I managed to reproduce the problem in the simple sample below. What I do is to create
> a connection that is shared between a consuming thread and a producing thread. The consumer
> is slower than the producer. When I run this sample, either the consumer stops consuming although
> lots of messages are available, or the producer stops producing.
>
> Does anybody have any idea what's wrong or what I am doing wrong?
>
>
>
> package test.jms;
>
> import java.util.Hashtable;
>
> import javax.jms.JMSException;
> import javax.jms.Message;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.QueueConnection;
> import javax.jms.QueueConnectionFactory;
> import javax.jms.QueueSession;
> import javax.jms.Session;
> import javax.jms.TextMessage;
> import javax.naming.InitialContext;
> import javax.naming.NamingException;
>
> public class ConsumingProducer {
>
>        public final static String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
>        public final static String PROVIDER_URL = "tcp://localhost:61616";
>
>        public final static String CONNECTION_FACTORY_NAME = "ConnectionFactory";
>        public final static String DESTINATION_NAME = "TEST.QUEUE";
>
>        public final static long WRITE_DELAY = 10;
>        public final static long READ_DELAY = 1000;
>
>        public static QueueConnectionFactory connectionFactory;
>        public static QueueConnection connection;
>
>        public static void initialize() throws NamingException, JMSException {
>                Hashtable<String, String> env = new Hashtable<String, String>();
>                env.put(InitialContext.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
>                env.put(InitialContext.PROVIDER_URL, PROVIDER_URL);
>                InitialContext initialContext = new InitialContext(env);
>
>                connectionFactory = (QueueConnectionFactory) initialContext
>                                .lookup(CONNECTION_FACTORY_NAME);
>
>                connection = connectionFactory.createQueueConnection();
>                connection.start();
>        }
>
>        public static void startConsumer() {
>
>                Thread t = new Thread() {
>                        public void run() {
>                                try {
>                                        QueueSession session = connection.createQueueSession(false,
>                                                        Session.AUTO_ACKNOWLEDGE);
>                                        Queue queue = session.createQueue(DESTINATION_NAME);
>                                        while (true) {
>                                                MessageConsumer consumer = session
>                                                                .createConsumer(queue);
>                                                Message msg = consumer.receive();
>                                                if (msg instanceof TextMessage) {
>                                                        System.out.println(((TextMessage) msg).getText());
>                                                } else if (msg != null) {
>                                                        System.out.println("Message received");
>                                                }
>                                                consumer.close();
>                                                sleep(READ_DELAY);
>                                        }
>                                } catch (Exception ex) {
>                                        ex.printStackTrace();
>                                }
>                        }
>                };
>                t.setName("Consumer");
>                t.start();
>        }
>
>        public static void startProducer() {
>
>                Thread t = new Thread() {
>                        public void run() {
>                                try {
>                                        QueueSession session = connection.createQueueSession(false,
>                                                        Session.AUTO_ACKNOWLEDGE);
>                                        Queue queue = session.createQueue(DESTINATION_NAME);
>                                        int msgctr = 1;
>                                        while (true) {
>                                                MessageProducer producer = session
>                                                                .createProducer(queue);
>                                                Message msg = session.createTextMessage("Message "
>                                                                + msgctr++);
>                                                producer.send(msg);
>                                                producer.close();
>
>                                                sleep(WRITE_DELAY);
>                                        }
>                                } catch (Exception ex) {
>                                        ex.printStackTrace();
>                                }
>                        }
>                };
>                t.setName("Producer");
>                t.start();
>        }
>
>        public static void main(String[] args) {
>                try {
>                        initialize();
>                        startConsumer();
>                        startProducer();
>                } catch (Exception ex) {
>                        ex.printStackTrace();
>                }
>        }
> }
>
>
>
>
> -----Original Message-----
> From: Joe Fernandez [mailto:joe.fernandez@ttmsolutions.com]
> Sent: Monday 13 October 2008 13:06
> To: users@activemq.apache.org
> Subject: Re: Concurrent use of connections
>
>
> Sessions are single-threaded. So could the issue be related more to your use
> of sessions?
>
> Joe
> Get a free ActiveMQ user guide @ http://www.ttmsolutions.com
>
>
> Steven Van Loon-2 wrote:
>>
>> Hi all,
>>
>> Is anybody aware of possible problems when (re)using the same Connection to
>> activeMQ by different threads? According to the JMS specification,
>> implementations should support concurrent use of Connections, so I created
>> one connection to be used by all consumer / producers.
>>
>> However, I experienced the problem that from a certain moment, no more
>> messages were read from the queue although a lot of messages were still
>> waiting. When another program connects to the queue and reads a message,
>> it can still fetch messages. We were able to track down the problem to
>> activeMQ since using another queuing system solved the problem, it kept
>> reading all the messages, no matter how many were waiting on the queue.
>> Eventually, making a connection for each thread solved the problem also
>> for activeMQ. Hence my question.
>>
>> My apologies I can provide you only with a vague description of the
>> problem and no test case to reproduce the problem but I'm wondering
>> whether anybody else experienced similar problems and can provide me with
>> more insight into the problem.
>>
>> Thanks!
>> Steven.
>>
>>
>>
>
> --
> View this message in context: http://www.nabble.com/Concurrent-use-of-connections-tp19952062p19952819.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>



-- 
James
-------
http://macstrac.blogspot.com/

Open Source Integration
http://open.iona.com
