activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [activemq-artemis] branch master updated: ARTEMIS-2564 retryMessages incorrectly removes msgs
Date Tue, 03 Dec 2019 19:24:12 GMT
This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8614ca1  ARTEMIS-2564 retryMessages incorrectly removes msgs
     new 09e3b7b  This closes #2904
8614ca1 is described below

commit 8614ca1167ceaad3eacf9e48cce136b893fd7dad
Author: Justin Bertram <jbertram@apache.org>
AuthorDate: Mon Dec 2 14:02:16 2019 -0600

    ARTEMIS-2564 retryMessages incorrectly removes msgs
---
 .../artemis/core/server/impl/LastValueQueue.java   |  4 +-
 .../artemis/core/server/impl/QueueImpl.java        | 72 ++++++++++++----------
 .../integration/management/QueueControlTest.java   | 51 +++++++++++++++
 3 files changed, 92 insertions(+), 35 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 5f9c82a..63044b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -220,9 +220,9 @@ public class LastValueQueue extends QueueImpl {
       QueueIterateAction queueIterateAction = super.createDeleteMatchingAction(ackReason);
       return new QueueIterateAction() {
          @Override
-         public void actMessage(Transaction tx, MessageReference ref) throws Exception {
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
             removeIfCurrent(ref);
-            queueIterateAction.actMessage(tx, ref);
+            return queueIterateAction.actMessage(tx, ref);
          }
       };
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index f06e053..3f28b3a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -1928,17 +1928,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    QueueIterateAction createDeleteMatchingAction(AckReason ackReason) {
       return new QueueIterateAction() {
          @Override
-         public void actMessage(Transaction tx, MessageReference ref) throws Exception {
-            actMessage(tx, ref, true);
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
+            return actMessage(tx, ref, true);
          }
 
          @Override
-         public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
+         public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
             incDelivering(ref);
             acknowledge(tx, ref, ackReason, null);
             if (fromMessageReferences) {
                refRemoved(ref);
             }
+            return true;
          }
       };
    }
@@ -1956,23 +1957,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    private int iterQueue(final int flushLimit,
                                       final Filter filter1,
                                       QueueIterateAction messageAction) throws Exception {
-      return iterQueue(flushLimit, filter1, messageAction, true);
-   }
-
-   /**
-    * This is a generic method for any method interacting on the Queue to move or delete messages
-    * Instead of duplicate the feature we created an abstract class where you pass the logic for
-    * each message.
-    *
-    * @param filter1
-    * @param messageAction
-    * @return
-    * @throws Exception
-    */
-   private int iterQueue(final int flushLimit,
-                                      final Filter filter1,
-                                      QueueIterateAction messageAction,
-                                      final boolean remove) throws Exception {
       int count = 0;
       int txCount = 0;
 
@@ -1996,8 +1980,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   }
 
                   if (filter1 == null || filter1.match(ref.getMessage())) {
-                     messageAction.actMessage(tx, ref);
-                     if (remove) {
+                     if (messageAction.actMessage(tx, ref)) {
                         iter.remove();
                      }
                      txCount++;
@@ -2393,7 +2376,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       return iterQueue(flushLimit, filter, new QueueIterateAction() {
          @Override
-         public void actMessage(Transaction tx, MessageReference ref) throws Exception {
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
             boolean ignored = false;
 
             incDelivering(ref);
@@ -2414,6 +2397,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                refRemoved(ref);
                //move(toAddress, tx, ref, false, rejectDuplicates);
             }
+
+            return true;
          }
       });
    }
@@ -2421,8 +2406,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public int moveReferencesBetweenSnFQueues(final SimpleString queueSuffix) throws Exception {
       return iterQueue(DEFAULT_FLUSH_LIMIT, null, new QueueIterateAction() {
          @Override
-         public void actMessage(Transaction tx, MessageReference ref) throws Exception {
-            moveBetweenSnFQueues(queueSuffix, tx, ref);
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
+            return moveBetweenSnFQueues(queueSuffix, tx, ref);
          }
       });
    }
@@ -2430,13 +2415,14 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public synchronized int rerouteMessages(final SimpleString queueName, final Filter filter) throws Exception {
       return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
          @Override
-         public void actMessage(Transaction tx, MessageReference ref) throws Exception {
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
             RoutingContext routingContext = new RoutingContextImpl(tx);
             routingContext.setAddress(server.locateQueue(queueName).getAddress());
             server.getPostOffice().getBinding(queueName).route(ref.getMessage(), routingContext);
             postOffice.processRoute(ref.getMessage(), routingContext, false);
+            return false;
          }
-      }, false);
+      });
    }
 
    @Override
