activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From danbucatanschi <>
Subject Dynamic management of composite destinations
Date Mon, 20 Apr 2009 18:58:57 GMT

Hi everyone!

I have been searching for a solution for this problem both here and in the
user forum and it seems that nobody had such an issue/request before, so
here it goes:

I really like the idea of composite destinations with filteredDestinations.
I really like the part where a producer can produce on a queue such as
"Producer.Queue" and I can set up 2 other queues, let's say
"Consumer.1.Producer.Queue" and "Consumer.2.Producer.Queue", where the
messages get forwarded based on a different selector for each queue. Thus I
can have Consumer1 read from the first queue and Consumer2 read from the
second queue and each get only a filtered stream of the original messages
from the producer. Since the filtering happens inside the Broker, the
consumers can also user their own selectors to further filter their own
messages on their own queues, thus maybe setting up 2 consumers per queue
that can process different kind of messages in parallel.

Now let me tell you what I like about virtual topics: I really like the idea
of dynamically creating queues based on the queue name (i.e. I like the
namespaces). As you can see above I used intentionally the names of the
forwardTo queues within the same namespace as the original Producer queue. I
also like that the destinations (the queues) are created on the fly
depending on some naming convention. For example I could designate
"Producer.Topic" as a virtual topic with the queues where the messages are
forwarded having the prefix as "Consumer.*.". Thus I can use 2 consumers now
to read the messages produced by the producer using the queues
"Consumer.1.Producer.Topic" and "Consumer.2.Producer.Topic"

What I don't like about virtual topics: I cannot filter the messages from
the Producer.Topic using some selector separately for each consumer WITHIN
the broker itself. I understand that the consumers themselves can set a
selector to listen to only some messages, however, for my application, that
is a security problem. The consumers should not be allowed to see each
other's messages. Basically the producer produces messages on some topic,
and each consumer will have different rights depending on their security
level to see part or all of the messages on that topic. The filtering of
these messages should be performed inside the broker so that the consumers
do not receive any unauthorized message at all. The filtering should be able
to be set to a pretty much arbitrary selector.

At the place where I work, we already have our own custom authentication and
authorization plugin developed that uses virtual topics and overrides the
selector for each consumer connecting, thus the consumers get only the
messages that we set in the selector. However, the problem with this
approach is that any messages produced by the producer that are not
"selected" by the selector on the respective queue, queue up on the queue
indefinitely and thus we have to restart the brokers every once in a while
to clear up their memory.

What we would like to have is the functionality of the composite
destinations using filteredDestinations, where the messages are not even
delivered on the consumers' queues in the first place. This would also buy
us the extra functionality of each consumer being able to set up their own
selector manually (right now, since we override that, they cannot use their
own selector). Since the virtual topics are described using a wildcard, we
would like to describe the composite filteredDestinations using wildcards
too. Also, we would like to intercept within our plugin the moment when a
consumer connects and set the filter for the filteredDestination right then,
programmatically. This way would be able to support (which we do right now
through the weird selector overriding) dynamically changing the
selector/filter for the destination while the broker is running without
restarting the broker to re-read the configuration file.

I already looked throughout ActiveMQ's code to figure out how the
filteredDestinations are created, thinking that I could use that code (or
something similar) to create my own, inside my plugin while consumers are
connecting, dynamically. However it seems to me that the
filteredDestinations are created only when the broker starts up and only
from the configuration file. Also the CompositeDestinationFilter class that
seems to do the forwarding of the messages based on the selector/filter
seems to use a hardcoded FilteredDestination class inside, instead of a more
generic ActiveMQ destination (or an instanceof it). Thus it seems that these
FilteredDestination instances are created only once at startup, and there
are no mutators to access them and dynamically manage them (i.e. create,
delete, change their filters).

I would really like to be wrong about my previous paragraph. If there is any
way in which I can dynamically and programmatically create
filteredDestinations such as the ones described in the activemq.xml file and
someone knows how, please explain it to me. I would assume I would do that
in my plugin somewhere in either addConsumer() or addDestination() method.
Maybe I would have to do some manual filtering in the send() method,
although ideally I would like the CompositeDestinationFilter to take care of
that after I set up the filteredDestinations since it is written already.

If I cannot create these destinations as I described above, maybe one of you
would have some thoughts about how to have this functionality programmed
directly in my plugin. For your reference, all the authentication,
authorization and selector/filter data is read from a database. We already
have the code for that working fine. Right now the database has one selector
defined per user, the idea being that, if we ever wanted separate selectors
for different queues in the same virtual topic, we would just create a new
user in the database.

Thank you very much for you help with this problem.


PS: I was thinking that maybe some functionality like this (dynamically
managing filteredDestinations) would be very helpful for other plugin
developers in other environments too. Maybe in the future, ActiveMQ could
provide some nice hooks about dynamically managing these types of
destinations through the BrokerFilter paradigm. Thanks.
View this message in context:
Sent from the ActiveMQ - Dev mailing list archive at

View raw message