activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [3/4] activemq-artemis git commit: ARTEMIS-252 added jmx operations to retry messages
Date Mon, 12 Oct 2015 21:04:19 GMT
ARTEMIS-252 added jmx operations to retry messages


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7afe8799
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7afe8799
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7afe8799

Branch: refs/heads/master
Commit: 7afe87996bcebe47247a6cc30179cd8cfacb193a
Parents: 78410bc
Author: Petter Nordlander <nordlander.petter@gmail.com>
Authored: Sat Oct 10 16:58:04 2015 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Oct 12 17:03:42 2015 -0400

----------------------------------------------------------------------
 .../api/core/management/QueueControl.java       |  22 ++++
 .../core/management/impl/QueueControlImpl.java  |  50 ++++++++
 .../management/QueueControlTest.java            | 118 +++++++++++++++++++
 .../management/QueueControlUsingCoreTest.java   |   8 ++
 4 files changed, 198 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afe8799/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 8d211ca..7d325ec 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -216,6 +216,28 @@ public interface QueueControl {
    @Operation(desc = "Remove the message corresponding to the given messageID", impact =
MBeanOperationInfo.ACTION)
    boolean expireMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID)
throws Exception;
 
+
+   /**
+    * Retries the message corresponding to the given messageID to the original queue.
+    * This is appropriate on dead messages on Dead letter queues only.
+    *
+    * @param messageID
+    * @return {@code true} if the message was retried, {@code false} else
+    * @throws Exception
+    */
+   @Operation(desc = "Retry the message corresponding to the given messageID to the original
queue", impact = MBeanOperationInfo.ACTION)
+   boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID)
throws Exception;
+
+   /**
+    * Retries all messages on a DLQ to their respective original queues.
+    * This is appropriate on dead messages on Dead letter queues only.
+    *
+    * @return the number of retried messages.
+    * @throws Exception
+    */
+   @Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact
= MBeanOperationInfo.ACTION)
+   int retryMessages() throws Exception;
+
    /**
     * Moves the message corresponding to the specified message ID to the specified other
queue.
     *

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afe8799/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 7d4aaaf..1260169 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -551,6 +552,55 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
{
       }
    }
 
+   public boolean retryMessage(final long messageID) throws Exception {
+
+      checkStarted();
+      clearIO();
+
+      try {
+         MessageReference message = queue.getReference(messageID);
+         if ( message == null ) {
+            return false;
+         }
+         else {
+            final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
+            if (originalAddress != null) {
+               return queue.moveReference(messageID, new SimpleString(originalAddress));
+            }
+         }
+      }
+      finally {
+         blockOnIO();
+      }
+
+      return false;
+   }
+
+   public int retryMessages() throws Exception {
+      checkStarted();
+      clearIO();
+
+      int retriedMessages = 0;
+      try {
+         Iterator<MessageReference> messageIterator = queue.totalIterator();
+         while (messageIterator.hasNext()) {
+            MessageReference message = messageIterator.next();
+            // Will only try messages with Message.HDR_ORIGINAL_ADDRESS set.
+            final String originalAddress = message.getMessage().getStringProperty(Message.HDR_ORIGINAL_ADDRESS);
+            final long messageID = message.getMessage().getMessageID();
+            if ( originalAddress != null) {
+               if ( queue.moveReference(messageID, new SimpleString(originalAddress))) {
+                  retriedMessages++;
+               }
+            }
+         }
+      }
+      finally {
+         blockOnIO();
+      }
+      return retriedMessages;
+   }
+
    public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
       return moveMessage(messageID, otherQueueName, false);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afe8799/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 0de4047..b968a03 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -748,6 +748,118 @@ public class QueueControlTest extends ManagementTestBase {
    }
 
    /**
+    * Test retry - get a message from DLQ and put on original queue.
+    */
+   @Test
+   public void testRetryMessage() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString adName = new SimpleString("ad1");
+      final SimpleString dlq = new SimpleString("DLQ1");
+      final String sampleText = "Put me on DLQ";
+
+      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+
+      session.createQueue(dla, dlq, null, false);
+      session.createQueue(adName, qName, null, false);
+
+      // Send message to queue.
+      ClientProducer producer = session.createProducer(adName);
+      producer.send(createTextMessage(session, sampleText));
+      session.start();
+
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+      ClientMessage clientMessage = clientConsumer.receive(500);
+      clientMessage.acknowledge();
+      Assert.assertNotNull(clientMessage);
+
+      Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+
+      // force a rollback to DLQ
+      session.rollback();
+      clientMessage = clientConsumer.receiveImmediate();
+      Assert.assertNull(clientMessage);
+
+      QueueControl queueControl = createManagementControl(dla, dlq);
+      Assert.assertEquals(1, getMessageCount(queueControl));
+      final long messageID = getFirstMessageId(queueControl);
+
+      // Retry the message - i.e. it should go from DLQ to original Queue.
+      Assert.assertTrue(queueControl.retryMessage(messageID));
+
+      // Assert DLQ is empty...
+      Assert.assertEquals(0, getMessageCount(queueControl));
+
+      // .. and that the message is now on the original queue once more.
+      clientMessage = clientConsumer.receive(500);
+      clientMessage.acknowledge();
+      Assert.assertNotNull(clientMessage);
+
+      Assert.assertEquals(clientMessage.getBodyBuffer().readString(), "Put me on DLQ!");
+
+      clientConsumer.close();
+   }
+
+   /**
+    * Test retry multiple messages from  DLQ to original queue.
+    */
+   @Test
+   public void testRetryMultipleMessages() throws Exception {
+      final SimpleString dla = new SimpleString("DLA");
+      final SimpleString qName = new SimpleString("q1");
+      final SimpleString adName = new SimpleString("ad1");
+      final SimpleString dlq = new SimpleString("DLQ1");
+      final String sampleText = "Put me on DLQ";
+      final int numMessagesToTest = 10;
+
+      AddressSettings addressSettings = new AddressSettings().setMaxDeliveryAttempts(1).setDeadLetterAddress(dla);
+      server.getAddressSettingsRepository().addMatch(adName.toString(), addressSettings);
+
+      session.createQueue(dla, dlq, null, false);
+      session.createQueue(adName, qName, null, false);
+
+      // Send message to queue.
+      ClientProducer producer = session.createProducer(adName);
+      for (int i = 0; i < numMessagesToTest; i++) {
+         producer.send(createTextMessage(session, sampleText));
+      }
+
+      session.start();
+
+      // Read and rollback all messages to DLQ
+      ClientConsumer clientConsumer = session.createConsumer(qName);
+      for (int i = 0; i < numMessagesToTest; i++) {
+         ClientMessage clientMessage = clientConsumer.receive(500);
+         clientMessage.acknowledge();
+         Assert.assertNotNull(clientMessage);
+         Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+         session.rollback();
+      }
+
+      Assert.assertNull(clientConsumer.receiveImmediate());
+
+      QueueControl dlqQueueControl = createManagementControl(dla, dlq);
+      Assert.assertEquals(numMessagesToTest, getMessageCount(dlqQueueControl));
+
+      // Retry all messages - i.e. they should go from DLQ to original Queue.
+      Assert.assertEquals(numMessagesToTest, dlqQueueControl.retryMessages());
+
+      // Assert DLQ is empty...
+      Assert.assertEquals(0, getMessageCount(dlqQueueControl));
+
+      // .. and that the messages is now on the original queue once more.
+      for (int i = 0; i < numMessagesToTest; i++) {
+         ClientMessage clientMessage = clientConsumer.receive(500);
+         clientMessage.acknowledge();
+         Assert.assertNotNull(clientMessage);
+         Assert.assertEquals(clientMessage.getBodyBuffer().readString(), sampleText);
+      }
+
+      clientConsumer.close();
+   }
+
+   /**
     * <ol>
     * <li>send a message to queue</li>
     * <li>move all messages from queue to otherQueue using management method</li>
@@ -1930,4 +2042,10 @@ public class QueueControlTest extends ManagementTestBase {
 
       return queueControl;
    }
+
+   protected long getFirstMessageId(final QueueControl queueControl) throws Exception {
+      Map<String, Object>[] messages = queueControl.listMessages(null);
+      long messageID = (Long) messages[0].get("messageID");
+      return messageID;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afe8799/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index d296819..2b75f12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -229,6 +229,14 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
             return (Boolean) proxy.invokeOperation("moveMessage", messageID, otherQueueName,
rejectDuplicates);
          }
 
+         public boolean retryMessage(final long messageID) throws Exception {
+            return (Boolean) proxy.invokeOperation("retryMessage", messageID);
+         }
+
+         public int retryMessages() throws Exception {
+            return (Integer) proxy.invokeOperation("retryMessages");
+         }
+
          public int removeMessages(final String filter) throws Exception {
             return (Integer) proxy.invokeOperation("removeMessages", filter);
          }


Mime
View raw message