activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1458514 - /activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
Date Tue, 19 Mar 2013 21:02:24 GMT
Author: tabish
Date: Tue Mar 19 21:02:24 2013
New Revision: 1458514

URL: http://svn.apache.org/r1458514
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4389

Modified:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java?rev=1458514&r1=1458513&r2=1458514&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
Tue Mar 19 21:02:24 2013
@@ -130,9 +130,14 @@ public class AMQ4351Test extends BrokerT
         final AtomicLong size = new AtomicLong();
         final AtomicBoolean done = new AtomicBoolean();
         CountDownLatch doneLatch = new CountDownLatch(1);
+        CountDownLatch started;
+        CountDownLatch finished;
 
-        public ConsumingClient(String name) {
+
+        public ConsumingClient(String name, CountDownLatch started, CountDownLatch finished)
{
             this.name = name;
+            this.started = started;
+            this.finished = finished;
         }
 
         public void start() {
@@ -141,6 +146,7 @@ public class AMQ4351Test extends BrokerT
         }
 
         public void stopAsync() {
+            finished.countDown();
             done.set(true);
         }
 
@@ -158,6 +164,7 @@ public class AMQ4351Test extends BrokerT
                 try {
                     Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                     MessageConsumer consumer = session.createDurableSubscriber(destination,
name, null, false);
+                    started.countDown();
                     while( !done.get() ) {
                         Message msg = consumer.receive(100);
                         if(msg!=null ) {
@@ -181,24 +188,28 @@ public class AMQ4351Test extends BrokerT
 
     public void testAMQ4351() throws InterruptedException, JMSException {
         LOG.info("Start test.");
+        int subs = 100;
+        CountDownLatch startedLatch = new CountDownLatch(subs - 1);
+        CountDownLatch shutdownLatch = new CountDownLatch(subs - 4);
+
 
         ProducingClient producer = new ProducingClient();
-        ConsumingClient listener1 = new ConsumingClient("subscriber-1");
-        ConsumingClient listener2 = new ConsumingClient("subscriber-2");
-        ConsumingClient listener3 = new ConsumingClient("subscriber-3");
+        ConsumingClient listener1 = new ConsumingClient("subscriber-1", startedLatch, shutdownLatch);
+        ConsumingClient listener2 = new ConsumingClient("subscriber-2", startedLatch, shutdownLatch);
+        ConsumingClient listener3 = new ConsumingClient("subscriber-3", startedLatch, shutdownLatch);
         try {
 
             listener1.start();
             listener2.start();
             listener3.start();
-            int subs = 100;
 
             List<ConsumingClient> subscribers = new ArrayList<ConsumingClient>(subs);
             for (int i = 4; i < subs; i++) {
-                ConsumingClient client = new ConsumingClient("subscriber-" + i);
+                ConsumingClient client = new ConsumingClient("subscriber-" + i, startedLatch,
shutdownLatch);
                 subscribers.add(client);
                 client.start();
             }
+            startedLatch.await(10, TimeUnit.SECONDS);
 
             LOG.info("All subscribers started.");
             producer.sendMessage();
@@ -207,12 +218,12 @@ public class AMQ4351Test extends BrokerT
             for (ConsumingClient client : subscribers) {
                 client.stopAsync();
             }
+            shutdownLatch.await(10, TimeUnit.SECONDS);
 
             // Start producing messages for 10 minutes, at high rate
             LOG.info("Starting mass message producer...");
             producer.start();
 
-
             long lastSize = listener1.size.get();
             for( int i=0 ; i < 10; i++ ) {
                 Thread.sleep(1000);



Mime
View raw message