activemq-users mailing list archives

From "James Strachan" <>
Subject Virtual Topics (was Re: Failover topic subscribers)
Date Fri, 14 Jul 2006 09:37:41 GMT
I thought I'd change the subject as really we are discussing a kind of
Virtual Topics where folks can use Queues to subscribe and consume
from them etc...

On 7/13/06, bmadigan <> wrote:
> This is almost working, there are a few things I need to fix:
> - Need to figure out how to add the new Broker to the factory without using
> the plugin loader

I was thinking we should maybe add the Virtual Topic interceptor to
the broker by default, but allow the configuration to be overridden in
the activemq.xml, as I can't help thinking it'd be great to be able to
use it out of the box.

We've already got the "ActiveMQ.Advisory." prefix in topics to be used
for advisories. So how about we use ActiveMQ.Virtual. as the default
prefix for virtual topics & we let folks customize this if they don't
like the defaults?
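To make the naming idea concrete, here is a rough sketch in plain Java (no
ActiveMQ classes involved). The class name VirtualTopicNames and its methods
are made up for illustration, and the "prefix + group + '.' + topic" layout is
an assumption taken from the VIRTUAL + ".*." + topic wildcard in the quoted
code further down, not a settled convention.

```java
// Hypothetical helper, not part of ActiveMQ: builds and recognises
// virtual-topic queue names under a customisable prefix.
final class VirtualTopicNames {
    // Default prefix proposed in this thread (trailing dot included).
    static final String DEFAULT_PREFIX = "ActiveMQ.Virtual.";

    private final String prefix;

    VirtualTopicNames() {
        this(DEFAULT_PREFIX);
    }

    // Lets folks override the prefix if they don't like the default.
    VirtualTopicNames(String prefix) {
        this.prefix = prefix;
    }

    // Queue a consumer group would subscribe to for a given topic.
    // Assumption: the group name is a single path segment.
    String queueFor(String group, String topic) {
        return prefix + group + "." + topic;
    }

    // Wildcard covering every group's queue for a given topic.
    String wildcardFor(String topic) {
        return prefix + "*." + topic;
    }

    // True if the destination name falls under the virtual prefix.
    boolean isVirtual(String destinationName) {
        return destinationName.startsWith(prefix);
    }
}
```

So with the defaults, a consumer group "groupA" interested in topic "ORDERS"
would consume from the queue ActiveMQ.Virtual.groupA.ORDERS.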

So we should maybe add a VirtualDestinationPlugin property on the
BrokerService which is created by default (unless explicitly disabled
using a bean property). It would start out with a sensible default
that could be overridden, or folks could set the virtual destinations
to something else if they prefer etc.

> - It may not be a problem, but I'm synchronizing  on next when I create the
> queues for the virtual groups in addConsumer().  This could be finer grained
> I think.

Am not sure if you need to do that; I think the default mechanism of
clients consuming on the queue with the destination auto-created on
the fly would be fine?

> - I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
> consumer to a virtual queue. This is probably incorrect, not sure if there
> is a better way.

Are you trying to transform a durable topic consumer into a queue
consumer? Am not sure you need to. Am thinking if you are a topic
consumer (durable or non-durable) then we leave you as you are; a
fully JMS compliant durable topic consumer. However to use the nice
queue-centric virtual topics, you really use a queue consumer of the
right name and things all just work. (BTW the topic regions are
optimised so that if no consumers are available, publishing to a topic
is a no-op)

> - The virtual queues can't provide subscription recovery. Not sure how to
> handle that.

Yeah that's true. Am not too worried about that right now - but we
could look at fixing that later on.

> I created a BrokerFilter subclass which overrides addConsumer() and send():
>  public Subscription addConsumer(ConnectionContext cc,
>                                     ConsumerInfo ci) throws Exception {
>         synchronized(next){
>             String name  = ci.getDestination().getPhysicalName();
>             if(name.startsWith(VIRTUAL)){
>                 Set destinations = getDestinations(
>                         new ActiveMQQueue(name));
>                 if(destinations.size()==0){//create a new virtual queue
>                     ActiveMQQueue queue = new ActiveMQQueue(
>                             name+"?consumer.exclusive=true");
>                     next.addDestination(cc,queue);
>                     ci.setDestination(queue);
>                 }else{ //queue exists, add the consumer
>                     ActiveMQQueue queue = (ActiveMQQueue)
>                             destinations.iterator().next();
>                     ci.setDestination(queue);
>                 }
>             }
>         }
>         return next.addConsumer(cc, ci);
>     }
>     public void send(ConnectionContext ctx,
>                      Message message) throws Exception {
>         String topic = message.getDestination().getPhysicalName();
>         Iterator destinations = getDestinations(
>                 new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
>         while(destinations.hasNext()){
>             Destination dest = (Destination) destinations.next();
>             dest.send(ctx, message);
>         }
>         next.send(ctx, message);
>     }
> Except for the subscription recovery part, this seems to work.

Looks great - though am thinking we only really need the send() part -
as the other stuff like adding consumers should just work out of the
box (unless I'm missing something).
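To show what "only the send() part" means, here is a toy in-memory model in
plain Java. It is not the broker API: VirtualTopicFanOut, consume() and
matches() are all hypothetical names, the queues are just in-memory deques,
and the single-segment "*" matching is an assumption based on the ".*."
wildcard in the quoted code. Queues come into existence when a consumer first
asks for them, and send() just copies the message to every queue whose name
matches the topic.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of the send-only fan-out; not ActiveMQ code.
final class VirtualTopicFanOut {
    static final String PREFIX = "ActiveMQ.Virtual.";

    // Queue name -> pending messages.
    private final Map<String, Deque<String>> queues = new LinkedHashMap<>();

    // Stands in for a queue consumer arriving: the queue is created
    // on the fly if it does not exist yet, with no special interceptor logic.
    Deque<String> consume(String queueName) {
        return queues.computeIfAbsent(queueName, k -> new ArrayDeque<>());
    }

    // True if queueName looks like PREFIX + <group> + "." + topic,
    // where <group> is one non-empty segment without dots.
    static boolean matches(String queueName, String topic) {
        String suffix = "." + topic;
        if (!queueName.startsWith(PREFIX) || !queueName.endsWith(suffix)) {
            return false;
        }
        int groupEnd = queueName.length() - suffix.length();
        if (groupEnd <= PREFIX.length()) {
            return false; // nothing between prefix and topic
        }
        String group = queueName.substring(PREFIX.length(), groupEnd);
        return group.indexOf('.') < 0;
    }

    // Copies the message to every matching virtual queue and returns
    // how many copies were made. A real interceptor would then pass the
    // message on to the normal topic as well.
    int send(String topic, String message) {
        int copies = 0;
        for (Map.Entry<String, Deque<String>> e : queues.entrySet()) {
            if (matches(e.getKey(), topic)) {
                e.getValue().addLast(message);
                copies++;
            }
        }
        return copies;
    }
}
```

In this model, two groups consuming ActiveMQ.Virtual.groupA.ORDERS and
ActiveMQ.Virtual.groupB.ORDERS each get their own copy of a message sent to
ORDERS, while a queue for some other topic is left alone.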

BTW do you want to submit a patch, then we can start wiring this stuff
in? Great work though - am excited to see this stuff implemented! :)


