activemq-users mailing list archives

From Christian Posta <christian.po...@gmail.com>
Subject Re: subscription dispatching
Date Wed, 05 Dec 2012 13:00:27 GMT
Excellent, thank you Dejan!


On Wed, Dec 5, 2012 at 3:46 AM, Dejan Bosanac <dejan@nighttale.net> wrote:

> Hi Christian,
>
> I played with this test a little and I think this is normal behavior.
>
> For topic FOO you first add consumerA, then 100 other consumers, and
> then consumerB.
> For topic BAR you add consumerA and then consumerB.
>
> So when messages arrive close to each other, there's a chance that
> consumerB will receive the BAR message before the FOO message, because
> dispatching happens serially inside a topic and it takes some time to
> dispatch to all 100 consumers.
>
> Here's a link on how to deal with this problem for a single topic:
> http://activemq.apache.org/total-ordering.html
> But with a composite destination in the mix, I think strict ordering
> will not work.
>
> Hope this helps a bit.
>
> Regards
> --
> Dejan Bosanac
> ----------------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> dbosanac@redhat.com
> Twitter: @dejanb
> Blog: http://sensatic.net
> ActiveMQ in Action: http://www.manning.com/snyder/
>
>
> On Tue, Dec 4, 2012 at 3:59 PM, Christian Posta
> <christian.posta@gmail.com> wrote:
> > I just wrote a unit test verifying the behavior I discussed above (i.e.,
> > the messages are delivered in a different order). I guess it makes sense
> > since the connections are all on different threads. I wonder if this can
> > cause an issue though?
> >
> > package org.apache.activemq.usecases;
> >
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.broker.BrokerService;
> >
> > import javax.jms.*;
> > import java.util.ArrayList;
> > import java.util.concurrent.CyclicBarrier;
> > import java.util.concurrent.ExecutorService;
> > import java.util.concurrent.Executors;
> >
> > /**
> >  * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
> >  */
> > public class TopicDispatchTest extends TestSupport {
> >
> >     protected BrokerService brokerService;
> >
> >     public void testFoo() throws Exception {
> >
> >         Connection consumerAConnection = null;
> >         Connection consumerBConnection = null;
> >         try {
> >             // create two consumers on diff connections
> >             consumerAConnection = createConnection();
> >             consumerAConnection.start();
> >             Session consumerASession =
> >                     consumerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >             Destination consumerADestination = createDestination("FOO,BAR");
> >             MessageConsumer A = consumerASession.createConsumer(consumerADestination);
> >             final ArrayList<String> messagesForConsumerA = new ArrayList<String>();
> >
> >             A.setMessageListener(new MessageListener() {
> >                 @Override
> >                 public void onMessage(Message message) {
> >                     try {
> >                         if (message instanceof TextMessage) {
> >                             String value = ((TextMessage) message).getText();
> >                             messagesForConsumerA.add(value);
> >                         }
> >                     } catch (JMSException e) {
> >                         e.printStackTrace();
> >                     }
> >                 }
> >             });
> >
> >             createTenThousandConsumersToTopicFoo();
> >
> >             consumerBConnection = createConnection();
> >             consumerBConnection.start();
> >             Session consumerBSession =
> >                     consumerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >             // Destination consumerBDestination = createDestination("FOO,BAR");
> >             MessageConsumer B = consumerBSession.createConsumer(consumerADestination);
> >             final ArrayList<String> messagesForConsumerB = new ArrayList<String>();
> >
> >             B.setMessageListener(new MessageListener() {
> >                 @Override
> >                 public void onMessage(Message message) {
> >                     try {
> >                         if (message instanceof TextMessage) {
> >                             String value = ((TextMessage) message).getText();
> >                             messagesForConsumerB.add(value);
> >                         }
> >                     } catch (JMSException e) {
> >                         e.printStackTrace();
> >                     }
> >                 }
> >             });
> >
> >
> >             final CyclicBarrier barrier = new CyclicBarrier(2);
> >
> >             Runnable producerAThread = new Runnable() {
> >                 @Override
> >                 public void run() {
> >
> >                     try {
> >                         barrier.await();
> >                         // producer A publishes to FOO on its own connection
> >                         Connection producerAConnection = createConnection();
> >                         producerAConnection.start();
> >                         Session producerASession =
> >                                 producerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >                         Destination producerADestination = createDestination("FOO");
> >                         MessageProducer producerA =
> >                                 producerASession.createProducer(producerADestination);
> >                         producerA.send(producerASession.createTextMessage("aaa"));
> >                     } catch (Exception e) {
> >                         e.printStackTrace();
> >                     }
> >                 }
> >             };
> >
> >
> >             Runnable producerBThread = new Runnable() {
> >                 @Override
> >                 public void run() {
> >                     try {
> >                         barrier.await();
> >                         Connection producerBConnection = createConnection();
> >                         producerBConnection.start();
> >                         Session producerBSession =
> >                                 producerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >                         Destination producerBDestination = createDestination("BAR");
> >                         MessageProducer producerB =
> >                                 producerBSession.createProducer(producerBDestination);
> >                         producerB.send(producerBSession.createTextMessage("ccc"));
> >                     } catch (Exception e) {
> >                         e.printStackTrace();
> >                     }
> >                 }
> >             };
> >
> >
> >             ExecutorService threadService = Executors.newFixedThreadPool(2);
> >             threadService.submit(producerAThread);
> >             threadService.submit(producerBThread);
> >
> >
> >
> >             //situational wait
> > //            TimeUnit.SECONDS.wait(2);
> >             Thread.sleep(2000);
> >
> >             assertEquals("aaa", messagesForConsumerA.get(0));
> >             assertEquals("ccc", messagesForConsumerA.get(1));
> >
> >             assertEquals("aaa", messagesForConsumerB.get(0));
> >             assertEquals("ccc", messagesForConsumerB.get(1));
> >
> >
> >
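> >         } finally {
> >             // Let exceptions fail the test instead of swallowing them, and
> >             // guard against a failure before either connection was created.
> >             if (consumerAConnection != null) {
> >                 consumerAConnection.close();
> >             }
> >             if (consumerBConnection != null) {
> >                 consumerBConnection.close();
> >             }
> >         }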
> >
> >
> >     }
> >
> >     // Note: despite the method name, this registers 100 extra consumers on FOO.
> >     private void createTenThousandConsumersToTopicFoo() throws Exception {
> >         Connection loadConnection = createConnection();
> >         Session loadSession = loadConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >         Destination fooDest = createDestination("FOO");
> >         for (int i = 0; i < 100; i++) {
> >             loadSession.createConsumer(fooDest);
> >         }
> >     }
> >
> >     @Override
> >     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
> >         return new ActiveMQConnectionFactory(
> >                 "tcp://localhost:61616?jms.watchTopicAdvisories=false");
> >     }
> >
> >     @Override
> >     protected void setUp() throws Exception {
> >         topic = true;
> >         startBroker();
> >     }
> >
> >     @Override
> >     protected void tearDown() throws Exception {
> >         brokerService.stop();
> >         brokerService.waitUntilStopped();
> >     }
> >
> >     private void startBroker() throws Exception {
> >         brokerService = new BrokerService();
> >         brokerService.addConnector("tcp://localhost:61616");
> >         brokerService.start();
> >         brokerService.waitUntilStarted();
> >     }
> > }
> >
> >
> >
> > On Tue, Dec 4, 2012 at 2:44 AM, Dejan Bosanac <dejan@nighttale.net> wrote:
> >
> >> Hi Christian,
> >>
> >> I think this shouldn't happen, as a message on arrival goes to every
> >> subscription and is then dispatched from there. So if messages arrive
> >> at the topics in a given order, they should later be dispatched in
> >> that same order.
> >>
> >> Regards
> >> --
> >> Dejan Bosanac
> >> ----------------------
> >> Red Hat, Inc.
> >> FuseSource is now part of Red Hat
> >> dbosanac@redhat.com
> >> Twitter: @dejanb
> >> Blog: http://sensatic.net
> >> ActiveMQ in Action: http://www.manning.com/snyder/
> >>
> >>
> >> On Mon, Dec 3, 2012 at 7:40 PM, Christian Posta
> >> <christian.posta@gmail.com> wrote:
> >> > So I think this is how it works, but I'm looking for a quick
> >> > clarification.
> >> >
> >> >
> >> > I have a producer to Topic FOO. I have a different producer on a
> >> > different connection to Topic BAR.
> >> >
> >> > I have two consumers, A and B, both on separate connections. They both
> >> > subscribe to the same composite destination topic://FOO,topic://BAR
> >> >
> >> > Is it possible for the ordering of the messages as seen by A to be
> >> > different from the ordering seen by B?
> >> >
> >> >
> >> > From what I understand, the answer is Yes.
> >> > Scenario:
> >> >
> >> > FOO dispatches "aaa" message to A
> >> > BAR dispatches "ccc" message to A
> >> > FOO gets busy dispatching to other subs it might have
> >> > BAR dispatches "ccc" to B
> >> > FOO finally dispatches "aaa" to B
> >> >
> >> > Then consumer A would have seen "aaa" "ccc"
> >> > Consumer B would have seen "ccc" "aaa"
> >> >
> >> > Is this correct?
> >> >
> >> > --
> >> > *Christian Posta*
> >> > http://www.christianposta.com/blog
> >> > twitter: @christianposta
> >>
> >
> >
> >
> > --
> > *Christian Posta*
> > http://www.christianposta.com/blog
> > twitter: @christianposta
>
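
For anyone reading this thread later: the scenario discussed above can be
sketched as a small, deterministic toy model. This is not ActiveMQ code and
makes no claim about broker internals. ToyTopic, simulate, and the
round-robin "tick" loop are all illustrative assumptions; the real broker
dispatches on separate threads with nondeterministic timing. The only idea
taken from the thread is that each topic hands a message to its subscribers
one at a time, in subscription order, while the other topic keeps going.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DispatchInterleaving {

    /** Hypothetical stand-in for a topic: delivers one message serially, in subscription order. */
    static final class ToyTopic {
        private final String message;
        private final List<String> subscribers;
        private int next = 0;

        ToyTopic(String message, List<String> subscribers) {
            this.message = message;
            this.subscribers = subscribers;
        }

        boolean done() {
            return next >= subscribers.size();
        }

        /** Delivers to the next subscriber only; records it if that consumer is tracked. */
        void step(Map<String, List<String>> seen) {
            if (done()) {
                return;
            }
            String consumer = subscribers.get(next++);
            List<String> inbox = seen.get(consumer);
            if (inbox != null) {
                inbox.add(message);
            }
        }
    }

    /** Returns what consumers A and B observe, given the number of extra FOO consumers. */
    static Map<String, List<String>> simulate(int loadConsumers) {
        // FOO: A subscribes first, then the load consumers, then B (as in the test).
        List<String> fooSubs = new ArrayList<>();
        fooSubs.add("A");
        for (int i = 0; i < loadConsumers; i++) {
            fooSubs.add("load-" + i);
        }
        fooSubs.add("B");

        ToyTopic foo = new ToyTopic("aaa", fooSubs);
        ToyTopic bar = new ToyTopic("ccc", Arrays.asList("A", "B"));

        Map<String, List<String>> seen = new LinkedHashMap<>();
        seen.put("A", new ArrayList<>());
        seen.put("B", new ArrayList<>());

        // Assumption: both topics make progress in lockstep, one delivery per tick.
        while (!foo.done() || !bar.done()) {
            foo.step(seen);
            bar.step(seen);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(simulate(100)); // prints {A=[aaa, ccc], B=[ccc, aaa]}
        System.out.println(simulate(0));   // prints {A=[aaa, ccc], B=[aaa, ccc]}
    }
}
```

With 100 extra consumers on FOO, B sees "ccc" before "aaa" while A sees the
opposite, which matches the failing assertion in the test. With no extra
consumers the two orders agree in this model, though a real broker gives no
such guarantee across topics.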



-- 
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
