activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1434956 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java
Date Thu, 17 Jan 2013 23:21:55 GMT
Author: tabish
Date: Thu Jan 17 23:21:54 2013
New Revision: 1434956

URL: http://svn.apache.org/viewvc?rev=1434956&view=rev
Log:
apply patch for: https://issues.apache.org/jira/browse/AMQ-4260

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java?rev=1434956&r1=1434955&r2=1434956&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupLateArrivalsTest.java Thu Jan 17 23:21:54 2013
@@ -16,18 +16,23 @@
  */
 package org.apache.activemq.usecases;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+
 import javax.jms.Connection;
 import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+
 import junit.framework.Test;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -38,7 +43,6 @@ import org.apache.activemq.command.Activ
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 public class MessageGroupLateArrivalsTest extends JmsTestSupport {
     public static final Logger log = LoggerFactory.getLogger(MessageGroupLateArrivalsTest.class);
     protected Connection connection;
@@ -60,6 +64,7 @@ public class MessageGroupLateArrivalsTes
         junit.textui.TestRunner.run(suite());
     }
 
+    @Override
     public void setUp() throws Exception {
         broker = createBroker();
         broker.start();
@@ -71,6 +76,7 @@ public class MessageGroupLateArrivalsTes
         connection.start();
     }
 
+    @Override
     protected BrokerService createBroker() throws Exception {
         BrokerService service = new BrokerService();
         service.setPersistent(false);
@@ -86,6 +92,7 @@ public class MessageGroupLateArrivalsTes
         return service;
     }
 
+    @Override
     public void tearDown() throws Exception {
         producer.close();
         session.close();
@@ -99,20 +106,25 @@ public class MessageGroupLateArrivalsTes
         int[] counters = {perBatch, perBatch, perBatch};
 
         CountDownLatch startSignal = new CountDownLatch(0);
-        CountDownLatch doneSignal = new CountDownLatch(1);
+        CountDownLatch doneSignal = new CountDownLatch(3);
+        CountDownLatch worker1Started = new CountDownLatch(1);
+        CountDownLatch worker2Started = new CountDownLatch(1);
+        CountDownLatch worker3Started = new CountDownLatch(1);
 
         messageCount.put("worker1", 0);
         messageGroups.put("worker1", new HashSet<String>());
-        Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups);
+        Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups, worker1Started);
         messageCount.put("worker2", 0);
         messageGroups.put("worker2", new HashSet<String>());
-        Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups);
+        Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups, worker2Started);
         messageCount.put("worker3", 0);
         messageGroups.put("worker3", new HashSet<String>());
-        Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups);
+        Worker worker3 = new Worker(connection, destination, "worker3", startSignal, doneSignal, counters, messageCount, messageGroups, worker3Started);
 
         new Thread(worker1).start();
         new Thread(worker2).start();
+        worker1Started.await();
+        worker2Started.await();
 
         for (int i = 0; i < perBatch; i++) {
             Message msga = session.createTextMessage("hello a");
@@ -128,11 +140,7 @@ public class MessageGroupLateArrivalsTes
         new Thread(worker3).start();
 
         // wait for presence before new group
-        TimeUnit.SECONDS.sleep(4);
-
-        // ensure worker 3 is not next in line with normal dispatch
-        //Message msga = session.createTextMessage("hello to who ever is next in line");
-        //producer.send(msga);
+        worker3Started.await();
 
         for (int i = 0; i < perBatch; i++) {
             Message msgc = session.createTextMessage("hello c");
@@ -142,8 +150,13 @@ public class MessageGroupLateArrivalsTes
 
         doneSignal.await();
 
-        for (String worker : messageCount.keySet()) {
+        List<String> workers = new ArrayList<String>(messageCount.keySet());
+        Collections.sort(workers);
+        for (String worker : workers) {
             log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker));
+        }
+
+        for (String worker : workers) {
             assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
                     , perBatch, messageCount.get(worker).intValue());
             assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
@@ -157,14 +170,16 @@ public class MessageGroupLateArrivalsTes
         int[] counters = {perBatch, perBatch, perBatch};
 
         CountDownLatch startSignal = new CountDownLatch(0);
-        CountDownLatch doneSignal = new CountDownLatch(1);
+        CountDownLatch doneSignal = new CountDownLatch(2);
+        CountDownLatch worker1Started = new CountDownLatch(1);
+        CountDownLatch worker2Started = new CountDownLatch(1);
 
         messageCount.put("worker1", 0);
         messageGroups.put("worker1", new HashSet<String>());
-        Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups);
+        Worker worker1 = new Worker(connection, destination, "worker1", startSignal, doneSignal, counters, messageCount, messageGroups, worker1Started);
         messageCount.put("worker2", 0);
         messageGroups.put("worker2", new HashSet<String>());
