qpid-users mailing list archives

From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: Using the same producer multiple times does not work...
Date Tue, 11 Dec 2012 23:11:09 GMT
Hi Venkat,

It appears that you are not closing your consumers, so I would guess what
you are seeing is the result of message prefetch.

The consumer has a prefetch buffer for messages (it defaults to 500 but is
configurable, see
http://qpid.apache.org/books/0.18/Programming-In-Apache-Qpid/html/ for
details). This lets the broker send messages to the consumer ahead of time,
so that they are already available client side when receive() is called,
which increases performance. Because you leave your first consumer open, the
second message, published onto the queue by the second test, could actually
be delivered by the broker to the first consumer. The second consumer would
then wait indefinitely in its receive() call (which has no timeout), whereas
calling receive() on the first consumer would return the message
immediately. Closing a consumer that you are not going to use again makes
any messages it has prefetched available for delivery to other clients.
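
To make the sequence of events concrete, here is a toy model of it in plain
Java. It is only a sketch under stated assumptions: ToyBroker and ToyConsumer
are hypothetical stand-ins, not Qpid or JMS classes, and the rule "deliver to
the first open consumer with buffer space" is a simplification of whatever
the real broker does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class PrefetchSketch {

    /** Hypothetical broker: one queue, pushes messages eagerly to open consumers. */
    static class ToyBroker {
        private final Deque<String> queue = new ArrayDeque<>();
        private final List<ToyConsumer> consumers = new ArrayList<>();

        ToyConsumer createConsumer(int prefetch) {
            ToyConsumer consumer = new ToyConsumer(this, prefetch);
            consumers.add(consumer);
            dispatch();
            return consumer;
        }

        void publish(String message) {
            queue.addLast(message);
            dispatch();
        }

        // Assumption: hand each queued message to the first open consumer
        // whose prefetch buffer still has room.
        private void dispatch() {
            while (!queue.isEmpty()) {
                ToyConsumer target = null;
                for (ToyConsumer c : consumers) {
                    if (c.buffer.size() < c.prefetch) {
                        target = c;
                        break;
                    }
                }
                if (target == null) {
                    return; // nobody can take it, so it stays on the queue
                }
                target.buffer.addLast(queue.removeFirst());
            }
        }

        // Closing a consumer puts its prefetched messages back on the queue.
        void release(ToyConsumer consumer) {
            consumers.remove(consumer);
            while (!consumer.buffer.isEmpty()) {
                queue.addFirst(consumer.buffer.removeLast());
            }
            dispatch();
        }
    }

    /** Hypothetical consumer with a client-side prefetch buffer. */
    static class ToyConsumer {
        private final ToyBroker broker;
        private final int prefetch;
        private final Deque<String> buffer = new ArrayDeque<>();

        ToyConsumer(ToyBroker broker, int prefetch) {
            this.broker = broker;
            this.prefetch = prefetch;
        }

        // Returns a prefetched message, or null where a real receive() would block.
        String receiveNoWait() {
            return buffer.pollFirst();
        }

        void close() {
            broker.release(this);
        }
    }

    /** Replays test1() then test2(); returns what each consumer received. */
    static String[] run(boolean closeFirstConsumer) {
        ToyBroker broker = new ToyBroker();

        broker.publish("hello, world-1");
        ToyConsumer first = broker.createConsumer(500);
        String receivedByFirst = first.receiveNoWait();
        if (closeFirstConsumer) {
            first.close();
        }

        broker.publish("hello, world-2");
        ToyConsumer second = broker.createConsumer(500);
        String receivedBySecond = second.receiveNoWait();

        return new String[] { receivedByFirst, receivedBySecond };
    }

    public static void main(String[] args) {
        System.out.println("first left open: second got " + run(false)[1]);
        System.out.println("first closed:    second got " + run(true)[1]);
    }
}
```

In this model, leaving the first consumer open means the second message
lands in its buffer and the second consumer gets nothing (null here, a
blocked receive() in the real client). Closing the first consumer lets the
second one receive "hello, world-2".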

Regarding 'committing the send instead of closing': that shouldn't have
any effect on whether your consumer actually receives the message here. It
sounds like what you want is transacted sessions for your producers. You get
one by creating the session with
connection.createSession(true, Session.SESSION_TRANSACTED) and then calling
session.commit() to complete the transaction after each unit of work. Note
that this might mean you want to use different sessions for your producer and
consumer so that you can commit their actions independently of each other
(unless, for example, you wanted to 'atomically' consume a message and send
a reply based on it, in which case you would want to use a single session).

Also, you shouldn't use the AMQAnyDestination object in your code, as it ties
your code to an underlying client implementation class that is subject to
change. You can use the JMS API to create your Destination objects instead,
e.g. by calling session.createQueue(..) and supplying the address string
there.
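
Putting the suggestions in this message together, a minimal sketch might look
like the following. It assumes the JMS 1.1 API (javax.jms) is on the
classpath and that the caller passes in a Connection that has already been
started. The class and method names (SendAndReceiveSketch, send, receive) and
the timeout parameter are made up for illustration; the address string is the
one from the original post.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class SendAndReceiveSketch {

    // Address string taken from the original post.
    static final String ADDRESS = "ADDR:testQueue; {create: always}";

    /** Sends one text message on a transacted session and commits it. */
    static void send(Connection connection, String text) throws JMSException {
        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
        try {
            // Create the destination through the JMS API rather than AMQAnyDestination.
            Queue queue = session.createQueue(ADDRESS);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(text));
            session.commit(); // completes this unit of work
        } finally {
            session.close(); // also closes the producer created from it
        }
    }

    /**
     * Receives one message on a separate, non-transacted session.
     * Returns null if nothing arrives within the timeout or if the
     * message is not a TextMessage.
     */
    static String receive(Connection connection, long timeoutMillis) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Queue queue = session.createQueue(ADDRESS);
            MessageConsumer consumer = session.createConsumer(queue);
            try {
                Message message = consumer.receive(timeoutMillis);
                if (message instanceof TextMessage) {
                    return ((TextMessage) message).getText();
                }
                return null;
            } finally {
                // Closing the consumer releases anything it has prefetched.
                consumer.close();
            }
        } finally {
            session.close();
        }
    }
}
```

The producer and consumer use separate sessions here so that the send can be
committed independently of the receive. Passing a timeout to receive() is a
choice made for this sketch so that a missing message shows up as a null
return rather than a call that never returns.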

Robbie

On 11 December 2012 19:15, Venkat Rangan <venkat@clariussystems.com> wrote:

> Hi,
>
>
> I have a situation where I reuse the same destination queue for multiple
> producers. What I am finding is that if I do this, the second instance
> message is stuck and is not available for the MessageConsumer. Assume that
> there is a connection that is started. I would have expected both test1()
> and test2() below to succeed. What I observe is that the test2() is stuck
> on the consumer.receive(). Also, is there a way to "commit" the send
> without a producer.close() and would that make the second instance of the
> test below also have the receive() pull out the message?
>
>
> Thanks!
>
>
> public void test1() {
>     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>     Destination queue = new AMQAnyDestination("ADDR:testQueue; {create: always}");
>     MessageProducer producer = session.createProducer(queue);
>     TextMessage msg = session.createTextMessage("hello, world-1");
>     producer.send(msg);
>     producer.close();
>     MessageConsumer consumer = session.createConsumer(queue);
>     TextMessage recvdMessage = (TextMessage)consumer.receive();
>     System.out.println("Received: " + recvdMessage.getText());
> }
>
>
> public void test2() {
>     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>     Destination queue = new AMQAnyDestination("ADDR:testQueue; {create: always}");
>     MessageProducer producer = session.createProducer(queue);
>     TextMessage msg = session.createTextMessage("hello, world-2");
>     producer.send(msg);
>     producer.close();
>     MessageConsumer consumer = session.createConsumer(queue);
>     TextMessage recvdMessage = (TextMessage)consumer.receive();
>     System.out.println("Received: " + recvdMessage.getText());
> }
>
> Thanks!
>
