activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject svn commit: r1511307 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ activemq-unit-tests/src/test/java/org/apache/activemq/bugs/ activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/
Date Wed, 07 Aug 2013 13:39:25 GMT
Author: gtully
Date: Wed Aug  7 13:39:24 2013
New Revision: 1511307

URL: http://svn.apache.org/r1511307
Log:
https://issues.apache.org/jira/browse/AMQ-4665 - fix auto ack on duplicate, now use poison
ack. For client ack allow replay after failover. Additional tests to validate new behaviour

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1511307&r1=1511306&r2=1511307&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Wed Aug  7 13:39:24 2013
@@ -121,7 +121,7 @@ public class ActiveMQMessageConsumer imp
     // The are the messages that were delivered to the consumer but that have
     // not been acknowledged. It's kept in reverse order since we
     // Always walk list in reverse order.
-    private final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
+    protected final LinkedList<MessageDispatch> deliveredMessages = new LinkedList<MessageDispatch>();
     // track duplicate deliveries in a transaction such that the tx integrity can be validated
     private PreviouslyDeliveredMap<MessageId, Boolean> previouslyDeliveredMessages;
     private int deliveredCounter;
@@ -143,7 +143,7 @@ public class ActiveMQMessageConsumer imp
     private final AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
     private ExecutorService executorService;
     private MessageTransformer transformer;
-    private boolean clearDispatchList;
+    private boolean clearDeliveredList;
     AtomicInteger inProgressClearRequiredFlag = new AtomicInteger(0);
 
     private MessageAck pendingAck;
@@ -704,7 +704,7 @@ public class ActiveMQMessageConsumer imp
     void inProgressClearRequired() {
         inProgressClearRequiredFlag.incrementAndGet();
         // deal with delivered messages async to avoid lock contention with in progress acks
-        clearDispatchList = true;
+        clearDeliveredList = true;
     }
 
     void clearMessagesInProgress() {
@@ -730,6 +730,7 @@ public class ActiveMQMessageConsumer imp
                 }
             }
         }
+        clearDeliveredList();
     }
 
     void deliverAcks() {
@@ -818,6 +819,9 @@ public class ActiveMQMessageConsumer imp
             if (!this.info.isBrowser()) {
                 for (MessageDispatch old : list) {
                     // ensure we don't filter this as a duplicate
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("on close, rollback: " + old.getMessage().getMessageId());
+                    }
                     session.connection.rollbackDuplicate(this, old.getMessage());
                 }
             }