-        Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups);
+        Worker worker2 = new Worker(connection, destination, "worker2", startSignal, doneSignal, counters, messageCount, messageGroups, worker2Started);
 
         new Thread(worker1).start();
 
@@ -182,7 +197,7 @@ public class MessageGroupLateArrivalsTes
         new Thread(worker2).start();
 
         // wait for presence before new group
-        TimeUnit.SECONDS.sleep(4);
+        worker2Started.await();
 
         for (int i = 0; i < perBatch; i++) {
             Message msgc = session.createTextMessage("hello a");
@@ -211,12 +226,13 @@ public class MessageGroupLateArrivalsTes
         private String workerName = null;
         private CountDownLatch startSignal = null;
         private CountDownLatch doneSignal = null;
+        private CountDownLatch workerStarted = null;
         private int[] counters = null;
-        private HashMap<String, Integer> messageCount;
-        private HashMap<String, Set<String>> messageGroups;
-
+        private final HashMap<String, Integer> messageCount;
+        private final HashMap<String, Set<String>> messageGroups;
 
-        private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal, int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups) {
+        private Worker(Connection connection, Destination queueName, String workerName, CountDownLatch startSignal, CountDownLatch doneSignal,
+                       int[] counters, HashMap<String, Integer> messageCount, HashMap<String, Set<String>> messageGroups, CountDownLatch workerStarted) {
             this.connection = connection;
             this.queueName = queueName;
             this.workerName = workerName;
@@ -225,6 +241,7 @@ public class MessageGroupLateArrivalsTes
             this.counters = counters;
             this.messageCount = messageCount;
             this.messageGroups = messageGroups;
+            this.workerStarted = workerStarted;
         }
 
         private void update(String group) {
@@ -235,6 +252,7 @@ public class MessageGroupLateArrivalsTes
             messageGroups.put(workerName, groups);
         }
 
+        @Override
         public void run() {
 
             try {
@@ -242,6 +260,7 @@ public class MessageGroupLateArrivalsTes
                 log.info(workerName);
                 Session sess = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                 MessageConsumer consumer = sess.createConsumer(queueName);
+                workerStarted.countDown();
 
                 while (true) {
                     if (counters[0] == 0 && counters[1] == 0 && counters[2] == 0) {
@@ -257,20 +276,17 @@ public class MessageGroupLateArrivalsTes
                     msg.acknowledge();
 
                     String group = msg.getStringProperty("JMSXGroupID");
-                    boolean first = msg.getBooleanProperty("JMSXGroupFirstForConsumer");
+                    msg.getBooleanProperty("JMSXGroupFirstForConsumer");
 
                     if ("A".equals(group)) {
                         --counters[0];
                         update(group);
-                        //Thread.sleep(500);
                     } else if ("B".equals(group)) {
                         --counters[1];
                         update(group);
-                        //Thread.sleep(100);
                     } else if ("C".equals(group)) {
                         --counters[2];
                         update(group);
-                        //Thread.sleep(10);
                     } else {
                         log.warn(workerName + ", unknown group");
                     }



Mime
View raw message