activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r905641 - /activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Date Tue, 02 Feb 2010 14:18:55 GMT
Author: gtully
Date: Tue Feb  2 14:18:54 2010
New Revision: 905641

URL: http://svn.apache.org/viewvc?rev=905641&view=rev
Log:
merge -c 905640 https://svn.apache.org/repos/asf/activemq/trunk - tidy up test case for https://issues.apache.org/activemq/browse/AMQ-2590

Modified:
    activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=905641&r1=905640&r2=905641&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Tue Feb  2 14:18:54 2010
@@ -27,6 +27,7 @@
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
 import javax.jms.JMSException;
@@ -391,16 +392,16 @@
     @Test
     public void testFailoverConsumerAckLost() throws Exception {
         // as failure depends on hash order, do a few times
-        for (int i=0; i<3; i++) {
+        for (int i=0; i<4; i++) {
             try {
-                doTestFailoverConsumerAckLost();
+                doTestFailoverConsumerAckLost(i);
             } finally {
                 stopBroker();
             }
         }
     }
     
-    public void doTestFailoverConsumerAckLost() throws Exception {
+    public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception {
         final int adapter = 0;
         broker = createBroker(true);
         setPersistenceAdapter(adapter);
@@ -456,7 +457,7 @@
         
         final Vector<Message> receivedMessages = new Vector<Message>();
         final CountDownLatch commitDoneLatch = new CountDownLatch(1);
-        
+        final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false);
         Executors.newSingleThreadExecutor().execute(new Runnable() {   
             public void run() {
                 LOG.info("doing async commit after consume...");
@@ -465,12 +466,17 @@
                     LOG.info("consumer1 first attempt got message: " + msg);
                     receivedMessages.add(msg);
                     
-                    TimeUnit.SECONDS.sleep(7);
+                    // give some variance to the runs
+                    TimeUnit.SECONDS.sleep(pauseSeconds * 2);
                     
                     // should not get a second message as there are two messages and two
consumers
-                    // but with failover and unordered connection restore it can get the
second
-                    // message which could create a problem for a pending ack and also invalidate
-                    // the transaction in which the first was consumed and acked
+                    // and prefetch=1, but with failover and unordered connection restore
it can get the second
+                    // message.
+                    
+                    // For the transaction to complete it needs to get the same one or two
messages
+                    // again so that the acks line up.
+                    // If redelivery order is different, the commit should fail with an ex
+                    //
                     msg = consumer1.receive(5000);
                     LOG.info("consumer1 second attempt got message: " + msg);
                     if (msg != null) {
@@ -481,8 +487,9 @@
                     try {
                         consumerSession1.commit();
                     } catch (JMSException expectedSometimes) {
-                        LOG.info("got rollback ex on commit", expectedSometimes);
-                        if (expectedSometimes instanceof TransactionRolledBackException &&
receivedMessages.size() == 2) {
+                        LOG.info("got exception ex on commit", expectedSometimes);
+                        if (expectedSometimes instanceof TransactionRolledBackException)
{
+                            gotTransactionRolledBackException.set(true);
                             // ok, message one was not replayed so we expect the rollback
                         } else {
                             throw expectedSometimes;
@@ -506,24 +513,27 @@
 
         assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
         
-        // getting 2 is indicative of orderiing issue. a problem if dangling message found
after restart
         LOG.info("received message count: " + receivedMessages.size());
         
         // new transaction
-        Message msg = consumer1.receive(2000);
+        Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 :
20000);
         LOG.info("post: from consumer1 received: " + msg);
-        if (receivedMessages.size() == 1) {
-            assertNull("should be nothing left for consumer as recieve should have committed",
msg);
-        } else {
+        if (gotTransactionRolledBackException.get()) {
             assertNotNull("should be available again after commit rollback ex", msg);
+        } else {
+            assertNull("should be nothing left for consumer as recieve should have committed",
msg);
         }
         consumerSession1.commit();
         
-        // consumer2 should get other message
-        msg = consumer2.receive(5000);
-        LOG.info("post: from consumer2 received: " + msg);
-        assertNotNull("got second message on consumer2", msg);
-        consumerSession2.commit();
+        if (gotTransactionRolledBackException.get() ||
+                !gotTransactionRolledBackException.get() && receivedMessages.size()
== 1) {
+            // just one message successfully consumed or none consumed
+            // consumer2 should get other message
+            msg = consumer2.receive(10000);
+            LOG.info("post: from consumer2 received: " + msg);
+            assertNotNull("got second message on consumer2", msg);
+            consumerSession2.commit();
+        }
         
         for (Connection c: connections) {
             c.close();



Mime
View raw message