qpid-users mailing list archives

From Rob Godfrey <rob.j.godf...@gmail.com>
Subject Re: [Java Broker] Message Grouping
Date Wed, 20 May 2015 19:32:01 GMT
On 20 May 2015 at 19:41, Olivier Mallassi <olivier.mallassi@gmail.com> wrote:
> Hello all
> a question regarding message grouping.
> In my case, I use .....x-declare: {arguments:
> {'qpid.group_header_key':'JMSXGroupID', 'qpid.shared_msg_group':0}}}}
> so, looking at the doc (
> https://qpid.apache.org/releases/qpid-0.32/java-broker/book/Java-Broker-Management-Managing-Queues.html
> )
>> *Enforce consumption ordering among messages belonging to the same group.
>> Consumption ordering means one of two things depending on how the queue has
>> been configured. In default mode, a group gets assigned to a single consumer
>> for the lifetime of that consumer, and the broker will pass all subsequent
>> messages in the group to that consumer. In 'shared groups' mode (which gives
>> the same behaviour as the Qpid C++ Broker) the broker enforces a looser
>> guarantee, namely that all the currently unacknowledged messages in a group
>> are sent to the same consumer, but the consumer used may change over time
>> even if the consumers do not. This means that only one consumer can be
>> processing messages from a particular group at any given time, however if
>> the consumer acknowledges all of its acquired messages then the broker may
>> pass the next pending message in that group to a different consumer.*
> I am currently using the default mode, so I have the guarantee that given a
> group value, as long as the consumer is there, messages will be routed to
> the same consumer. Am I right?


> Has someone a use case for 'shared groups'?

So, as above, the "shared groups" is the way that the C++ Broker
implements message grouping... essentially the use case there is that
no two messages from the same group can be processed concurrently by
different consumers.  I have seen use cases for this in the past where
there wasn't actually any need to tie messages from a particular group
to a single consumer instance (no state was retained in the consumer)
but that it was required that messages from the same group were
processed sequentially - this type of message grouping provides this.
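
To make the "shared groups" guarantee concrete, here is a minimal sketch of the ownership rule as I understand it from the description above. It is not the broker's code: the class SharedGroupSketch, its methods and the use of plain strings for consumers are all made up for illustration. A group belongs to one consumer while that consumer has unacknowledged messages from it, and becomes free again once the count drops to zero.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of "shared groups" ownership; not the broker's code.
final class SharedGroupSketch {
    // Per-group state: current owner and number of unacknowledged messages.
    private static final class Holder {
        final String consumer;
        int unacked;
        Holder(String consumer) { this.consumer = consumer; }
    }

    private final Map<String, Holder> groups = new HashMap<>();

    // Returns true if the consumer may take a message of this group.
    boolean tryAcquire(String groupId, String consumer) {
        Holder h = groups.get(groupId);
        if (h == null) {
            h = new Holder(consumer);
            groups.put(groupId, h);
        } else if (!h.consumer.equals(consumer)) {
            return false; // another consumer still has unacknowledged messages
        }
        h.unacked++;
        return true;
    }

    // Called on acknowledge; the group is released once nothing is outstanding.
    void acknowledge(String groupId) {
        Holder h = groups.get(groupId);
        if (h != null && --h.unacked == 0) {
            groups.remove(groupId);
        }
    }

    // Number of groups that currently have an owner.
    int trackedGroups() { return groups.size(); }
}
```

In this sketch consumer "B" is refused a message from group "g1" while "A" still holds an unacknowledged one, and is accepted once "A" has acknowledged it. That is the "processed sequentially, but not pinned to one consumer" behaviour.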

> Looking at the code (If I understand well), it looks like we use this class
> DefinedGroupMessageGroupManager where there is a _groupMap HashMap
> attribute.
> Does it mean that if I have one million different groupId, I will end up
> with 1 million entries in that Map?  (which is not an issue but good to
> know)

So, IIRC, DefinedGroupMessageGroupManager is the "shared groups" case
- which does have an unbounded map.

For the non-shared group case it uses
AssignedConsumerMessageGroupManager which does bound the size of the
group -> consumer map using the queue property maximumDistinctGroups.
It doesn't look like you can set that if you create the queue via the
declare args in the client though - you'd have to either use the REST
API or manually create through the UI (which is really just another way
of calling the REST API).  The default number of distinct groups is
255.  You can change this broker wide by setting the context value
queue.maximumDistinctGroups.  You can do this through the Management
UI, or you could just run with -Dqueue.maximumDistinctGroups=1023 or
whatever value you like (it rounds up to the next 2^x -1).
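
As a rough illustration of the "rounds up to the next 2^x - 1" remark, and of why a value of that shape is convenient for bounding a group -> consumer map, here is a small sketch. It is an assumption about the general technique (using the value as a bit mask over the group id's hash), not a copy of the broker's code, and the names BoundedGroupSketch, roundUpToMask, slotFor and assign are invented.

```java
// Hypothetical sketch of bounding a group -> consumer map; not the broker's code.
final class BoundedGroupSketch {
    // Smallest value of the form 2^x - 1 that is >= n (for 1 <= n < 2^30).
    static int roundUpToMask(int n) {
        return (Integer.highestOneBit(n) << 1) - 1;
    }

    private final int mask;
    private final String[] consumerBySlot;

    BoundedGroupSketch(int maximumDistinctGroups) {
        this.mask = roundUpToMask(maximumDistinctGroups);
        this.consumerBySlot = new String[mask + 1];
    }

    // Group ids are folded into a fixed number of slots, so memory stays
    // bounded however many distinct ids are seen.
    int slotFor(String groupId) {
        return groupId.hashCode() & mask;
    }

    // Assigns the slot to the consumer if it is free; returns the current owner.
    String assign(String groupId, String consumer) {
        int slot = slotFor(groupId);
        if (consumerBySlot[slot] == null) {
            consumerBySlot[slot] = consumer;
        }
        return consumerBySlot[slot];
    }
}
```

With this rounding, 255 stays 255, 256 becomes 511 and 1000 becomes 1023. One consequence of the sketch is that two different group ids can land in the same slot and so end up with the same consumer. That does not break ordering within a group, it just makes the distribution coarser.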

> Can someone help me finding the classes that dispatch the 'old groupId' to
> a new consumer when a consumer is closed?

So in the shared groups case the group is closed when there is no
message of the group currently being processed by a consumer... this
is managed by the subtract() method of the inner class Group in

In the non-shared group case

void unregisterConsumer(final QueueConsumerImpl consumer) in
AbstractQueue is called when the consumer is closed.  This then calls

resetSubPointersForGroups(consumer, true);

which in turn calls


which for the AssignedConsumerMessageGroupManager looks like this:

public void clearAssignments(QueueConsumer<?> sub) {
    Iterator<QueueConsumer<?>> subIter = _groupMap.values().iterator();
    while (subIter.hasNext()) {
        if (subIter.next() == sub) { subIter.remove(); }  // drop this group's assignment
    }
}

which basically iterates over all the groups and, if they are assigned
to that consumer, removes that assignment.
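
If it helps to experiment with this outside the broker, the same "remove every entry whose value is this consumer" step can be written against a plain map. This is only a stand-alone sketch: ClearAssignmentsSketch, its groupMap field and the use of Object for consumers are stand-ins I have made up, and removeIf (Java 8+) is used here in place of the explicit iterator.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a group -> consumer map; names are illustrative.
final class ClearAssignmentsSketch {
    final Map<Integer, Object> groupMap = new ConcurrentHashMap<>();

    // Removes every group entry whose value is this exact consumer instance.
    void clearAssignments(Object consumer) {
        groupMap.values().removeIf(assigned -> assigned == consumer);
    }
}
```

Removing through the values() view also removes the corresponding key, so once the consumer has gone its groups are unassigned and can be given to whichever consumer asks next.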

Hope this helps,

> Regards.
