activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bmadigan <bmadi...@orbitz.com>
Subject Re: Virtual Topics (was Re: Failover topic subscribers)
Date Fri, 21 Jul 2006 20:40:06 GMT

Thanks James, that test case works for me too. I wrote a use case that (I
think) covers the base Virtual Topic functionality. There is a problem
somewhere that causes this test to fail. Running it in debug I can see that
the Message is dispatched, but not delivered for some reason. Most of the
internals for virtual topics seem to be working fine though, so that's good
news. If you run the test case below, you can see that the MessageListeners
on the queue don't get any messages.  There is some additional code to add
the VirtualTopicBroker to the interceptor chain in BrokerService (or it can
be added as a plugin).

Test case:

package org.apache.activemq.usecases;

import org.apache.log4j.Logger;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

import javax.jms.Session;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Message;

public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {

    private Connection connection;

    public void testVirtualTopicCreation( )throws Exception{
        if(connection == null){
            connection = createConnection();
        }

        String queueAName  = "ActiveMQ.Virtual.A.TEST";
        //create consumer 'cluster'
        ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
        ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);

        Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        MessageConsumer c1 = session.createConsumer(queue1);
        MessageConsumer c2 =  session.createConsumer(queue2);

        MessageCountListener exclusive1 = new MessageCountListener();
        c1.setMessageListener(exclusive1);

        MessageCountListener exclusive2 = new MessageCountListener();
        c2.setMessageListener(exclusive2);

        //create topic producer
        MessageProducer producer =
                session.createProducer(new ActiveMQTopic("TEST"));
        assertNotNull(producer);

        int total = 10;
        for(int i = 0; i < total; i++){
            producer.send(session.createTextMessage("xxxxxxxxxxxxxxxxxx"));
        }

        int delivered = exclusive1.getCount( ) & exclusive2.getCount();
        assertTrue("Expected "+total+" delivered, found "+delivered,
                delivered == total);

    }

    class MessageCountListener implements MessageListener{

        private int count = 0;

        public void onMessage(Message m){
            System.out.println("Got one! "+count);
            count++;
        }

        public int getCount(){
            return count;
        }
    }

    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
        }
        super.tearDown();
    }


The Broker:

package org.apache.activemq.broker;

import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.Message;

import java.util.Iterator;
import java.util.Set;


/**
 * Broker filter that, for every message sent, also hands the message to each
 * destination matching {@code ActiveMQ.Virtual.*.<name>}, where
 * {@code <name>} is the physical name of the message's destination, and then
 * passes the message on to the next broker in the chain.
 *
 * The class doubles as a {@link BrokerPlugin}: {@link #installPlugin(Broker)}
 * returns a new filter wrapped around the supplied broker.
 */
public class VirtualTopicBroker
        extends BrokerFilter
        implements BrokerPlugin {

    /** Prefix plus single-element wildcard used to look up virtual queues. */
    public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*";

    /**
     * Creates a filter that delegates to {@code next}.
     *
     * @param next the broker this filter wraps
     */
    public VirtualTopicBroker(Broker next) {
        super(next);
    }

    /**
     * Creates an instance with no wrapped broker. Such an instance has a null
     * {@code next}; presumably it is meant to be used only as a plugin
     * factory via {@link #installPlugin(Broker)} -- TODO confirm.
     */
    public VirtualTopicBroker() {
        super(null);
    }

    /**
     * Sends {@code message} to every destination matching the virtual
     * wildcard for the message's destination name, then forwards it to the
     * next broker.
     *
     * @param ctx     the connection context of the sender
     * @param message the message being sent
     * @throws Exception if any destination or the next broker fails to accept the message
     */
    public void send(ConnectionContext ctx,
                     Message message) throws Exception {

        String name = message.getDestination().getPhysicalName();

        // The wildcard must be its own path element: "ActiveMQ.Virtual.*.TEST"
        // matches "ActiveMQ.Virtual.A.TEST", whereas the previously built
        // "ActiveMQ.Virtual.*TEST" fuses '*' with the name and matches nothing.
        String virtualName = VIRTUAL_WILDCARD + "." + name;

        Set destinations = getDestinations(
                new ActiveMQQueue(virtualName));

        // NOTE(review): the same Message instance is handed to every matching
        // destination and to the next broker; confirm whether a per-destination
        // copy is required.
        for (Iterator iter = destinations.iterator();
             iter.hasNext();) {
            Destination dest = (Destination) iter.next();
            dest.send(ctx, message);
        }
        next.send(ctx, message);
    }

    /**
     * Installs this plugin by wrapping the given broker in a new filter.
     *
     * @param broker the broker to wrap
     * @return a new {@code VirtualTopicBroker} delegating to {@code broker}
     * @throws Exception declared by the plugin contract; not thrown by this implementation
     */
    public Broker installPlugin(Broker broker) throws Exception {
        return new VirtualTopicBroker(broker);
    }
}


-- 
View this message in context: http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5439965
Sent from the ActiveMQ - Dev forum at Nabble.com.


Mime
View raw message