@@ -838,7 +842,7 @@ public class ActiveMQMessageConsumer imp
      * broker to pull a message we are about to receive
      */
     protected void sendPullCommand(long timeout) throws JMSException {
-        clearDispatchList();
+        clearDeliveredList();
         if (info.getCurrentPrefetchSize() == 0 && unconsumedMessages.isEmpty()) {
             MessagePull messagePull = new MessagePull();
             messagePull.configure(info);
@@ -1010,6 +1014,9 @@ public class ActiveMQMessageConsumer imp
         // AMQ-3956 evaluate both expired and normal msgs as
         // otherwise consumer may get stalled
         if ((0.5 * info.getPrefetchSize()) <= (deliveredCounter + ackCounter - additionalWindowSize))
{
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ackLater: sending: " + pendingAck);
+            }
             session.sendAck(pendingAck);
             pendingAck=null;
             deliveredCounter = 0;
@@ -1025,7 +1032,7 @@ public class ActiveMQMessageConsumer imp
                 @Override
                 public void beforeEnd() throws Exception {
                     if (transactedIndividualAck) {
-                        clearDispatchList();
+                        clearDeliveredList();
                         waitForRedeliveries();
                         synchronized(deliveredMessages) {
                             rollbackOnFailedRecoveryRedelivery();
@@ -1058,7 +1065,7 @@ public class ActiveMQMessageConsumer imp
      * @throws JMSException
      */
     public void acknowledge() throws JMSException {
-        clearDispatchList();
+        clearDeliveredList();
         waitForRedeliveries();
         synchronized(deliveredMessages) {
             // Acknowledge all messages so far.
@@ -1162,7 +1169,7 @@ public class ActiveMQMessageConsumer imp
     }
 
     public void rollback() throws JMSException {
-        clearDispatchList();
+        clearDeliveredList();
         synchronized (unconsumedMessages.getMutex()) {
             if (optimizeAcknowledge) {
                 // remove messages read but not acked at the broker yet through
@@ -1301,6 +1308,9 @@ public class ActiveMQMessageConsumer imp
         if (previouslyDeliveredMessages != null) {
             for (Entry<MessageId, Boolean> entry: previouslyDeliveredMessages.entrySet())
{
                 if (!entry.getValue()) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("rollback non redelivered: " + entry.getKey());
+                    }
                     removeFromDeliveredMessages(entry.getKey());
                 }
             }
@@ -1338,7 +1348,7 @@ public class ActiveMQMessageConsumer imp
         MessageListener listener = this.messageListener.get();
         try {
             clearMessagesInProgress();
-            clearDispatchList();
+            clearDeliveredList();
             synchronized (unconsumedMessages.getMutex()) {
                 if (!unconsumedMessages.isClosed()) {
                     if (this.info.isBrowser() || !session.connection.isDuplicate(this, md.getMessage()))
{
@@ -1375,10 +1385,11 @@ public class ActiveMQMessageConsumer imp
                         }
                     } else {
                         if (!session.isTransacted()) {
-                            LOG.warn("Duplicate dispatch on connection: " + session.getConnection().getConnectionInfo().getConnectionId()
-                                    + " to consumer: "  + getConsumerId() + ", ignoring (auto
acking) duplicate: " + md);
-                            MessageAck ack = new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE,
1);
-                            session.sendAck(ack);
+                            LOG.warn("Duplicate non transacted dispatch to consumer: "  +
getConsumerId() + ", poison acking: " + md);
+                            MessageAck poisonAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE,
1);
+                            poisonAck.setFirstMessageId(md.getMessage().getMessageId());
+                            poisonAck.setPoisonCause(new Throwable("Duplicate non transacted
delivery to " + getConsumerId()));
+                            session.sendAck(poisonAck);
                         } else {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(getConsumerId() + " tracking transacted redelivery
of duplicate: " + md.getMessage());
@@ -1423,22 +1434,35 @@ public class ActiveMQMessageConsumer imp
     }
 
     // async (on next call) clear or track delivered as they may be flagged as duplicates
if they arrive again
-    private void clearDispatchList() {
-        if (clearDispatchList) {
+    private void clearDeliveredList() {
+        if (clearDeliveredList) {
             synchronized (deliveredMessages) {
-                if (clearDispatchList) {
+                if (clearDeliveredList) {
                     if (!deliveredMessages.isEmpty()) {
                         if (session.isTransacted()) {
-                            if (LOG.isDebugEnabled()) {
-                                LOG.debug(getConsumerId() + " tracking existing transacted
delivered list (" + deliveredMessages.size() + ") on transport interrupt");
-                            }
+
                             if (previouslyDeliveredMessages == null) {
                                 previouslyDeliveredMessages = new PreviouslyDeliveredMap<MessageId,
Boolean>(session.getTransactionContext().getTransactionId());
                             }
                             for (MessageDispatch delivered : deliveredMessages) {
                                 previouslyDeliveredMessages.put(delivered.getMessage().getMessageId(),
false);
                             }
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug(getConsumerId() + " tracking existing transacted
" + previouslyDeliveredMessages.transactionId +
+                                        " delivered list (" + deliveredMessages.size() +
") on transport interrupt");
+                            }
                         } else {
+                            if (session.isClientAcknowledge()) {
+                                if (LOG.isDebugEnabled()) {
+                                    LOG.debug(getConsumerId() + " rolling back delivered
list (" + deliveredMessages.size() + ") on transport interrupt");
+                                }
+                                // allow redelivery
+                                if (!this.info.isBrowser()) {
+                                    for (MessageDispatch md: deliveredMessages) {
+                                        this.session.connection.rollbackDuplicate(this, md.getMessage());
+                                    }
+                                }
+                            }
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(getConsumerId() + " clearing delivered list ("
+ deliveredMessages.size() + ") on transport interrupt");
                             }
@@ -1446,7 +1470,7 @@ public class ActiveMQMessageConsumer imp
                             pendingAck = null;
                         }
                     }
-                    clearDispatchList = false;
+                    clearDeliveredList = false;
                 }
             }
         }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=1511307&r1=1511306&r2=1511307&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
Wed Aug  7 13:39:24 2013
@@ -41,9 +41,6 @@ import org.apache.activemq.broker.region
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.util.LoggingBrokerPlugin;
 import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.leveldb.LevelDBStore;
-import org.apache.activemq.usage.MemoryUsage;
-import org.apache.activemq.usage.SystemUsage;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,7 +72,7 @@ public class AMQ2149Test extends AutoFai
     String brokerURL = DEFAULT_BROKER_URL;
     
     int numBrokerRestarts = 0;
-    final static int MAX_BROKER_RESTARTS = 3;
+    final static int MAX_BROKER_RESTARTS = 4;
     BrokerService broker;
     Vector<Throwable> exceptions = new Vector<Throwable>();
 
@@ -171,6 +168,7 @@ public class AMQ2149Test extends AutoFai
         }
         
         final int TRANSACITON_BATCH = 500;
+        boolean resumeOnNextOrPreviousIsOk = false;
         public void onMessage(Message message) {
             try {
                 final long seqNum = message.getLongProperty(SEQ_NUM_PROPERTY);
@@ -182,6 +180,16 @@ public class AMQ2149Test extends AutoFai
                         session.commit();
                     }
                 }
+                if (resumeOnNextOrPreviousIsOk) {
+                    // after an indoubt commit we need to accept what we get (within reason)
+                    if (seqNum != nextExpectedSeqNum) {
+                        if (seqNum == nextExpectedSeqNum - (TRANSACITON_BATCH -1)) {
+                            nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                            LOG.info("In doubt commit failed, getting replay at:" +  nextExpectedSeqNum);
+                        }
+                    }
+                    resumeOnNextOrPreviousIsOk = false;
+                }
                 if (seqNum != nextExpectedSeqNum) {
                     LOG.warn(dest + " received " + seqNum
                             + " in msg: " + message.getJMSMessageID()
@@ -196,8 +204,16 @@ public class AMQ2149Test extends AutoFai
                 lastId = message.getJMSMessageID();
             } catch (TransactionRolledBackException expectedSometimesOnFailoverRecovery)
{
                 LOG.info("got rollback: " + expectedSometimesOnFailoverRecovery);
-                // batch will be replayed
-                nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                if (expectedSometimesOnFailoverRecovery.getMessage().contains("completion
in doubt")) {
+                    // in doubt - either commit command or reply missing
+                    // don't know if we will get a replay
+                    resumeOnNextOrPreviousIsOk = true;
+                } else {
+                    resumeOnNextOrPreviousIsOk = false;
+                    // batch will be replayed
+                    nextExpectedSeqNum -= (TRANSACITON_BATCH -1);
+                }
+
             } catch (Throwable e) {
                 LOG.error(dest + " onMessage error", e);
                 exceptions.add(e);
@@ -327,7 +343,7 @@ public class AMQ2149Test extends AutoFai
     public void doTestTransactionalOrderWithRestart(byte destinationType) throws Exception
{
         numtoSend = 10000;
         sleepBetweenSend = 3;
-        brokerStopPeriod = 30 * 1000;
+        brokerStopPeriod = 10 * 1000;
               
         createBroker(new Configurer() {
             public void configure(BrokerService broker) throws Exception {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java?rev=1511307&r1=1511306&r2=1511307&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerOutstandingCommitTest.java
Wed Aug  7 13:39:24 2013
@@ -289,11 +289,14 @@ public class FailoverConsumerOutstanding
         assertTrue("commit done through failover", commitDoneLatch.await(20, TimeUnit.SECONDS));
         assertTrue("commit failed", gotCommitException.get());
         assertTrue("another message was received after failover", messagesReceived.await(20,
TimeUnit.SECONDS));
-        assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(0).getText());
-        // it was redelivered
-        assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(1).getText());
+        int receivedIndex = 0;
+        assertEquals("get message 0 first", MESSAGE_TEXT + "0", receivedMessages.get(receivedIndex++).getText());
+        if (!doActualBrokerCommit) {
+            // it will be redelivered and not tracked as a duplicate
+            assertEquals("get message 0 second", MESSAGE_TEXT + "0", receivedMessages.get(receivedIndex++).getText());
+        }
         assertTrue("another message was received", messagesReceived.await(20, TimeUnit.SECONDS));
-        assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(2).getText());
+        assertEquals("get message 1 eventually", MESSAGE_TEXT + "1", receivedMessages.get(receivedIndex++).getText());
 
         connection.close();
     }

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java?rev=1511307&r1=1511306&r2=1511307&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverConsumerUnconsumedTest.java
Wed Aug  7 13:39:24 2013
@@ -25,6 +25,8 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -96,6 +98,141 @@ public class FailoverConsumerUnconsumedT
     }
 
     @SuppressWarnings("unchecked")
+    @Test
+    public void testFailoverClientAckMissingRedelivery() throws Exception {
+
+        final int maxConsumers = 2;
+        broker = createBroker(true);
+
+        broker.setPlugins(new BrokerPlugin[] {
+                new BrokerPluginSupport() {
+                    int consumerCount;
+
+                    // broker is killed on x create consumer
+                    @Override
+                    public Subscription addConsumer(ConnectionContext context,
+                            final ConsumerInfo info) throws Exception {
+                         if (++consumerCount == maxConsumers) {
+                             context.setDontSendReponse(true);
+                             Executors.newSingleThreadExecutor().execute(new Runnable() {
+                                 public void run() {
+                                     LOG.info("Stopping broker on consumer: " + info.getConsumerId());
+                                     try {
+                                         broker.stop();
+                                     } catch (Exception e) {
+                                         e.printStackTrace();
+                                     }
+                                 }
+                             });
+                         }
+                        return super.addConsumer(context, info);
+                    }
+                }
+        });
+        broker.start();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url +
")");
+        cf.setWatchTopicAdvisories(false);
+
+        final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
+        connection.start();
+
+        final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        final Queue destination = consumerSession.createQueue(QUEUE_NAME + "?jms.consumer.prefetch="
+ prefetch);
+
+        final Vector<TestConsumer> testConsumers = new Vector<TestConsumer>();
+        TestConsumer testConsumer = new TestConsumer(consumerSession, destination, connection);
+        testConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("onMessage:" + message.getJMSMessageID());
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+        testConsumers.add(testConsumer);
+
+
+        produceMessage(consumerSession, destination, maxConsumers * prefetch);
+
+        assertTrue("add messages are delivered", Wait.waitFor(new Wait.Condition() {
+            public boolean isSatisified() throws Exception {
+                int totalDelivered = 0;
+                for (TestConsumer testConsumer : testConsumers) {
+                    long delivered = testConsumer.deliveredSize();
+                    LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
+                    totalDelivered += delivered;
+                }
+                return totalDelivered == maxConsumers * prefetch;
+            }
+        }));
+
+        final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
+
+        Executors.newSingleThreadExecutor().execute(new Runnable() {
+            public void run() {
+                try {
+                    LOG.info("add last consumer...");
+                    TestConsumer testConsumer = new TestConsumer(consumerSession, destination,
connection);
+                    testConsumer.setMessageListener(new MessageListener() {
+                                @Override
+                                public void onMessage(Message message) {
+                                    try {
+                                        LOG.info("onMessage:" + message.getJMSMessageID());
+                                    } catch (JMSException e) {
+                                        e.printStackTrace();
+                                    }
+                                }
+                            });
+                    testConsumers.add(testConsumer);
+                    shutdownConsumerAdded.countDown();
+                    LOG.info("done add last consumer");
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        // will be stopped by the plugin
+        broker.waitUntilStopped();
+
+        broker = createBroker(false, this.url);
+        broker.start();
+
+        assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
+
+        // each should again get prefetch messages - all unacked deliveries should be rolledback
+        assertTrue("after restart all messages are re dispatched", Wait.waitFor(new Wait.Condition()
{
+            public boolean isSatisified() throws Exception {
+                int totalDelivered = 0;
+                for (TestConsumer testConsumer : testConsumers) {
+                    long delivered = testConsumer.deliveredSize();
+                    LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
+                    totalDelivered += delivered;
+                }
+                return totalDelivered == maxConsumers * prefetch;
+            }
+        }));
+
+        assertTrue("after restart each got prefetch amount", Wait.waitFor(new Wait.Condition()
{
+            public boolean isSatisified() throws Exception {
+                for (TestConsumer testConsumer : testConsumers) {
+                    long delivered = testConsumer.deliveredSize();
+                    LOG.info(testConsumer.getConsumerId() + " delivered: " + delivered);
+                    if (delivered != prefetch) {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }));
+
+        connection.close();
+    }
+
+    @SuppressWarnings("unchecked")
     public void doTestFailoverConsumerDups(final boolean watchTopicAdvisories) throws Exception
{
 
         final int maxConsumers = 4;
@@ -156,14 +293,14 @@ public class FailoverConsumerUnconsumedT
             }
         }));
 
-        final CountDownLatch commitDoneLatch = new CountDownLatch(1);
+        final CountDownLatch shutdownConsumerAdded = new CountDownLatch(1);
 
         Executors.newSingleThreadExecutor().execute(new Runnable() {
             public void run() {
                 try {
                     LOG.info("add last consumer...");
                     testConsumers.add(new TestConsumer(consumerSession, destination, connection));
-                    commitDoneLatch.countDown();
+                    shutdownConsumerAdded.countDown();
                     LOG.info("done add last consumer");
                 } catch (Exception e) {
                     e.printStackTrace();
@@ -190,7 +327,7 @@ public class FailoverConsumerUnconsumedT
         broker = createBroker(false, this.url);
         broker.start();
 
-        assertTrue("consumer added through failover", commitDoneLatch.await(30, TimeUnit.SECONDS));
+        assertTrue("consumer added through failover", shutdownConsumerAdded.await(30, TimeUnit.SECONDS));
 
         // each should again get prefetch messages - all unconsumed deliveries should be
rolledback
         assertTrue("after start all messages are re dispatched", Wait.waitFor(new Wait.Condition()
{
@@ -231,6 +368,10 @@ public class FailoverConsumerUnconsumedT
         public int unconsumedSize() {
             return unconsumedMessages.size();
         }
+
+        public int deliveredSize() {
+            return deliveredMessages.size();
+        }
     }
 
     static long idGen = 100;

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java?rev=1511307&r1=1511306&r2=1511307&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
(original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverDuplicateTest.java
Wed Aug  7 13:39:24 2013
@@ -203,7 +203,7 @@ public class FailoverDuplicateTest exten
         receiveConnection.close();
 
         // verify stats
-        assertEquals("expect all messages are dequeued with one duplicate", totalSent +1,
((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
+        assertEquals("expect all messages are dequeued with one duplicate to dlq", totalSent
+ 2, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount());
 
         Wait.waitFor(new Wait.Condition() {
             @Override
@@ -212,7 +212,7 @@ public class FailoverDuplicateTest exten
                 return  totalSent + 1 <= ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount();
             }
         });
-        assertEquals("dequeue correct, including duplicate dispatch auto acked", totalSent
 + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
+        assertEquals("dequeue correct, including duplicate dispatch poisoned", totalSent
 + 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount());
 
         // ensure no dangling messages with fresh broker etc
         broker.stop();



Mime
View raw message