activemq-users mailing list archives

From bmadigan <bmadi...@orbitz.com>
Subject Re: Failover topic subscribers
Date Thu, 13 Jul 2006 21:46:47 GMT

This is almost working, but there are a few things I still need to fix:
- I need to figure out how to add the new Broker to the factory without using
the plugin loader.
- It may not be a problem, but I'm synchronizing on next when I create the
queues for the virtual groups in addConsumer(). I think the locking could be
finer grained.
- I'm calling ConsumerInfo.setDestination(virtualQueue) to point the
consumer at a virtual queue. This is probably incorrect, but I'm not sure
whether there is a better way.
- The virtual queues can't provide subscription recovery, and I'm not sure how
to handle that.
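
On the locking point, one possible finer-grained approach is to key the
check-then-create step on the queue name instead of holding a single lock on
next. The sketch below is only an illustration under assumptions: it is not
broker code, VirtualQueueRegistry is a hypothetical helper, and plain strings
stand in for ActiveMQQueue instances. In a real filter the creation step would
have to call next.addDestination(), which may be too slow or too re-entrant to
run inside a map computation, so treat this as a starting point only.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of ActiveMQ: tracks which virtual queues
// have already been created, locking per name rather than globally.
final class VirtualQueueRegistry {
    // Physical name requested by the consumer -> name of the queue created for it.
    private final ConcurrentHashMap<String, String> queues = new ConcurrentHashMap<>();
    // Counts how many times the creation step actually ran.
    private final AtomicInteger created = new AtomicInteger();

    // Returns the virtual queue for this name, creating it at most once.
    // ConcurrentHashMap.computeIfAbsent runs the function atomically per key,
    // so consumers asking for different names do not wait on each other.
    String queueFor(String name) {
        return queues.computeIfAbsent(name, n -> {
            created.incrementAndGet();
            // Stand-in for: next.addDestination(cc, new ActiveMQQueue(...))
            return n + "?consumer.exclusive=true";
        });
    }

    int createdCount() {
        return created.get();
    }

    public static void main(String[] args) {
        VirtualQueueRegistry registry = new VirtualQueueRegistry();
        System.out.println(registry.queueFor("VIRTUAL.groupA.orders"));
        System.out.println(registry.queueFor("VIRTUAL.groupA.orders"));
        System.out.println("created: " + registry.createdCount());
    }
}
```

With this shape, addConsumer() would look up the queue through the registry
and call ci.setDestination() on the result, with no synchronized block.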

I created a BrokerFilter subclass which overrides addConsumer() and send():

    // Redirect consumers of VIRTUAL.* destinations to a shared virtual queue,
    // creating that queue the first time the name is seen.
    public Subscription addConsumer(ConnectionContext cc,
                                    ConsumerInfo ci) throws Exception {
        // Coarse lock so that check-then-create is atomic across consumers.
        synchronized (next) {
            String name = ci.getDestination().getPhysicalName();
            if (name.startsWith(VIRTUAL)) {
                Set destinations = getDestinations(new ActiveMQQueue(name));
                if (destinations.size() == 0) {
                    // No queue yet: create one with the exclusive-consumer option.
                    ActiveMQQueue queue = new ActiveMQQueue(
                            name + "?consumer.exclusive=true");
                    next.addDestination(cc, queue);
                    ci.setDestination(queue);
                } else {
                    // Queue exists: attach this consumer to it.
                    ActiveMQQueue queue = (ActiveMQQueue)
                            destinations.iterator().next();
                    ci.setDestination(queue);
                }
            }
        }
        return next.addConsumer(cc, ci);
    }

    // Copy each message to every virtual queue registered for its topic,
    // then let the broker deliver it to the original destination as usual.
    public void send(ConnectionContext ctx,
                     Message message) throws Exception {
        String topic = message.getDestination().getPhysicalName();
        // "*" matches one name segment, i.e. the group in VIRTUAL.<group>.<topic>.
        Iterator destinations = getDestinations(
                new ActiveMQQueue(VIRTUAL + ".*." + topic)).iterator();
        while (destinations.hasNext()) {
            Destination dest = (Destination) destinations.next();
            dest.send(ctx, message);
        }
        next.send(ctx, message);
    }

Except for the subscription recovery part, this seems to work. 
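
To make the naming convention behind send() concrete, here is a small
standalone sketch of which queue names the VIRTUAL + ".*." + topic pattern is
meant to select. It does not use the broker API: plain strings stand in for
destinations, the value "VIRTUAL" for the prefix constant is an assumption
(the post does not show it), and VirtualGroupRouting, feedsTopic and
matchingQueues are hypothetical names. It relies only on the ActiveMQ wildcard
rule that "*" matches exactly one dot-separated name segment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the VIRTUAL.<group>.<topic> naming convention.
final class VirtualGroupRouting {
    // Assumed value; the original post does not show what VIRTUAL is set to.
    static final String VIRTUAL = "VIRTUAL";

    // True if queueName has the form VIRTUAL.<group>.<topic>, where <group>
    // is a single non-empty segment containing no dots.
    static boolean feedsTopic(String queueName, String topic) {
        String prefix = VIRTUAL + ".";
        String suffix = "." + topic;
        if (!queueName.startsWith(prefix) || !queueName.endsWith(suffix)) {
            return false;
        }
        // Guard against prefix and suffix overlapping, e.g. "VIRTUAL.orders".
        if (queueName.length() <= prefix.length() + suffix.length()) {
            return false;
        }
        String group = queueName.substring(
                prefix.length(), queueName.length() - suffix.length());
        return group.indexOf('.') < 0;
    }

    // Returns, in input order, the queues that should receive a copy of a
    // message published to the given topic.
    static List<String> matchingQueues(List<String> queueNames, String topic) {
        List<String> result = new ArrayList<>();
        for (String queueName : queueNames) {
            if (feedsTopic(queueName, topic)) {
                result.add(queueName);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList(
                "VIRTUAL.groupA.orders",
                "VIRTUAL.groupB.orders",
                "VIRTUAL.groupA.prices");
        System.out.println(matchingQueues(queues, "orders"));
    }
}
```

Each consumer group gets its own queue per topic, so a message published to
the topic is copied once per group, and the exclusive-consumer option on the
queue then decides which member of the group receives it.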

-- 
View this message in context: http://www.nabble.com/Failover-topic-subscribers-tf1896829.html#a5317425
Sent from the ActiveMQ - User forum at Nabble.com.

