activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1466131 - in /activemq/trunk: activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
Date Tue, 09 Apr 2013 17:05:21 GMT
Author: tabish
Date: Tue Apr  9 17:05:21 2013
New Revision: 1466131

URL: http://svn.apache.org/r1466131
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4464

Modified:
    activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java

Modified: activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=1466131&r1=1466130&r2=1466131&view=diff
==============================================================================
--- activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original)
+++ activemq/trunk/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Tue Apr  9 17:05:21 2013
@@ -18,6 +18,7 @@ package org.apache.activemq;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -289,6 +290,7 @@ public class ActiveMQMessageConsumer imp
         return session.isDupsOkAcknowledge() && !getDestination().isQueue() ;
     }
 
+    @Override
     public StatsImpl getStats() {
         return stats;
     }
@@ -380,6 +382,7 @@ public class ActiveMQMessageConsumer imp
      * @throws JMSException if the JMS provider fails to receive the next
      *                 message due to some internal error.
      */
+    @Override
     public String getMessageSelector() throws JMSException {
         checkClosed();
         return selector;
@@ -394,6 +397,7 @@ public class ActiveMQMessageConsumer imp
      *                 listener due to some internal error.
      * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
      */
+    @Override
     public MessageListener getMessageListener() throws JMSException {
         checkClosed();
         return this.messageListener.get();
@@ -414,6 +418,7 @@ public class ActiveMQMessageConsumer imp
      *                 message due to some internal error.
      * @see javax.jms.MessageConsumer#getMessageListener
      */
+    @Override
     public void setMessageListener(MessageListener listener) throws JMSException {
         checkClosed();
         if (info.getPrefetchSize() == 0) {
@@ -436,6 +441,7 @@ public class ActiveMQMessageConsumer imp
         }
     }
 
+    @Override
     public MessageAvailableListener getAvailableListener() {
         return availableListener;
     }
@@ -445,6 +451,7 @@ public class ActiveMQMessageConsumer imp
      * message available so that the {@link MessageConsumer#receiveNoWait()} can
      * be called.
      */
+    @Override
     public void setAvailableListener(MessageAvailableListener availableListener) {
         this.availableListener = availableListener;
     }
@@ -514,6 +521,7 @@ public class ActiveMQMessageConsumer imp
      * @return the next message produced for this message consumer, or null if
      *         this message consumer is concurrently closed
      */
+    @Override
     public Message receive() throws JMSException {
         checkClosed();
         checkMessageListener();
@@ -547,6 +555,7 @@ public class ActiveMQMessageConsumer imp
         }
         if (session.isClientAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
+                @Override
                 public void execute() throws Exception {
                     session.checkClosed();
                     session.acknowledge();
@@ -554,6 +563,7 @@ public class ActiveMQMessageConsumer imp
             });
         } else if (session.isIndividualAcknowledge()) {
             m.setAcknowledgeCallback(new Callback() {
+                @Override
                 public void execute() throws Exception {
                     session.checkClosed();
                     acknowledge(md);
@@ -577,6 +587,7 @@ public class ActiveMQMessageConsumer imp
      *         the timeout expires or this message consumer is concurrently
      *         closed
      */
+    @Override
     public Message receive(long timeout) throws JMSException {
         checkClosed();
         checkMessageListener();
@@ -613,6 +624,7 @@ public class ActiveMQMessageConsumer imp
      * @throws JMSException if the JMS provider fails to receive the next
      *                 message due to some internal error.
      */
+    @Override
     public Message receiveNoWait() throws JMSException {
         checkClosed();
         checkMessageListener();
@@ -651,6 +663,7 @@ public class ActiveMQMessageConsumer imp
      * @throws JMSException if the JMS provider fails to close the consumer due
      *                 to some internal error.
      */
+    @Override
     public void close() throws JMSException {
         if (!unconsumedMessages.isClosed()) {
             if (session.getTransactionContext().isInTransaction()) {
@@ -743,6 +756,7 @@ public class ActiveMQMessageConsumer imp
                     executorService = Executors.newSingleThreadExecutor();
                 }
                 executorService.submit(new Runnable() {
+                    @Override
                     public void run() {
                         try {
                             session.sendAck(ackToSend,true);
@@ -1197,6 +1211,10 @@ public class ActiveMQMessageConsumer imp
                     // Adjust the window size.
                     additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());
                     redeliveryDelay = 0;
+
+                    deliveredCounter -= deliveredMessages.size();
+                    deliveredMessages.clear();
+
                 } else {
 
                     // only redelivery_ack after first delivery
@@ -1213,8 +1231,14 @@ public class ActiveMQMessageConsumer imp
                             final LinkedList<MessageDispatch> pendingRedeliveries =
                                 new LinkedList<MessageDispatch>(deliveredMessages);
 
+                            Collections.reverse(pendingRedeliveries);
+
+                            deliveredCounter -= deliveredMessages.size();
+                            deliveredMessages.clear();
+
                             // Start up the delivery again a little later.
                             session.getScheduler().executeAfterDelay(new Runnable() {
+                                @Override
                                 public void run() {
                                     try {
                                         if (!unconsumedMessages.isClosed()) {
@@ -1236,9 +1260,13 @@ public class ActiveMQMessageConsumer imp
                             unconsumedMessages.enqueueFirst(md);
                         }
 
+                        deliveredCounter -= deliveredMessages.size();
+                        deliveredMessages.clear();
+
                         if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
                             // Start up the delivery again a little later.
                             session.getScheduler().executeAfterDelay(new Runnable() {
+                                @Override
                                 public void run() {
                                     try {
                                         if (started.get()) {
@@ -1254,8 +1282,6 @@ public class ActiveMQMessageConsumer imp
                         }
                     }
                 }
-                deliveredCounter -= deliveredMessages.size();
-                deliveredMessages.clear();
             }
         }
         if (messageListener.get() != null) {
@@ -1304,6 +1330,7 @@ public class ActiveMQMessageConsumer imp
         }
     }
 
+    @Override
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener.get();
         try {

Modified: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java?rev=1466131&r1=1466130&r2=1466131&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java (original)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/NonBlockingConsumerRedeliveryTest.java Tue Apr  9 17:05:21 2013
@@ -16,8 +16,11 @@
  */
 package org.apache.activemq.usecases;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.concurrent.TimeUnit;
 
@@ -29,6 +32,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -78,6 +82,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages.");
                     return received.size() == MSG_COUNT;
@@ -91,6 +96,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages since rollback.");
                     return received.size() == MSG_COUNT;
@@ -107,6 +113,76 @@ public class NonBlockingConsumerRedelive
     }
 
     @Test
+    public void testMessageDeleiveredInCorrectOrder() throws Exception {
+
+        final LinkedHashSet<Message> received = new LinkedHashSet<Message>();
+        final LinkedHashSet<Message> beforeRollback = new LinkedHashSet<Message>();
+        final LinkedHashSet<Message> afterRollback = new LinkedHashSet<Message>();
+
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+        Destination destination = session.createQueue(destinationName);
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                received.add(message);
+            }
+        });
+
+        sendMessages();
+
+        session.commit();
+        connection.start();
+
+        assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
+            Wait.waitFor(new Wait.Condition(){
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("Consumer has received " + received.size() + " messages.");
+                    return received.size() == MSG_COUNT;
+                }
+            }
+        ));
+
+        beforeRollback.addAll(received);
+        received.clear();
+        session.rollback();
+
+        assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
+            Wait.waitFor(new Wait.Condition(){
+                @Override
+                public boolean isSatisified() throws Exception {
+                    LOG.info("Consumer has received " + received.size() + " messages since rollback.");
+                    return received.size() == MSG_COUNT;
+                }
+            }
+        ));
+
+        afterRollback.addAll(received);
+        received.clear();
+
+        assertEquals(beforeRollback.size(), afterRollback.size());
+        assertEquals(beforeRollback, afterRollback);
+
+        Iterator<Message> after = afterRollback.iterator();
+        Iterator<Message> before = beforeRollback.iterator();
+
+        while (before.hasNext() && after.hasNext()) {
+            TextMessage original = (TextMessage) before.next();
+            TextMessage rolledBack = (TextMessage) after.next();
+
+            int originalInt = Integer.parseInt(original.getText());
+            int rolledbackInt = Integer.parseInt(rolledBack.getText());
+
+            assertEquals(originalInt, rolledbackInt);
+        }
+
+        session.commit();
+    }
+
+    @Test
     public void testMessageDeleiveryDoesntStop() throws Exception {
 
         final LinkedHashSet<Message> received = new LinkedHashSet<Message>();
@@ -130,6 +206,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages.");
                     return received.size() == MSG_COUNT;
@@ -145,6 +222,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages since rollback.");
                     return received.size() == MSG_COUNT * 2;
@@ -182,6 +260,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages.");
                     return received.size() == MSG_COUNT;
@@ -194,6 +273,7 @@ public class NonBlockingConsumerRedelive
 
         assertFalse("Delayed redelivery test not expecting any messages yet.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     return received.size() > 0;
                 }
@@ -225,6 +305,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages.");
                     return received.size() == MSG_COUNT;
@@ -264,6 +345,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Post-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages since rollback.");
                     return received.size() == MSG_COUNT;
@@ -307,6 +389,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Pre-Rollback expects to receive: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + received.size() + " messages.");
                     return received.size() == MSG_COUNT;
@@ -329,6 +412,7 @@ public class NonBlockingConsumerRedelive
 
         assertTrue("Post-Rollback expects to DLQ: " + MSG_COUNT + " messages.",
             Wait.waitFor(new Wait.Condition(){
+                @Override
                 public boolean isSatisified() throws Exception {
                     LOG.info("Consumer has received " + dlqed.size() + " messages in DLQ.");
                     return dlqed.size() == MSG_COUNT;



Mime
View raw message