qpid-users mailing list archives

From Robbie Gemmell <robbie.gemm...@gmail.com>
Subject Re: Using the same producer multiple times does not work...
Date Wed, 12 Dec 2012 10:07:17 GMT
Good to hear. I should also have mentioned, however, that you should really
be closing the session too (which will implicitly close its open
consumers/producers) when you are done with it; otherwise the sessions will
leak and you will eventually run out of available sessions.
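
To make the mechanism concrete, here is a toy model in plain Java of the
prefetch behaviour described in the quoted message below. It is only a sketch
and not the Qpid client: Broker, Session, Consumer and receiveNoWait() here are
hypothetical stand-ins, and the model assumes the broker always favours the
oldest open consumer, which a real broker need not do.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of consumer prefetch. NOT the Qpid client; all names are hypothetical. */
public class PrefetchModel {

    /** A broker with a single queue that pushes messages to open consumers. */
    static class Broker {
        private final Deque<String> queue = new ArrayDeque<>();
        private final List<Consumer> consumers = new ArrayList<>();

        void publish(String message) {
            queue.addLast(message);
            dispatch();
        }

        /** Assumption: the oldest open consumer with buffer space is served first. */
        void dispatch() {
            for (Consumer c : consumers) {
                while (!queue.isEmpty() && c.buffer.size() < c.prefetch) {
                    c.buffer.addLast(queue.removeFirst());
                }
            }
        }
    }

    /** A session tracks its consumers so that closing it closes them too. */
    static class Session {
        private final Broker broker;
        private final List<Consumer> open = new ArrayList<>();

        Session(Broker broker) {
            this.broker = broker;
        }

        Consumer createConsumer(int prefetch) {
            Consumer c = new Consumer(broker, prefetch);
            open.add(c);
            broker.consumers.add(c);
            broker.dispatch();
            return c;
        }

        void close() {
            for (Consumer c : open) {
                c.close();
            }
            open.clear();
        }
    }

    static class Consumer {
        private final Deque<String> buffer = new ArrayDeque<>();
        private final int prefetch;
        private final Broker broker;

        Consumer(Broker broker, int prefetch) {
            this.broker = broker;
            this.prefetch = prefetch;
        }

        /** Non-blocking receive: returns null if nothing has been prefetched. */
        String receiveNoWait() {
            return buffer.pollFirst();
        }

        /** Closing hands unconsumed prefetched messages back to the broker, in order. */
        void close() {
            broker.consumers.remove(this);
            while (!buffer.isEmpty()) {
                broker.queue.addFirst(buffer.removeLast());
            }
            broker.dispatch();
        }
    }

    /** Replays the two-test scenario and returns what the second consumer receives. */
    public static String secondConsumerSees(boolean closeFirstSession) {
        Broker broker = new Broker();

        // test1: publish, consume one message, then optionally close the session.
        Session first = new Session(broker);
        broker.publish("hello, world-1");
        Consumer c1 = first.createConsumer(500);
        c1.receiveNoWait();
        if (closeFirstSession) {
            first.close();
        }

        // test2: publish again and try to consume with a brand new consumer.
        Session second = new Session(broker);
        broker.publish("hello, world-2");
        Consumer c2 = second.createConsumer(500);
        return c2.receiveNoWait();
    }

    public static void main(String[] args) {
        System.out.println("first session left open: " + secondConsumerSees(false));
        System.out.println("first session closed:    " + secondConsumerSees(true));
    }
}
```

With the first session left open, the second consumer gets nothing (null here,
where a blocking receive() would simply wait), because the message is sitting
in the first consumer's buffer. With the first session closed, the second
consumer gets "hello, world-2".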


On 12 December 2012 02:21, Venkat Rangan <venkat@clariussystems.com> wrote:

> Robbie,
>
> Thanks for your prompt response. Closing the consumer did indeed fix the
> second client consumer. Thanks again...
>
> venkat
>
> On Tue, Dec 11, 2012 at 3:11 PM, Robbie Gemmell <robbie.gemmell@gmail.com> wrote:
>
> > Hi Venkat,
> >
> > It appears that you are not closing your consumers, so I would guess what
> > you are seeing is the result of message prefetch.
> >
> > The consumer has a prefetch buffer for messages (it defaults to 500 but is
> > configurable, see
> > http://qpid.apache.org/books/0.18/Programming-In-Apache-Qpid/html/ for
> > details), which lets the broker send messages to the consumer so that
> > they are available client side before receive() is called. This
> > increases performance. Because you left your first consumer open, it is
> > possible that the second message, published onto the queue by the second
> > test execution, was actually delivered by the broker to the first
> > consumer. The second consumer would then wait for a message in its
> > receive() call, which has no time limit, whereas calling receive() on the
> > first consumer would return the message immediately. Closing a consumer
> > that you are not going to use again makes any messages it has prefetched
> > available for delivery to other clients.
> >
> > Regarding 'committing the send instead of closing', that shouldn't be
> > having any effect on whether your consumer actually receives the message
> > here, but it sounds like what you want is to use transacted sessions for
> > your producers. You would create the session with
> > connection.createSession(true, Session.SESSION_TRANSACTED) and then call
> > session.commit() to complete the session transaction after each unit of
> > work is performed. Note that this might mean you want to use a different
> > session for your producer and consumer, so that you can commit their
> > actions independently of each other (unless, for example, you wanted to
> > 'atomically' consume a message and send a reply based on it, in which
> > case you would want to use a single session).
> >
> > Also, you shouldn't use the AMQAnyDestination object in your code, as it
> > ties your code to an underlying client implementation object that is
> > subject to change. You can use the JMS API to create your Destination
> > objects, e.g. by calling session.createQueue(..) and supplying the
> > address string there instead.
> >
> > Robbie
> >
> > On 11 December 2012 19:15, Venkat Rangan <venkat@clariussystems.com>
> > wrote:
> >
> > > Hi,
> > >
> > >
> > > I have a situation where I reuse the same destination queue for
> > > multiple producers. What I am finding is that if I do this, the message
> > > from the second instance is stuck and is not available to the
> > > MessageConsumer. Assume that there is a connection that is started. I
> > > would have expected both test1() and test2() below to succeed. What I
> > > observe is that test2() is stuck on consumer.receive(). Also, is there
> > > a way to "commit" the send without a producer.close(), and would that
> > > make the second instance of the test below also have the receive() pull
> > > out the message?
> > >
> > >
> > > Thanks!
> > >
> > >
> > > public void test1() {
> > >     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> > >     Destination queue = new AMQAnyDestination("ADDR:testQueue; {create: always}");
> > >     MessageProducer producer = session.createProducer(queue);
> > >     TextMessage msg = session.createTextMessage("hello, world-1");
> > >     producer.send(msg);
> > >     producer.close();
> > >     MessageConsumer consumer = session.createConsumer(queue);
> > >     TextMessage recvdMessage = (TextMessage)consumer.receive();
> > >     System.out.println("Received: " + recvdMessage.getText());
> > > }
> > >
> > >
> > > public void test2() {
> > >     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> > >     Destination queue = new AMQAnyDestination("ADDR:testQueue; {create: always}");
> > >     MessageProducer producer = session.createProducer(queue);
> > >     TextMessage msg = session.createTextMessage("hello, world-2");
> > >     producer.send(msg);
> > >     producer.close();
> > >     MessageConsumer consumer = session.createConsumer(queue);
> > >     TextMessage recvdMessage = (TextMessage)consumer.receive();
> > >     System.out.println("Received: " + recvdMessage.getText());
> > > }
> > >
> > > Thanks!
> > >
> >
>
