activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r990106 - /activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Date Fri, 27 Aug 2010 11:28:37 GMT
Author: gtully
Date: Fri Aug 27 11:28:37 2010
New Revision: 990106

URL: http://svn.apache.org/viewvc?rev=990106&view=rev
Log:
follow up to fix for https://issues.apache.org/activemq/browse/AMQ-2877 - one test depended
on the lack of pull replay to validate redelivery in a transaction; reworked the test to make
it work with this change

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java?rev=990106&r1=990105&r2=990106&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
(original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java
Fri Aug 27 11:28:37 2010
@@ -807,7 +807,7 @@ public class FailoverTransactionTest ext
 
 
     public void testPoisonOnDeliveryWhilePending() throws Exception {
-        LOG.info("testWaitForMissingRedeliveries()");
+        LOG.info("testPoisonOnDeliveryWhilePending()");
         broker = createBroker(true);
         broker.start();
         ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")?jms.consumerFailoverRedeliveryWaitPeriod=10000");
@@ -825,41 +825,51 @@ public class FailoverTransactionTest ext
         }
         assertNotNull("got message just produced", msg);
 
+        // add another consumer into the mix that may get the message after restart
+        MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME
+ "?consumer.prefetchSize=1"));
+
         broker.stop();
         broker = createBroker(false);
         broker.start();
 
         final CountDownLatch commitDone = new CountDownLatch(1);
 
+        final Vector<Exception> exceptions = new Vector<Exception>();
 
-        // with prefetch=0, it will not get redelivered as there will not be another receive
-        // for this consumer. so it will block till it timeout with an exception
-        // will block pending re-deliveries
+        // commit may fail if other consumer gets the message on restart, it will be seen
a a duplicate on teh connection
+        // but with no transaciton and it pending on another consumer it will be posion
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 LOG.info("doing async commit...");
                 try {
                     consumerSession.commit();
-                } catch (JMSException ignored) {
+                } catch (JMSException ex) {
+                    exceptions.add(ex);                    
+                } finally {
                     commitDone.countDown();
                 }
             }
         });
 
-        // pull the pending message to this consumer where it will be poison as it is a duplicate
without a tx
-        MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME
+ "?consumer.prefetchSize=1"));
-        assertNull("consumer2 not get a message while pending to 1", consumer2.receive(2000));
+        assertNull("consumer2 not get a message while pending to 1 or consumed by 1", consumer2.receive(2000));
 
-        assertTrue("commit completed with ex", commitDone.await(15, TimeUnit.SECONDS));
-        assertNull("consumer should not get rolledback and non redelivered message", consumer.receive(5000));
-
-        // message should be in dlq
-        MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
-        TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
-        assertNotNull("found message in dlq", dlqMessage);
-        assertEquals("text matches", "Test message", dlqMessage.getText());
-        consumerSession.commit();
+        assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS));
 
+        // either message consumed or sent to dlq via poison on redelivery to wrong consumer
+        // message should not be available again in any event
+        assertNull("consumer should not get rolledback on non redelivered message or duplicate",
consumer.receive(5000));
+
+        // consumer replay is hashmap order dependent on a failover connection state recover
so need to deal with both cases
+        if (exceptions.isEmpty()) {
+            // commit succeeded, message was redelivered to the correct consumer after restart
so commit was fine
+        } else {
+            // message should be in dlq
+            MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ"));
+            TextMessage dlqMessage = (TextMessage) dlqConsumer.receive(5000);
+            assertNotNull("found message in dlq", dlqMessage);
+            assertEquals("text matches", "Test message", dlqMessage.getText());
+            consumerSession.commit();
+        }
         connection.close();
     }
 



Mime
View raw message