activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Dejan Bosanac <de...@nighttale.net>
Subject Re: subscription dispatching
Date Wed, 05 Dec 2012 10:46:42 GMT
Hi Christian,

I played a little bit with this test and I think it's a normal behavior.

For the topic FOO you will add first consumerA, then 100 other
consumers and then consumerB
And for topic BAR consumerA and then consumerB

So when messages arrive close to each other there's a chance that
consumerB will receive BAR message before FOO message as dispatching
is serially inside the topic and it takes some time to dispatch to all
100 consumers.

Here's a link on how to deal with this problem with the single topic
http://activemq.apache.org/total-ordering.html
But with adding composite destination in the mix I think strict
ordering will not work.

Hope this helps a bit.

Regards
--
Dejan Bosanac
----------------------
Red Hat, Inc.
FuseSource is now part of Red Hat
dbosanac@redhat.com
Twitter: @dejanb
Blog: http://sensatic.net
ActiveMQ in Action: http://www.manning.com/snyder/


On Tue, Dec 4, 2012 at 3:59 PM, Christian Posta
<christian.posta@gmail.com> wrote:
> I just wrote a unit test verifying the behavior I discussed above (ie, the
> messages are delivered in different order). I guess it makes sense since
> the connections are all on different threads. I wonder if this can cause an
> issue though?
>
> package org.apache.activemq.usecases;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
>
> import javax.jms.*;
> import java.util.ArrayList;
> import java.util.concurrent.CyclicBarrier;
> import java.util.concurrent.ExecutorService;
> import java.util.concurrent.Executors;
>
> /**
>  * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
>  */
> public class TopicDispatchTest extends TestSupport{
>
>     protected BrokerService brokerService;
>
>     public void testFoo() throws Exception {
>
>         Connection consumerAConnection = null;
>         Connection consumerBConnection = null;
>         try {
>             // create two consumers on diff connections
>             consumerAConnection = createConnection();
>             consumerAConnection.start();
>             Session consumerASession =
> consumerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>             Destination consumerADestination = createDestination("FOO,BAR");
>             MessageConsumer A =
> consumerASession.createConsumer(consumerADestination);
>             final ArrayList<String> messagesForConsumerA = new
> ArrayList<String>();
>
>             A.setMessageListener(new MessageListener() {
>                 @Override
>                 public void onMessage(Message message) {
>                     try {
>                         if (message instanceof TextMessage) {
>                             String value = ((TextMessage)
> message).getText();
>                             messagesForConsumerA.add(value);
>                         }
>                     } catch (JMSException e) {
>                         e.printStackTrace();  //To change body of catch
> statement use File | Settings | File Templates.
>                     }
>                 }
>             });
>
>             createTenThousandConsumersToTopicFoo();
>
>             consumerBConnection = createConnection();
>             consumerBConnection.start();
>             Session consumerBSession =
> consumerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> //            Destination consumerBDestination =
> createDestination("FOO,BAR");
>             MessageConsumer B =
> consumerBSession.createConsumer(consumerADestination);
>             final ArrayList<String> messagesForConsumerB = new
> ArrayList<String>();
>
>             B.setMessageListener(new MessageListener() {
>                 @Override
>                 public void onMessage(Message message) {
>                     try {
>                         if (message instanceof TextMessage) {
>                             String value = ((TextMessage)
> message).getText();
>                             messagesForConsumerB.add(value);
>                         }
>                     } catch (JMSException e) {
>                         e.printStackTrace();  //To change body of catch
> statement use File | Settings | File Templates.
>                     }
>                 }
>             });
>
>
>             final CyclicBarrier barrier = new CyclicBarrier(2);
>
>             Runnable producerAThread = new Runnable() {
>                 @Override
>                 public void run() {
>
>                     try {
>                         barrier.await();
>                         // create two producers, each on separate
> connections
>                         Connection producerAConnection = null;
>
>                         producerAConnection = createConnection();
>                         producerAConnection.start();
>                         Session producerASession =
> producerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>                         Destination producerADestination =
> createDestination("FOO");
>                         MessageProducer producerA =
> producerASession.createProducer(producerADestination);
>
> producerA.send(producerASession.createTextMessage("aaa"));
>                     } catch (Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>             };
>
>
>             Runnable producerBThread = new Runnable() {
>                 @Override
>                 public void run() {
>                     try {
>                         barrier.await();
>                         Connection producerBConnection = null;
>
>                         producerBConnection = createConnection();
>                         producerBConnection.start();
>                         Session producerBSession =
> producerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>                         Destination producerBDestination =
> createDestination("BAR");
>                         MessageProducer producerB =
> producerBSession.createProducer(producerBDestination);
>
> producerB.send(producerBSession.createTextMessage("ccc"));
>                     } catch (Exception e) {
>                         e.printStackTrace();
>                     }
>                 }
>             };
>
>
>             ExecutorService threadService = Executors.newFixedThreadPool(2);
>             threadService.submit(producerAThread);
>             threadService.submit(producerBThread);
>
>
>
>             //situational wait
> //            TimeUnit.SECONDS.wait(2);
>             Thread.sleep(2000);
>
>             assertEquals("aaa", messagesForConsumerA.get(0));
>             assertEquals("ccc", messagesForConsumerA.get(1));
>
>             assertEquals("aaa", messagesForConsumerB.get(0));
>             assertEquals("ccc", messagesForConsumerB.get(1));
>
>
>
>         } catch (Exception e) {
>             e.printStackTrace();
>         } finally {
>             consumerAConnection.close();
>             consumerBConnection.close();
>         }
>
>
>     }
>
>     private void createTenThousandConsumersToTopicFoo() throws Exception {
>         Connection loadConnection = createConnection();
>         Session loadSession = loadConnection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>         Destination fooDest = createDestination("FOO");
>         for (int i = 0; i < 100; i++) {
>             loadSession.createConsumer(fooDest);
>         }
>     }
>
>     @Override
>     protected ActiveMQConnectionFactory createConnectionFactory() throws
> Exception {
>         return new
> ActiveMQConnectionFactory("tcp://localhost:61616?jms.watchTopicAdvisories=false");
>     }
>
>     @Override
>     protected void setUp() throws Exception {
>         topic = true;
>         startBroker();
>     }
>
>     @Override
>     protected void tearDown() throws Exception {
>         brokerService.stop();
>         brokerService.waitUntilStopped();
>     }
>
>     private void startBroker() throws Exception {
>         brokerService = new BrokerService();
>         brokerService.addConnector("tcp://localhost:61616");
>         brokerService.start();
>         brokerService.waitUntilStarted();
>     }
> }
>
>
>
> On Tue, Dec 4, 2012 at 2:44 AM, Dejan Bosanac <dejan@nighttale.net> wrote:
>
>> Hi Christian,
>>
>> I think this shouldn't happen as message on its arrival goes to every
>> subscription and then is dispatched from there. So if messages arrive
>> in the same order in the topics they should later on be dispatched in
>> that same order.
>>
>> Regards
>> --
>> Dejan Bosanac
>> ----------------------
>> Red Hat, Inc.
>> FuseSource is now part of Red Hat
>> dbosanac@redhat.com
>> Twitter: @dejanb
>> Blog: http://sensatic.net
>> ActiveMQ in Action: http://www.manning.com/snyder/
>>
>>
>> On Mon, Dec 3, 2012 at 7:40 PM, Christian Posta
>> <christian.posta@gmail.com> wrote:
>> > So I think this is how it works, but looking for a quick clarification.
>> >
>> >
>> > I have a producer to Topic FOO. I have a different producer on a
>> different
>> > connection to Topic BAR.
>> >
>> > I have two consumers A and B. Both on separate connections. They both
>> > subscribe to the same composite destination topic://FOO,topic://BAR
>> >
>> > Is it possible for the ordering of the messages as seen by A to be
>> > different than B?
>> >
>> >
>> > From what I understand, the answer is Yes.
>> > Scenario:
>> >
>> > FOO dispatches "aaa" message to A
>> > BAR dispatches "ccc" message to A
>> > FOO gets busy dispatching to other subs it might have
>> > BAR dispatches "ccc" to B
>> > FOO finally dispatches "aaa" to B
>> >
>> > Then consumer A would have seen "aaa" "ccc"
>> > Consumer B would have seen "ccc" "aaa"
>> >
>> > Is this correct?
>> >
>> > --
>> > *Christian Posta*
>> > http://www.christianposta.com/blog
>> > twitter: @christianposta
>>
>
>
>
> --
> *Christian Posta*
> http://www.christianposta.com/blog
> twitter: @christianposta

Mime
View raw message