activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Christian Posta <christian.po...@gmail.com>
Subject Re: subscription dispatching
Date Tue, 04 Dec 2012 14:59:25 GMT
I just wrote a unit test verifying the behavior I discussed above (ie, the
messages are delivered in different order). I guess it makes sense since
the connections are all on different threads. I wonder if this can cause an
issue though?

package org.apache.activemq.usecases;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;

import javax.jms.*;
import java.util.ArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
 */
public class TopicDispatchTest extends TestSupport{

    protected BrokerService brokerService;

    public void testFoo() throws Exception {

        Connection consumerAConnection = null;
        Connection consumerBConnection = null;
        try {
            // create two consumers on diff connections
            consumerAConnection = createConnection();
            consumerAConnection.start();
            Session consumerASession =
consumerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination consumerADestination = createDestination("FOO,BAR");
            MessageConsumer A =
consumerASession.createConsumer(consumerADestination);
            final ArrayList<String> messagesForConsumerA = new
ArrayList<String>();

            A.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            String value = ((TextMessage)
message).getText();
                            messagesForConsumerA.add(value);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();  //To change body of catch
statement use File | Settings | File Templates.
                    }
                }
            });

            createTenThousandConsumersToTopicFoo();

            consumerBConnection = createConnection();
            consumerBConnection.start();
            Session consumerBSession =
consumerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//            Destination consumerBDestination =
createDestination("FOO,BAR");
            MessageConsumer B =
consumerBSession.createConsumer(consumerADestination);
            final ArrayList<String> messagesForConsumerB = new
ArrayList<String>();

            B.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            String value = ((TextMessage)
message).getText();
                            messagesForConsumerB.add(value);
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();  //To change body of catch
statement use File | Settings | File Templates.
                    }
                }
            });


            final CyclicBarrier barrier = new CyclicBarrier(2);

            Runnable producerAThread = new Runnable() {
                @Override
                public void run() {

                    try {
                        barrier.await();
                        // create two producers, each on separate
connections
                        Connection producerAConnection = null;

                        producerAConnection = createConnection();
                        producerAConnection.start();
                        Session producerASession =
producerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Destination producerADestination =
createDestination("FOO");
                        MessageProducer producerA =
producerASession.createProducer(producerADestination);

producerA.send(producerASession.createTextMessage("aaa"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };


            Runnable producerBThread = new Runnable() {
                @Override
                public void run() {
                    try {
                        barrier.await();
                        Connection producerBConnection = null;

                        producerBConnection = createConnection();
                        producerBConnection.start();
                        Session producerBSession =
producerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                        Destination producerBDestination =
createDestination("BAR");
                        MessageProducer producerB =
producerBSession.createProducer(producerBDestination);

producerB.send(producerBSession.createTextMessage("ccc"));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };


            ExecutorService threadService = Executors.newFixedThreadPool(2);
            threadService.submit(producerAThread);
            threadService.submit(producerBThread);



            //situational wait
//            TimeUnit.SECONDS.wait(2);
            Thread.sleep(2000);

            assertEquals("aaa", messagesForConsumerA.get(0));
            assertEquals("ccc", messagesForConsumerA.get(1));

            assertEquals("aaa", messagesForConsumerB.get(0));
            assertEquals("ccc", messagesForConsumerB.get(1));



        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumerAConnection.close();
            consumerBConnection.close();
        }


    }

    private void createTenThousandConsumersToTopicFoo() throws Exception {
        Connection loadConnection = createConnection();
        Session loadSession = loadConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
        Destination fooDest = createDestination("FOO");
        for (int i = 0; i < 100; i++) {
            loadSession.createConsumer(fooDest);
        }
    }

    @Override
    protected ActiveMQConnectionFactory createConnectionFactory() throws
Exception {
        return new
ActiveMQConnectionFactory("tcp://localhost:61616?jms.watchTopicAdvisories=false");
    }

    @Override
    protected void setUp() throws Exception {
        topic = true;
        startBroker();
    }

    @Override
    protected void tearDown() throws Exception {
        brokerService.stop();
        brokerService.waitUntilStopped();
    }

    private void startBroker() throws Exception {
        brokerService = new BrokerService();
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
        brokerService.waitUntilStarted();
    }
}



On Tue, Dec 4, 2012 at 2:44 AM, Dejan Bosanac <dejan@nighttale.net> wrote:

> Hi Christian,
>
> I think this shouldn't happen as message on its arrival goes to every
> subscription and then is dispatched from there. So if messages arrive
> in the same order in the topics they should later on be dispatched in
> that same order.
>
> Regards
> --
> Dejan Bosanac
> ----------------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> dbosanac@redhat.com
> Twitter: @dejanb
> Blog: http://sensatic.net
> ActiveMQ in Action: http://www.manning.com/snyder/
>
>
> On Mon, Dec 3, 2012 at 7:40 PM, Christian Posta
> <christian.posta@gmail.com> wrote:
> > So I think this is how it works, but looking for a quick clarification.
> >
> >
> > I have a producer to Topic FOO. I have a different producer on a
> different
> > connection to Topic BAR.
> >
> > I have two consumers A and B. Both on separate connections. They both
> > subscribe to the same composite destination topic://FOO,topic://BAR
> >
> > Is it possible for the ordering of the messages as seen by A to be
> > different than B?
> >
> >
> > From what I understand, the answer is Yes.
> > Scenario:
> >
> > FOO dispatches "aaa" message to A
> > BAR dispatches "ccc" message to A
> > FOO gets busy dispatching to other subs it might have
> > BAR dispatches "ccc" to B
> > FOO finally dispatches "aaa" to B
> >
> > Then consumer A would have seen "aaa" "ccc"
> > Consumer B would have seen "ccc" "aaa"
> >
> > Is this correct?
> >
> > --
> > *Christian Posta*
> > http://www.christianposta.com/blog
> > twitter: @christianposta
>



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message