@@ -2446,7 +2432,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       return iterQueue(DEFAULT_FLUSH_LIMIT, filter, new QueueIterateAction() {
          @Override
-         public void actMessage(Transaction tx, MessageReference ref) throws Exception {
+         public boolean actMessage(Transaction tx, MessageReference ref) throws Exception {
 
             String originalMessageAddress = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_ADDRESS);
             String originalMessageQueue = ref.getMessage().getAnnotationString(Message.HDR_ORIGINAL_QUEUE);
@@ -2474,7 +2460,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   move(SimpleString.toSimpleString(originalMessageAddress), tx, ref, false, false);
                }
                refRemoved(ref);
+               return true;
             }
+
+            return false;
          }
       });
 
@@ -3137,7 +3126,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @SuppressWarnings({"ArrayToString", "ArrayToStringConcatenation"})
-   private void moveBetweenSnFQueues(final SimpleString queueSuffix,
+   private boolean moveBetweenSnFQueues(final SimpleString queueSuffix,
                                      final Transaction tx,
                                      final MessageReference ref) throws Exception {
       Message copyMessage = makeCopy(ref, false, false);
@@ -3199,6 +3188,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             deliverAsync();
          }
       });
+
+      return true;
    }
 
    private Pair<String, Binding> locateTargetBinding(SimpleString queueSuffix, Message copyMessage, long oldQueueID) {
@@ -3909,10 +3900,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     */
    abstract class QueueIterateAction {
 
-      public abstract void actMessage(Transaction tx, MessageReference ref) throws Exception;
-
-      public void actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
-         actMessage(tx, ref);
+      /**
+       *
+       * @param tx   the transaction which the message action should participate in
+       * @param ref  the message reference which the action should act upon
+       * @return     true if the action should result in the removal of the message from the queue; false otherwise
+       * @throws Exception
+       */
+      public abstract boolean actMessage(Transaction tx, MessageReference ref) throws Exception;
+
+      /**
+       *
+       * @param tx   the transaction which the message action should participate in
+       * @param ref  the message reference which the action should act upon
+       * @param fromMessageReferences false if the queue's stats should *not* be updated (e.g. paged or scheduled refs)
+       * @return     true if the action should result in the removal of the message from the queue; false otherwise
+       * @throws Exception
+       */
+      public boolean actMessage(Transaction tx, MessageReference ref, boolean fromMessageReferences) throws Exception {
+         return actMessage(tx, ref);
       }
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 4596558..d8c4746 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1073,6 +1073,57 @@ public class QueueControlTest extends ManagementTestBase {
       clientConsumer.close();
    }
 
+   @Test
+   public void testRetryMessageWithoutDLQ() throws Exception {
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString qName2 = new SimpleString("q2");
+      final SimpleString adName = new SimpleString("ad1");
+      final SimpleString adName2 = new SimpleString("ad2");
+      final String sampleText = "Put me on DLQ";
+
+      session.createQueue(adName, RoutingType.MULTICAST, qName, null, durable);
+      session.createQueue(adName2, RoutingType.MULTICAST, qName2, null, durable);
+
+      // Send message to queue.
+      ClientProducer producer = session.createProducer(adName);
+      producer.send(createTextMessage(session, sampleText));
+      ClientMessage m = createTextMessage(session, sampleText);
+      m.putStringProperty(Message.HDR_ORIGINAL_ADDRESS, adName2);
+      m.putStringProperty(Message.HDR_ORIGINAL_QUEUE, qName2);
+      producer.send(m);
+      session.start();
+
+      QueueControl queueControl = createManagementControl(adName, qName);
+      assertMessageMetrics(queueControl, 2, durable);
+
+      QueueControl queueControl2 = createManagementControl(adName2, qName2);
+      assertMessageMetrics(queueControl2, 0, durable);
+
+      queueControl.retryMessages();
+
+      Wait.assertTrue(() -> getMessageCount(queueControl) == 1, 2000, 100);
+      assertMessageMetrics(queueControl, 1, durable);
+
+      Wait.assertTrue(() -> getMessageCount(queueControl2) == 1, 2000, 100);
+      assertMessageMetrics(queueControl2, 1, durable);
+
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+      clientConsumer = session.createConsumer(qName2);
+      clientMessage = clientConsumer.receive(500);
+      Assert.assertNotNull(clientMessage);
+      clientMessage.acknowledge();
+
+      Assert.assertEquals(sampleText, clientMessage.getBodyBuffer().readString());
+
+      clientConsumer.close();
+   }
+
    /**
     * Test retry - get a diverted message from DLQ and put on original queue.
     */


Mime
View raw message