Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C7463F1A5 for ; Tue, 9 Apr 2013 17:05:47 +0000 (UTC) Received: (qmail 86018 invoked by uid 500); 9 Apr 2013 17:05:47 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 85974 invoked by uid 500); 9 Apr 2013 17:05:47 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 85966 invoked by uid 99); 9 Apr 2013 17:05:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Apr 2013 17:05:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Apr 2013 17:05:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 74E772388994; Tue, 9 Apr 2013 17:05:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1466131 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java Date: Tue, 09 Apr 2013 17:05:21 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130409170521.74E772388994@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Tue Apr 9 17:05:21 2013 New Revision: 1466131 URL: http://svn.apache.org/r1466131 Log: fix and test for: https://issues.apache.org/jira/browse/AMQ-4464 Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1466131&r1=1466130&r2=1466131&view=diff ============================================================================== --- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Apr 9 17:05:21 2013 @@ -18,6 +18,7 @@ package org.apache.activemq; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -289,6 +290,7 @@ public class ActiveMQMessageConsumer imp return session.isDupsOkAcknowledge() && !getDestination().isQueue() ; } + @Override public StatsImpl getStats() { return stats; } @@ -380,6 +382,7 @@ public class ActiveMQMessageConsumer imp * @throws JMSException if the JMS provider fails to receive the next * message due to some internal error. */ + @Override public String getMessageSelector() throws JMSException { checkClosed(); return selector; @@ -394,6 +397,7 @@ public class ActiveMQMessageConsumer imp * listener due to some internal error. * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener) */ + @Override public MessageListener getMessageListener() throws JMSException { checkClosed(); return this.messageListener.get(); @@ -414,6 +418,7 @@ public class ActiveMQMessageConsumer imp * message due to some internal error. * @see javax.jms.MessageConsumer#getMessageListener */ + @Override public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { @@ -436,6 +441,7 @@ public class ActiveMQMessageConsumer imp } } + @Override public MessageAvailableListener getAvailableListener() { return availableListener; } @@ -445,6 +451,7 @@ public class ActiveMQMessageConsumer imp * message available so that the {@link MessageConsumer#receiveNoWait()} can * be called. */ + @Override public void setAvailableListener(MessageAvailableListener availableListener) { this.availableListener = availableListener; } @@ -514,6 +521,7 @@ public class ActiveMQMessageConsumer imp * @return the next message produced for this message consumer, or null if * this message consumer is concurrently closed */ + @Override public Message receive() throws JMSException { checkClosed(); checkMessageListener(); @@ -547,6 +555,7 @@ public class ActiveMQMessageConsumer imp } if (session.isClientAcknowledge()) { m.setAcknowledgeCallback(new Callback() { + @Override public void execute() throws Exception { session.checkClosed(); session.acknowledge(); @@ -554,6 +563,7 @@ public class ActiveMQMessageConsumer imp }); } else if (session.isIndividualAcknowledge()) { m.setAcknowledgeCallback(new Callback() { + @Override public void execute() throws Exception { session.checkClosed(); acknowledge(md); @@ -577,6 +587,7 @@ public class ActiveMQMessageConsumer imp * the timeout expires or this message consumer is concurrently * closed */ + @Override public Message receive(long timeout) throws JMSException { checkClosed(); checkMessageListener(); @@ -613,6 +624,7 @@ public class ActiveMQMessageConsumer imp * @throws JMSException if the JMS provider fails to receive the next * message due to some internal error. */ + @Override public Message receiveNoWait() throws JMSException { checkClosed(); checkMessageListener(); @@ -651,6 +663,7 @@ public class ActiveMQMessageConsumer imp * @throws JMSException if the JMS provider fails to close the consumer due * to some internal error. */ + @Override public void close() throws JMSException { if (!unconsumedMessages.isClosed()) { if (session.getTransactionContext().isInTransaction()) { @@ -743,6 +756,7 @@ public class ActiveMQMessageConsumer imp executorService = Executors.newSingleThreadExecutor(); } executorService.submit(new Runnable() { + @Override public void run() { try { session.sendAck(ackToSend,true); @@ -1197,6 +1211,10 @@ public class ActiveMQMessageConsumer imp // Adjust the window size. additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size()); redeliveryDelay = 0; + + deliveredCounter -= deliveredMessages.size(); + deliveredMessages.clear(); + } else { // only redelivery_ack after first delivery @@ -1213,8 +1231,14 @@ public class ActiveMQMessageConsumer imp final LinkedList pendingRedeliveries = new LinkedList(deliveredMessages); + Collections.reverse(pendingRedeliveries); + + deliveredCounter -= deliveredMessages.size(); + deliveredMessages.clear(); + // Start up the delivery again a little later. session.getScheduler().executeAfterDelay(new Runnable() { + @Override public void run() { try { if (!unconsumedMessages.isClosed()) { @@ -1236,9 +1260,13 @@ public class ActiveMQMessageConsumer imp unconsumedMessages.enqueueFirst(md); } + deliveredCounter -= deliveredMessages.size(); + deliveredMessages.clear(); + if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) { // Start up the delivery again a little later. session.getScheduler().executeAfterDelay(new Runnable() { + @Override public void run() { try { if (started.get()) { @@ -1254,8 +1282,6 @@ public class ActiveMQMessageConsumer imp } } } - deliveredCounter -= deliveredMessages.size(); - deliveredMessages.clear(); } } if (messageListener.get() != null) { @@ -1304,6 +1330,7 @@ public class ActiveMQMessageConsumer imp } } + @Override public void dispatch(MessageDispatch md) { MessageListener listener = this.messageListener.get(); try { Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java?rev=1466131&r1=1466130&r2=1466131&view=diff ============================================================================== --- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java (original) +++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java Tue Apr 9 17:05:21 2013 @@ -16,8 +16,11 @@ */ package org.apache.activemq.usecases; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.util.Iterator; import java.util.LinkedHashSet; import java.util.concurrent.TimeUnit; @@ -29,6 +32,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; @@ -78,6 +82,7 @@ public class NonBlockingConsumerRedelive assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages."); return received.size() == MSG_COUNT; @@ -91,6 +96,7 @@ public class NonBlockingConsumerRedelive assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages since rollback."); return received.size() == MSG_COUNT; @@ -107,6 +113,76 @@ public class NonBlockingConsumerRedelive } @Test + public void testMessageDeleiveredInCorrectOrder() throws Exception { + + final LinkedHashSet received = new LinkedHashSet(); + final LinkedHashSet beforeRollback = new LinkedHashSet(); + final LinkedHashSet afterRollback = new LinkedHashSet(); + + Connection connection = connectionFactory.createConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + received.add(message); + } + }); + + sendMessages(); + + session.commit(); + connection.start(); + + assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages."); + return received.size() == MSG_COUNT; + } + } + )); + + beforeRollback.addAll(received); + received.clear(); + session.rollback(); + + assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", + Wait.waitFor(new Wait.Condition(){ + @Override + public boolean isSatisified() throws Exception { + LOG.info("Consumer has received " + received.size() + " messages since rollback."); + return received.size() == MSG_COUNT; + } + } + )); + + afterRollback.addAll(received); + received.clear(); + + assertEquals(beforeRollback.size(), afterRollback.size()); + assertEquals(beforeRollback, afterRollback); + + Iterator after = afterRollback.iterator(); + Iterator before = beforeRollback.iterator(); + + while (before.hasNext() && after.hasNext()) { + TextMessage original = (TextMessage) before.next(); + TextMessage rolledBack = (TextMessage) after.next(); + + int originalInt = Integer.parseInt(original.getText()); + int rolledbackInt = Integer.parseInt(rolledBack.getText()); + + assertEquals(originalInt, rolledbackInt); + } + + session.commit(); + } + + @Test public void testMessageDeleiveryDoesntStop() throws Exception { final LinkedHashSet received = new LinkedHashSet(); @@ -130,6 +206,7 @@ public class NonBlockingConsumerRedelive assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages."); return received.size() == MSG_COUNT; @@ -145,6 +222,7 @@ public class NonBlockingConsumerRedelive assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages since rollback."); return received.size() == MSG_COUNT * 2; @@ -182,6 +260,7 @@ public class NonBlockingConsumerRedelive assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages."); return received.size() == MSG_COUNT; @@ -194,6 +273,7 @@ public class NonBlockingConsumerRedelive assertFalse("Delayed redelivery test not expecting any messages yet.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { return received.size() > 0; } @@ -225,6 +305,7 @@ public class NonBlockingConsumerRedelive assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages."); return received.size() == MSG_COUNT; @@ -264,6 +345,7 @@ public class NonBlockingConsumerRedelive assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages since rollback."); return received.size() == MSG_COUNT; @@ -307,6 +389,7 @@ public class NonBlockingConsumerRedelive assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + received.size() + " messages."); return received.size() == MSG_COUNT; @@ -329,6 +412,7 @@ public class NonBlockingConsumerRedelive assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition(){ + @Override public boolean isSatisified() throws Exception { LOG.info("Consumer has received " + dlqed.size() + " messages in DLQ."); return dlqed.size() == MSG_COUNT;