Excellent, thank you Dejan!

On Wed, Dec 5, 2012 at 3:46 AM, Dejan Bosanac wrote:

> Hi Christian,
>
> I played a little bit with this test and I think it's a normal behavior.
>
> For the topic FOO you will add first consumerA, then 100 other
> consumers and then consumerB
> And for topic BAR consumerA and then consumerB
>
> So when messages arrive close to each other there's a chance that
> consumerB will receive BAR message before FOO message as dispatching
> is serially inside the topic and it takes some time to dispatch to all
> 100 consumers.
>
> Here's a link on how to deal with this problem with the single topic
> http://activemq.apache.org/total-ordering.html
> But with adding composite destination in the mix I think strict
> ordering will not work.
>
> Hope this helps a bit.
>
> Regards
> --
> Dejan Bosanac
> ----------------------
> Red Hat, Inc.
> FuseSource is now part of Red Hat
> dbosanac@redhat.com
> Twitter: @dejanb
> Blog: http://sensatic.net
> ActiveMQ in Action: http://www.manning.com/snyder/
>
>
> On Tue, Dec 4, 2012 at 3:59 PM, Christian Posta wrote:
> > I just wrote a unit test verifying the behavior I discussed above (ie, the
> > messages are delivered in different order). I guess it makes sense since
> > the connections are all on different threads. I wonder if this can cause an
> > issue though?
> >
> > package org.apache.activemq.usecases;
> >
> > import org.apache.activemq.ActiveMQConnectionFactory;
> > import org.apache.activemq.broker.BrokerService;
> >
> > import javax.jms.*;
> > import java.util.ArrayList;
> > import java.util.concurrent.CyclicBarrier;
> > import java.util.concurrent.ExecutorService;
> > import java.util.concurrent.Executors;
> >
> > /**
> >  * @author Christian Posta
> >  */
> > public class TopicDispatchTest extends TestSupport {
> >
> >     protected BrokerService brokerService;
> >
> >     public void testFoo() throws Exception {
> >
> >         Connection consumerAConnection = null;
> >         Connection consumerBConnection = null;
> >         try {
> >             // create two consumers on diff connections
> >             consumerAConnection = createConnection();
> >             consumerAConnection.start();
> >             Session consumerASession = consumerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >             Destination consumerADestination = createDestination("FOO,BAR");
> >             MessageConsumer A = consumerASession.createConsumer(consumerADestination);
> >             final ArrayList messagesForConsumerA = new ArrayList();
> >
> >             A.setMessageListener(new MessageListener() {
> >                 @Override
> >                 public void onMessage(Message message) {
> >                     try {
> >                         if (message instanceof TextMessage) {
> >                             String value = ((TextMessage) message).getText();
> >                             messagesForConsumerA.add(value);
> >                         }
> >                     } catch (JMSException e) {
> >                         e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
> >                     }
> >                 }
> >             });
> >
> >             createTenThousandConsumersToTopicFoo();
> >
> >             consumerBConnection = createConnection();
> >             consumerBConnection.start();
> >             Session consumerBSession = consumerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >             // Destination consumerBDestination = createDestination("FOO,BAR");
> >             MessageConsumer B = consumerBSession.createConsumer(consumerADestination);
> >             final ArrayList messagesForConsumerB = new ArrayList();
> >
> >             B.setMessageListener(new MessageListener() {
> >                 @Override
> >                 public void onMessage(Message message) {
> >                     try {
> >                         if (message instanceof TextMessage) {
> >                             String value = ((TextMessage) message).getText();
> >                             messagesForConsumerB.add(value);
> >                         }
> >                     } catch (JMSException e) {
> >                         e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
> >                     }
> >                 }
> >             });
> >
> >
> >             final CyclicBarrier barrier = new CyclicBarrier(2);
> >
> >             Runnable producerAThread = new Runnable() {
> >                 @Override
> >                 public void run() {
> >
> >                     try {
> >                         barrier.await();
> >                         // create two producers, each on separate connections
> >                         Connection producerAConnection = null;
> >
> >                         producerAConnection = createConnection();
> >                         producerAConnection.start();
> >                         Session producerASession = producerAConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >                         Destination producerADestination = createDestination("FOO");
> >                         MessageProducer producerA = producerASession.createProducer(producerADestination);
> >
> >                         producerA.send(producerASession.createTextMessage("aaa"));
> >                     } catch (Exception e) {
> >                         e.printStackTrace();
> >                     }
> >                 }
> >             };
> >
> >
> >             Runnable producerBThread = new Runnable() {
> >                 @Override
> >                 public void run() {
> >                     try {
> >                         barrier.await();
> >                         Connection producerBConnection = null;
> >
> >                         producerBConnection = createConnection();
> >                         producerBConnection.start();
> >                         Session producerBSession = producerBConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >                         Destination producerBDestination = createDestination("BAR");
> >                         MessageProducer producerB = producerBSession.createProducer(producerBDestination);
> >
> >                         producerB.send(producerBSession.createTextMessage("ccc"));
> >                     } catch (Exception e) {
> >                         e.printStackTrace();
> >                     }
> >                 }
> >             };
> >
> >
> >             ExecutorService threadService = Executors.newFixedThreadPool(2);
> >             threadService.submit(producerAThread);
> >             threadService.submit(producerBThread);
> >
> >
> >             //situational wait
> >             // TimeUnit.SECONDS.wait(2);
> >             Thread.sleep(2000);
> >
> >             assertEquals("aaa", messagesForConsumerA.get(0));
> >             assertEquals("ccc", messagesForConsumerA.get(1));
> >
> >             assertEquals("aaa", messagesForConsumerB.get(0));
> >             assertEquals("ccc", messagesForConsumerB.get(1));
> >
> >
> >         } catch (Exception e) {
> >             e.printStackTrace();
> >         } finally {
> >             consumerAConnection.close();
> >             consumerBConnection.close();
> >         }
> >
> >
> >     }
> >
> >     private void createTenThousandConsumersToTopicFoo() throws Exception {
> >         Connection loadConnection = createConnection();
> >         Session loadSession = loadConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
> >         Destination fooDest = createDestination("FOO");
> >         for (int i = 0; i < 100; i++) {
> >             loadSession.createConsumer(fooDest);
> >         }
> >     }
> >
> >     @Override
> >     protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
> >         return new ActiveMQConnectionFactory("tcp://localhost:61616?jms.watchTopicAdvisories=false");
> >     }
> >
> >     @Override
> >     protected void setUp() throws Exception {
> >         topic = true;
> >         startBroker();
> >     }
> >
> >     @Override
> >     protected void tearDown() throws Exception {
> >         brokerService.stop();
> >         brokerService.waitUntilStopped();
> >     }
> >
> >     private void startBroker() throws Exception {
> >         brokerService = new BrokerService();
> >         brokerService.addConnector("tcp://localhost:61616");
> >         brokerService.start();
> >         brokerService.waitUntilStarted();
> >     }
> > }
> >
> >
> >
> > On Tue, Dec 4, 2012 at 2:44 AM, Dejan Bosanac wrote:
> >
> >> Hi Christian,
> >>
> >> I think this shouldn't happen as message on its arrival goes to every
> >> subscription and then is dispatched from there. So if messages arrive
> >> in the same order in the topics they should later on be dispatched in
> >> that same order.
> >>
> >> Regards
> >> --
> >> Dejan Bosanac
> >> ----------------------
> >> Red Hat, Inc.
> >> FuseSource is now part of Red Hat
> >> dbosanac@redhat.com
> >> Twitter: @dejanb
> >> Blog: http://sensatic.net
> >> ActiveMQ in Action: http://www.manning.com/snyder/
> >>
> >>
> >> On Mon, Dec 3, 2012 at 7:40 PM, Christian Posta wrote:
> >> > So I think this is how it works, but looking for a quick clarification.
> >> >
> >> >
> >> > I have a producer to Topic FOO. I have a different producer on a different
> >> > connection to Topic BAR.
> >> >
> >> > I have two consumers A and B. Both on separate connections. They both
> >> > subscribe to the same composite destination topic://FOO,topic://BAR
> >> >
> >> > Is it possible for the ordering of the messages as seen by A to be
> >> > different than B?
> >> >
> >> >
> >> > From what I understand, the answer is Yes.
> >> > Scenario:
> >> >
> >> > FOO dispatches "aaa" message to A
> >> > BAR dispatches "ccc" message to A
> >> > FOO gets busy dispatching to other subs it might have
> >> > BAR dispatches "ccc" to B
> >> > FOO finally dispatches "aaa" to B
> >> >
> >> > Then consumer A would have seen "aaa" "ccc"
> >> > Consumer B would have seen "ccc" "aaa"
> >> >
> >> > Is this correct?
> >> >
> >> > --
> >> > *Christian Posta*
> >> > http://www.christianposta.com/blog
> >> > twitter: @christianposta
> >
> >
> >
> > --
> > *Christian Posta*
> > http://www.christianposta.com/blog
> > twitter: @christianposta
>

--
*Christian Posta*
http://www.christianposta.com/blog
twitter: @christianposta
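
The reordering discussed in this thread (FOO dispatching serially to A, then 100 other consumers, then B, while BAR only dispatches to A and B) can be illustrated without a broker. What follows is only a sketch under stated assumptions, not ActiveMQ's dispatch code: the names DispatchOrderSketch, Topic and simulate are hypothetical, and the strict one-step-per-topic alternation is an assumed interleaving chosen to make the outcome deterministic. A real broker's thread scheduling gives no such guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DispatchOrderSketch {

    /** Hypothetical topic: hands one message to its subscribers one at a time, in subscription order. */
    static final class Topic {
        private final String message;
        private final Iterator<String> remaining;

        Topic(String message, List<String> subscribers) {
            this.message = message;
            this.remaining = subscribers.iterator();
        }

        /** Delivers to the next subscriber; returns false once every subscriber has been served. */
        boolean dispatchOne(Map<String, List<String>> inboxes) {
            if (!remaining.hasNext()) {
                return false;
            }
            inboxes.computeIfAbsent(remaining.next(), k -> new ArrayList<>()).add(message);
            return true;
        }
    }

    /**
     * FOO has subscribers A, then extraFooSubscribers load consumers, then B.
     * BAR has subscribers A, then B. Returns what each subscriber received, in order.
     */
    static Map<String, List<String>> simulate(int extraFooSubscribers) {
        List<String> fooSubs = new ArrayList<>();
        fooSubs.add("A");
        for (int i = 0; i < extraFooSubscribers; i++) {
            fooSubs.add("load-" + i);
        }
        fooSubs.add("B");
        List<String> barSubs = Arrays.asList("A", "B");

        Topic foo = new Topic("aaa", fooSubs);
        Topic bar = new Topic("ccc", barSubs);

        Map<String, List<String>> inboxes = new LinkedHashMap<>();
        boolean fooBusy = true;
        boolean barBusy = true;
        // Assumed interleaving: each topic performs one dispatch step per round, FOO first.
        while (fooBusy || barBusy) {
            fooBusy = foo.dispatchOne(inboxes);
            barBusy = bar.dispatchOne(inboxes);
        }
        return inboxes;
    }

    public static void main(String[] args) {
        Map<String, List<String>> inboxes = simulate(100);
        System.out.println("A saw " + inboxes.get("A")); // A saw [aaa, ccc]
        System.out.println("B saw " + inboxes.get("B")); // B saw [ccc, aaa]
    }
}
```

With simulate(0) both A and B receive "aaa" before "ccc" under the same interleaving, which is consistent with the explanation above that the extra fan-out on FOO is what opens the window for B to see the BAR message first.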