qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Venkat Rangan <ven...@clariussystems.com>
Subject Re: Using the same producer multiple times does not work...
Date Wed, 12 Dec 2012 02:21:39 GMT
Robbie,

Thanks for your prompt response. Closing the consumer did indeed fix the
second client consumer. Thanks again...

venkat

On Tue, Dec 11, 2012 at 3:11 PM, Robbie Gemmell <robbie.gemmell@gmail.com>wrote:

> Hi Venkat,
>
> It appears that you are not closing your consumers, so I would guess what
> you are seeing is the result of message prefetch.
>
> The consumer has a prefetch buffer for messages (defaults to 500 but is
> configurable, see
> http://qpid.apache.org/books/0.18/Programming-In-Apache-Qpid/html/ for
> details) such that the broker is able to send messages to the consumer so
> they can be available client side in advance of receive() being called,
> which increases performance. By leaving your consumer open, it is possible
> that the second test execution whicih promted the second message published
> onto the queue could actually be delivered to the first client by the
> broker, which would lead to the second consumer awaiting the arrival of a
> message in its [unbounded time limit] recieve() call whereas the first
> consumer could have had recieve() called on it and return the message
> immediately. Closing the consumer if you are not going to use it again will
> lead to prefetched messages becoming available for delivery to other
> clients if that is the case.
>
> Regarding 'committing the send instead of closing', that shouldnt be having
> any effect on whether your consumer actually recieves the message here, but
> it sounds like what you want to do is use transacted sessions for your
> producers which would be achieved by using the session creation code:
> connection.createSession(true,
> Session.SESSION_TRANSACTED), and then using session.commit() to complete
> the session transaction after each unit of work is performed. Note that
> this might mean you want to use a different sesion for your producer and
> consumer so that you can commit their actions independently of each other
> (unless you for example wanted to 'atomically' consumer a message and send
> a reply based on it, in which case you would want to use a single session).
>
> Also, you shouldnt use the AMQAnyDestination object in your code as it ties
> your code to an underlying client implementation object that is subject to
> change. You can use the JMS API to create your Destination objects, e.g
> calling session.createQueue(..) and supplying the address string there
> instead.
>
> Robbie
>
> On 11 December 2012 19:15, Venkat Rangan <venkat@clariussystems.com>
> wrote:
>
> > Hi,
> >
> >
> > I have a situation where I reuse the same destination queue for multiple
> > producers. What I am finding is that if I do this, the second instance
> > message is stuck and is not available for the MessageConsumer. Assume
> that
> > there is a connection that is started. I would have expected both test1()
> > and test2() below to succeed. What I observe is that the test2() is stuck
> > on the consumer.receive(). Also, is there a way to "commit" the send
> > without a producer.close() and would that make the second instance of the
> > test below also have the receive() pull out the message?
> >
> >
> > Thanks!
> >
> >
> >        public void test1() {
> >
> > Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE
> > );;
> >
> > Destination queue = new AMQAnyDestination("ADDR:testQueue; {create:
> > always}"
> > );
> >
> > MessageProducer producer = session.createProducer(queue);
> >
> > TextMessage msg = session.createTextMessage("hello, world-1");
> >
> > producer.send(msg);
> >
> > producer.close();
> >
> > MessageConsumer consumer = session.createConsumer(queue);
> >
> > TextMessage recvdMessage = (TextMessage)consumer.receive();
> >
> > System.out.println("Received: " + recvdMessage.getText());
> >
> > }
> >
> >
> >  public void test2() {
> >
> > Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE
> > );;
> >
> > Destination queue = new AMQAnyDestination("ADDR:testQueue; {create:
> > always}"
> > );
> >
> > MessageProducer producer = session.createProducer(queue);
> >
> > TextMessage msg = session.createTextMessage("hello, world-2");
> >
> > producer.send(msg);
> >
> > producer.close();
> >
> > MessageConsumer consumer = session.createConsumer(queue);
> >
> > TextMessage recvdMessage = (TextMessage)consumer.receive();
> >
> > System.out.println("Received: " + recvdMessage.getText());
> >
> > }
> >
> > Thanks!
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message