activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [2/4] activemq-artemis git commit: ARTEMIS-252 Added support to retry messages via JMX on JMS Queue interface
Date Mon, 12 Oct 2015 21:04:18 GMT
ARTEMIS-252 Added support to retry messages via JMX on JMS Queue interface


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/98917259
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/98917259
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/98917259

Branch: refs/heads/master
Commit: 989172596ee41c77fc5e6cd40c9bdbed89a61a90
Parents: 7afe879
Author: Petter Nordlander <nordlander.petter@gmail.com>
Authored: Sun Oct 11 19:34:23 2015 +0200
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Mon Oct 12 17:03:42 2015 -0400

----------------------------------------------------------------------
 .../api/jms/management/JMSQueueControl.java     | 21 ++++++
 .../management/impl/JMSQueueControlImpl.java    | 19 +++++
 .../server/management/JMSQueueControlTest.java  | 78 ++++++++++++++++++++
 .../management/JMSQueueControlUsingJMSTest.java | 13 ++++
 4 files changed, 131 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
index 43e20ab..246fe7a 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
@@ -228,6 +228,27 @@ public interface JMSQueueControl extends DestinationControl {
                     @Parameter(name = "rejectDuplicates", desc = "Reject messages identified
as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;
 
    /**
+    * Retries the message corresponding to the given messageID to the original queue.
+    * This is appropriate on dead messages on Dead letter queues only.
+    *
+    * @param messageID
+    * @return {@code true} if the message was retried, {@code false} else
+    * @throws Exception
+    */
+   @Operation(desc = "Retry the message corresponding to the given messageID to the original
queue", impact = MBeanOperationInfo.ACTION)
+   boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID") String messageID)
throws Exception;
+
+   /**
+    * Retries all messages on a DLQ to their respective original queues.
+    * This is appropriate on dead messages on Dead letter queues only.
+    *
+    * @return the number of retried messages.
+    * @throws Exception
+    */
+   @Operation(desc = "Retry all messages on a DLQ to their respective original queues", impact
= MBeanOperationInfo.ACTION)
+   int retryMessages() throws Exception;
+
+   /**
     * Lists the message counter for this queue.
     */
    @Operation(desc = "List the message counters", impact = MBeanOperationInfo.INFO)

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
index ed2d922..ff7a387 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
@@ -275,6 +275,25 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
       return coreQueueControl.changeMessagesPriority(filter, newPriority);
    }
 
+   public boolean retryMessage(final String jmsMessageID) throws Exception {
+
+      // Figure out messageID from JMSMessageID.
+      final String filter = createFilterForJMSMessageID(jmsMessageID);
+      Map<String,Object>[] messages = coreQueueControl.listMessages(filter);
+      if ( messages.length != 1) { // if no messages. There should not be more than one,
JMSMessageID should be unique.
+         return false;
+      }
+
+      final Map<String,Object> messageToRedeliver = messages[0];
+      Long messageID = (Long)messageToRedeliver.get("messageID");
+      return messageID != null && coreQueueControl.retryMessage(messageID);
+   }
+
+   public int retryMessages() throws Exception {
+      return coreQueueControl.retryMessages();
+   }
+
+
    public boolean moveMessage(final String messageID, final String otherQueueName) throws
Exception {
       return moveMessage(messageID, otherQueueName, false);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
index b98a165..b5183ca 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlTest.java
@@ -792,6 +792,84 @@ public class JMSQueueControlTest extends ManagementTestBase {
       connection.close();
    }
 
+
+   protected ActiveMQQueue createDLQ(final String deadLetterQueueName) throws Exception {
+      serverManager.createQueue(false, deadLetterQueueName, null, true, deadLetterQueueName);
+      return (ActiveMQQueue) ActiveMQJMSClient.createQueue(deadLetterQueueName);
+   }
+
+   protected ActiveMQQueue createTestQueueWithDLQ(final String queueName, final ActiveMQQueue
dlq) throws Exception {
+      serverManager.createQueue(false,queueName,null,true,queueName);
+      ActiveMQQueue testQueue = (ActiveMQQueue) ActiveMQJMSClient.createQueue(queueName);
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setDeadLetterAddress(new SimpleString(dlq.getAddress()));
+      addressSettings.setMaxDeliveryAttempts(1);
+      server.getAddressSettingsRepository().addMatch(testQueue.getAddress(), addressSettings);
+      return testQueue;
+   }
+
+   /**
+    * Test retrying all messages put on DLQ - i.e. they should appear on the original queue.
+    * @throws Exception
+    */
+   @Test
+   public void testRetryMessages() throws Exception {
+      ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
+      ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
+
+      final int numMessagesToTest = 10;
+      JMSUtil.sendMessages(testQueue, numMessagesToTest);
+
+      Connection connection = createConnection();
+      connection.start();
+      Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(testQueue);
+      for (int i = 0;i < numMessagesToTest;i++) {
+         Message msg = consumer.receive(500L);
+      }
+      session.rollback(); // All <numMessagesToTest> messages should now be on DLQ
+
+      JMSQueueControl testQueueControl = createManagementControl(testQueue);
+      JMSQueueControl dlqQueueControl = createManagementControl(dlq);
+      Assert.assertEquals(0, getMessageCount(testQueueControl));
+      Assert.assertEquals(numMessagesToTest,getMessageCount(dlqQueueControl));
+
+      dlqQueueControl.retryMessages();
+
+      Assert.assertEquals(numMessagesToTest, getMessageCount(testQueueControl));
+      Assert.assertEquals(0,getMessageCount(dlqQueueControl));
+   }
+
+   /**
+    * Test retrying a specific message on DLQ.
+    * Expected to be sent back to original queue.
+    * @throws Exception
+    */
+   @Test
+   public void testRetryMessage() throws Exception {
+      ActiveMQQueue dlq = createDLQ(RandomUtil.randomString());
+      ActiveMQQueue testQueue = createTestQueueWithDLQ(RandomUtil.randomString(),dlq);
+      String messageID = JMSUtil.sendMessages(testQueue,1)[0];
+
+      Connection connection = createConnection();
+      connection.start();
+      Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
+      MessageConsumer consumer = session.createConsumer(testQueue);
+      consumer.receive(500L);
+      session.rollback(); // All <numMessagesToTest> messages should now be on DLQ
+
+      JMSQueueControl testQueueControl = createManagementControl(testQueue);
+      JMSQueueControl dlqQueueControl = createManagementControl(dlq);
+      Assert.assertEquals(0, getMessageCount(testQueueControl));
+      Assert.assertEquals(1,getMessageCount(dlqQueueControl));
+
+      dlqQueueControl.retryMessage(messageID);
+
+      Assert.assertEquals(1, getMessageCount(testQueueControl));
+      Assert.assertEquals(0,getMessageCount(dlqQueueControl));
+
+   }
+
    @Test
    public void testMoveMessage() throws Exception {
       String otherQueueName = RandomUtil.randomString();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/98917259/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index 1d79d7d..c98f942 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.jms.server.management;
 
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.management.Parameter;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.artemis.api.jms.JMSFactoryType;
@@ -171,6 +172,18 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
{
             return (String) proxy.invokeOperation("listMessageCounterHistory");
          }
 
+         public boolean retryMessage(@Parameter(name = "messageID", desc = "A message ID")
long messageID) throws Exception {
+            return (Boolean) proxy.invokeOperation("retryMessage",messageID);
+         }
+
+         public int retryMessages() throws Exception {
+            return (Integer) proxy.invokeOperation("retryMessages");
+         }
+
+         public boolean retryMessage(final String messageID) throws Exception {
+            return (Boolean) proxy.invokeOperation("retryMessage",messageID);
+         }
+
          @Override
          public Map<String, Object>[] listScheduledMessages() throws Exception {
             return null;


Mime
View raw message