qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gordon Sim <g...@redhat.com>
Subject Re: Python: How to consume one message?
Date Tue, 04 Mar 2008 08:56:15 GMT
Brent Villalobos wrote:
> This is my first time using QPID and I'm trying to write a simple 
> producer/consumer setup.  

Great; thanks for giving it a try! We'll be grateful for any feedback on 
what we can improve. Please feel free to keep asking questions and we'll 
do our best to get the answers to you.

> What am I doing wrong that all the messages in my queue are wiped out 
> after I consume one message?

The issue is that the broker is sending the client as many messages as 
available and because acknowledgements are turned off (no_ack=True), it 
assumes that delivery is always successful so dequeues those messages 
immediately.

The no_ack mode is really intended for cases where every client has 
their own temporary queue bound to an exchange (i.e. a transient pub sub 
model).

You probably need to turn acking on in your case so that messages are 
only dequeued when the application acknowledges their receipt:

reply = channel.basic_consume(queue="message_queue", no_ack=False)

or even just:

reply = channel.basic_consume(queue="message_queue")

as no_ack is False by default.

You then need to ack every message you receive:

channel.basic_ack(delivery_tag=msg.delivery_tag)

or to acknowledge all messages up to a particular message:

channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)

This will also make your application tolerant of client failures. 
Messages not yet acked will be redelivered once the client reconnects 
and consumes from the queue again.

You can also control the prefetching by sending basic_qos commands to 
the broker. This only has an effect when acking is turned on though. e.g.

channel.basic_qos(prefetch_count=1)# or whatever value seems appropriate

If you set it to 1 you have to acknowledge every message before another 
one is sent. This minimises redelivery in the event of a failure but 
will also slow things down.

A final option with 0-8 is to use basic_get instead of basic_consume 
(though in my view its a little more cumbersome). E.g.

reply = channel.basic_get(no_ack=False)
if (reply.method.name == "get_ok"):
    print "Consumer gets this message: ", msg.content.body
    channel.basic_ack(delivery_tag=reply.delivery_tag)
else:
    print "Queue was empty"


fyi: The 0-10 version of the protocol which the AMQP WG has recently 
voted through in its final form has a more powerful flow control 
mechanism and unifies the functionality of get and consume. Support for 
that is currently in progress on trunk (c++ broker and c++, python & 
java clients will be the first available, java broker following shortly 
after).




Mime
View raw message