activemq-dev mailing list archives

From "James Strachan" <james.strac...@gmail.com>
Subject Re: Virtual Topics (was Re: Failover topic subscribers)
Date Mon, 24 Jul 2006 10:29:01 GMT
Many thanks! I've added your patch to svn trunk and have got the test
case working (by tweaking the wildcard slightly and adding a pause
between sending the messages and asserting they are consumed). Yay!
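
As an illustration of why a wait is needed between sending and asserting, here is a minimal stand-alone sketch. It is not the code committed to trunk: the names DeliveryCounter and DeliveryCounterDemo are hypothetical, and a background thread stands in for the broker's asynchronous dispatch. It swaps the fixed pause for a CountDownLatch so the wait ends as soon as the expected number of messages has arrived.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of ActiveMQ: counts deliveries and lets a
// test block until the expected number has arrived or a timeout expires.
class DeliveryCounter {
    private final AtomicInteger count = new AtomicInteger();
    private final CountDownLatch latch;

    DeliveryCounter(int expected) {
        this.latch = new CountDownLatch(expected);
    }

    // Intended to be called from a listener's onMessage(); safe from any thread.
    void record() {
        count.incrementAndGet();
        latch.countDown();
    }

    // Returns true if all expected deliveries arrived within the timeout.
    boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return latch.await(timeout, unit);
    }

    int getCount() {
        return count.get();
    }
}

class DeliveryCounterDemo {
    // Simulates asynchronous delivery on a background thread and returns the
    // number of deliveries seen, or -1 if the wait timed out or was interrupted.
    static int run(int total) {
        DeliveryCounter counter = new DeliveryCounter(total);
        Thread dispatcher = new Thread(() -> {
            for (int i = 0; i < total; i++) {
                counter.record();
            }
        });
        dispatcher.start();
        try {
            boolean done = counter.await(5, TimeUnit.SECONDS);
            dispatcher.join();
            return done ? counter.getCount() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("delivered=" + run(10));
    }
}
```

In a real JMS test the listener's onMessage() would call record(), and the assertion would run only after await() returns true.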

For now I've made the code add a VirtualTopicBroker by default -
unless it's disabled. We may want to get more clever going forward by
allowing virtual topics & queues to be configured via XML in the
broker.xml file.

See if SVN trunk is working for you now - from my initial testing it
looks OK to me.

On 7/21/06, bmadigan <bmadigan@orbitz.com> wrote:
>
> Thanks James, that test case works for me too. I wrote a use case that (I
> think) covers the base Virtual Topic functionality. There is a problem
> somewhere that causes this test to fail. Running it in debug I can see that
> the Message is dispatched, but not delivered for some reason. Most of the
> internals for virtual topics seem to be working fine though, so that's good
> news. If you run the test case below, you can see that the MessageListeners
> on the queue don't get any messages.  There is some additional code to add
> the VirtualTopicBroker to the interceptor chain in BrokerService (or it can
> be added as a plugin).
>
> Test case:
>
> package org.apache.activemq.usecases;
>
> import org.apache.log4j.Logger;
> import org.apache.activemq.EmbeddedBrokerTestSupport;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.command.ActiveMQTopic;
>
> import javax.jms.Session;
> import javax.jms.Connection;
> import javax.jms.MessageProducer;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageListener;
> import javax.jms.Message;
>
> public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport {
>
>     private Connection connection;
>
>     public void testVirtualTopicCreation( )throws Exception{
>         if (connection == null) {
>             connection = createConnection();
>         }
>         // JMS consumers receive nothing until the connection is started.
>         connection.start();
>
>         String queueAName  = "ActiveMQ.Virtual.A.TEST";
>         //create consumer 'cluster'
>         ActiveMQQueue queue1 = new ActiveMQQueue(queueAName);
>         ActiveMQQueue queue2 = new ActiveMQQueue(queueAName);
>
>         Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>         MessageConsumer c1 = session.createConsumer(queue1);
>         MessageConsumer c2 =  session.createConsumer(queue2);
>
>         MessageCountListener exclusive1 = new MessageCountListener();
>         c1.setMessageListener(exclusive1);
>
>         MessageCountListener exclusive2 = new MessageCountListener();
>         c2.setMessageListener(exclusive2);
>
>         //create topic producer
>         MessageProducer producer =
>                 session.createProducer(new ActiveMQTopic("TEST"));
>         assertNotNull(producer);
>
>         int total = 10;
>         for(int i = 0; i < total; i++){
>             producer.send(session.createTextMessage("xxxxxxxxxxxxxxxxxx"));
>         }
>
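>         // Delivery to the listeners is asynchronous, so pause before
>         // counting; the length of the pause here is arbitrary.
>         Thread.sleep(1000);
>
>         // Sum the two counts; a bitwise & would not give the total.
>         int delivered = exclusive1.getCount() + exclusive2.getCount();
>         assertTrue("Expected "+total+" delivered, found "+delivered,
>                 delivered == total);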
>
>     }
>
>     class MessageCountListener implements MessageListener{
>
>         private int count = 0;
>
>         public void onMessage(Message m){
>             System.out.println("Got one! "+count);
>             count++;
>         }
>
>         public int getCount(){
>             return count;
>         }
>     }
>
>     protected void tearDown() throws Exception {
>         if (connection != null) {
>             connection.close();
>         }
>         super.tearDown();
>     }
> }
>
> the Broker:
>
> package org.apache.activemq.broker;
>
> import org.apache.activemq.broker.region.Destination;
> import org.apache.activemq.command.ActiveMQQueue;
> import org.apache.activemq.command.Message;
>
> import java.util.Iterator;
> import java.util.Set;
>
>
> public class VirtualTopicBroker
>         extends BrokerFilter
>         implements BrokerPlugin {
>
>     public static final String VIRTUAL_WILDCARD = "ActiveMQ.Virtual.*";
>
>     public VirtualTopicBroker(Broker next) {
>         super(next);
>     }
>
>     public VirtualTopicBroker() {
>         super(null);
>     }
>
>     public void send(ConnectionContext ctx,
>                      Message message) throws Exception {
>
>         String name = message.getDestination().getPhysicalName();
>
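>         // Wildcards match whole dot-separated segments, so a "." is needed
>         // between the "*" and the topic name.
>         String virtualName = VIRTUAL_WILDCARD + "." + name;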
>
>         Set destinations = getDestinations(
>                 new ActiveMQQueue(virtualName));
>
>         for (Iterator iter = destinations.iterator();
>              iter.hasNext();) {
>             Destination dest = (Destination) iter.next();
>             dest.send(ctx, message);
>         }
>         next.send(ctx, message);
>     }
>
>     public Broker installPlugin(Broker broker) throws Exception {
>         return new VirtualTopicBroker(broker);
>     }
> }
>
>
> --
> View this message in context: http://www.nabble.com/Re%3A-Virtual-Topics-%28was-Re%3A-Failover-topic-subscribers%29-tf1942508.html#a5439965
> Sent from the ActiveMQ - Dev forum at Nabble.com.
>
>
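
The lookup in the quoted send() method depends on the wildcard pattern lining up with the queue names segment by segment. The following stand-alone sketch illustrates that, assuming ActiveMQ-style wildcards where "*" stands for exactly one dot-separated segment (the ">" wildcard is not handled). The class VirtualNameSketch and its methods are hypothetical and do not use ActiveMQ's own matching code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone sketch; it does not use ActiveMQ's matching classes.
class VirtualNameSketch {
    static final String VIRTUAL_PREFIX = "ActiveMQ.Virtual";

    // Builds the wildcard pattern for a topic name, e.g. "TEST" becomes
    // "ActiveMQ.Virtual.*.TEST". Note the dot after the '*'.
    static String patternFor(String topicName) {
        return VIRTUAL_PREFIX + ".*." + topicName;
    }

    // Compares dot-separated names segment by segment; '*' matches exactly
    // one segment, every other segment must match literally.
    static boolean matches(String pattern, String name) {
        String[] p = pattern.split("\\.");
        String[] n = name.split("\\.");
        if (p.length != n.length) {
            return false;
        }
        for (int i = 0; i < p.length; i++) {
            if (!p[i].equals("*") && !p[i].equals(n[i])) {
                return false;
            }
        }
        return true;
    }

    // Returns the queue names that would receive a copy of a message
    // published to the given topic.
    static List<String> targets(String topicName, List<String> queueNames) {
        String pattern = patternFor(topicName);
        List<String> result = new ArrayList<>();
        for (String queue : queueNames) {
            if (matches(pattern, queue)) {
                result.add(queue);
            }
        }
        return result;
    }
}
```

Under these assumptions, a pattern built without the dot ("ActiveMQ.Virtual.*TEST") has three segments and cannot match the four-segment queue name "ActiveMQ.Virtual.A.TEST" used in the test case.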


-- 

James
-------
http://radio.weblogs.com/0112098/
