qpid-users mailing list archives

From Helen Kwong <helenkw...@gmail.com>
Subject Limiting the number of concurrent consumers across multiple queues
Date Thu, 16 Jan 2014 19:20:20 GMT
Hi Qpid users / experts,

I need to limit the number of consumers concurrently processing messages
considered to be in the same group, across multiple queues, and was
wondering if anyone has ideas about how to do it. We’re using the Java
broker and client, and have multiple queues, each with multiple listeners,
each listener’s session listening to multiple queues. Some messages are
associated with groups, and for a given group we want at most K listeners
processing messages from the group at any given time. The messages are
enqueued to multiple queues, and it’s possible for messages from the same
group to be in different queues.

If messages in the same group can go into only one queue, then the message
groups feature will give us what we need (it’d work directly with K = 1 and
with K > 1 we can tweak the grouping value, e.g., hash it to one of 1 to K
and append the number to the grouping value). But since messages considered
to be in the same group can be in different queues, the feature is not
enough for our case.
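
The single-queue trick for K > 1 described above could be sketched roughly as
follows. This is only an illustration: the GroupKeys class, the
shardedGroupKey method, and the choice of hashing a per-message ID are all
assumptions, not anything Qpid provides. The returned string would be set as
whatever header the queue uses as its grouping value.

```java
final class GroupKeys {
    private GroupKeys() {}

    /**
     * Hypothetical helper: maps a message to one of K subgroups of its group
     * by hashing a per-message ID, then appends the subgroup number (1..K)
     * to the original grouping value.
     */
    static String shardedGroupKey(String groupId, String messageId, int k) {
        if (k < 1) {
            throw new IllegalArgumentException("k must be >= 1");
        }
        // floorMod keeps the result non-negative even for negative hash codes.
        int shard = Math.floorMod(messageId.hashCode(), k) + 1;
        return groupId + "-" + shard;
    }
}
```

With each subgroup handled by at most one consumer at a time, at most K
consumers work on the original group concurrently, but only within one queue.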

Since it looks like the broker side doesn’t have what we need exactly,
we’re thinking about how to do this from the client side. We’re thinking
along the lines of having some semaphore object per group, shared between
the different listeners, and whenever a listener receives a message, it
will try to acquire a permit from the semaphore for that group. If it’s
able to acquire a permit, then process the message and release the permit
upon completion. If it’s not able to acquire a permit, reenqueue the
message in some way. For example:

1) Reenqueue the message back to the same queue so it can be retried right
away. But this would lead to a lot of churning when permits are not
available for a while, so we’ve ruled this out.

2) Same as #1, but sleep for a short while first so we wouldn’t have the
high churning. But since each listener’s session is responsible for
multiple queues, this can decrease the throughput of other queues.

3) Enqueue the message to a special queue that stores messages waiting for
a permit, a queue that is not listened to by anyone. A periodic sweeper job
wakes up once in a while, say every minute, pulls all the messages off the
waiting queue, and reenqueues them to their respective original queues. But
throughput would be limited by the sweeper interval.

4) Like #3, but don’t use a periodic sweeper. Instead, when a listener that
was able to acquire a permit is done with a message, look up the next
waiting message of the same group in the waiting queue using a JMS
selector, and reenqueue it back to its original queue. But lookup
performance might be bad if the queue depth is high.
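
The per-group semaphore idea that all four options build on might look
something like the sketch below. The GroupPermits class and its tryProcess
method are hypothetical names, and the sketch assumes all listeners run in the
same JVM so they can share one instance. What to do with a message when
tryProcess returns false (options 1 to 4) is left to the caller.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

final class GroupPermits {
    private final int maxConcurrent; // K: max listeners per group
    private final ConcurrentMap<String, Semaphore> permits = new ConcurrentHashMap<>();

    GroupPermits(int maxConcurrent) {
        if (maxConcurrent < 1) {
            throw new IllegalArgumentException("maxConcurrent must be >= 1");
        }
        this.maxConcurrent = maxConcurrent;
    }

    /**
     * Runs the work if a permit for the group is free and returns true.
     * Returns false without blocking if all K permits are taken, so the
     * listener thread is never stalled waiting on one group.
     */
    boolean tryProcess(String groupId, Runnable work) {
        Semaphore semaphore =
                permits.computeIfAbsent(groupId, g -> new Semaphore(maxConcurrent));
        if (!semaphore.tryAcquire()) {
            return false; // caller decides how to reenqueue the message
        }
        try {
            work.run();
        } finally {
            semaphore.release(); // release even if processing throws
        }
        return true;
    }
}
```

Note that this sketch never removes semaphores for idle groups, so the map
grows with the number of distinct group IDs seen.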

Each of these has some drawbacks. Does anyone have ideas about other
possible approaches (maybe entirely different from the above), or has
anyone done something similar?

