activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/2] activemq-artemis git commit: ARTEMIS-646 track expired msg count on queue
Date Tue, 26 Jul 2016 22:22:55 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master ec605e664 -> a242f2761


ARTEMIS-646 track expired msg count on queue


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/32abe618
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/32abe618
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/32abe618

Branch: refs/heads/master
Commit: 32abe61876d388b6d1208de8f74968d86be24872
Parents: ec605e6
Author: jbertram <jbertram@apache.org>
Authored: Tue Jul 26 16:06:01 2016 -0500
Committer: jbertram <jbertram@apache.org>
Committed: Tue Jul 26 16:15:02 2016 -0500

----------------------------------------------------------------------
 .../api/core/management/QueueControl.java       | 12 +++++
 .../api/jms/management/JMSQueueControl.java     |  6 +++
 .../management/impl/JMSQueueControlImpl.java    |  5 ++
 .../core/management/impl/QueueControlImpl.java  | 27 +++++++++++
 .../activemq/artemis/core/server/Queue.java     |  4 ++
 .../artemis/core/server/impl/QueueImpl.java     | 51 +++++++++++++++++---
 .../impl/ScheduledDeliveryHandlerTest.java      | 10 ++++
 .../management/JMSQueueControlUsingJMSTest.java |  5 ++
 .../management/QueueControlTest.java            | 40 +++++++++++++++
 .../management/QueueControlUsingCoreTest.java   | 10 ++++
 .../unit/core/postoffice/impl/FakeQueue.java    | 12 +++++
 11 files changed, 174 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index 3a3b349..3492892 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -99,6 +99,12 @@ public interface QueueControl {
    long getMessagesAcknowledged();
 
    /**
+    * Returns the number of messages expired from this queue since it was created.
+    */
+   @Attribute(desc = "number of messages expired from this queue since it was created")
+   long getMessagesExpired();
+
+   /**
     * Returns the first message on the queue as JSON
     */
    @Attribute(desc = "first message on the queue as JSON")
@@ -435,6 +441,12 @@ public interface QueueControl {
    void resetMessagesAcknowledged() throws Exception;
 
    /**
+    * Resets the MessagesExpired property
+    */
+   @Operation(desc = "Resets the MessagesExpired property", impact = MBeanOperationInfo.ACTION)
+   void resetMessagesExpired() throws Exception;
+
+   /**
     * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call
     * any other measure.
     * It is useful if you need the exact number of counts on a message

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
index 837ec68..c13e3b9 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/api/jms/management/JMSQueueControl.java
@@ -56,6 +56,12 @@ public interface JMSQueueControl extends DestinationControl {
    int getConsumerCount();
 
    /**
+    * Returns the number of messages expired from this queue since it was created.
+    */
+   @Attribute(desc = "the number of messages expired from this queue since it was created")
+   long getMessagesExpired();
+
+   /**
     * returns the selector for the queue
     */
    @Attribute(desc = "selector for the queue")

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
index 0516182..b037d72 100644
--- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
+++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/management/impl/JMSQueueControlImpl.java
@@ -125,6 +125,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
    }
 
    @Override
+   public long getMessagesExpired() {
+      return coreQueueControl.getMessagesExpired();
+   }
+
+   @Override
    public int getConsumerCount() {
       return coreQueueControl.getConsumerCount();
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index de2459f..8f1d6e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -257,6 +257,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public long getMessagesExpired() {
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getMessagesExpired();
+      }
+      finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public long getID() {
       checkStarted();
 
@@ -1011,6 +1024,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
 
    }
 
+   @Override
+   public void resetMessagesExpired() throws Exception {
+      checkStarted();
+
+      clearIO();
+      try {
+         queue.resetMessagesExpired();
+      }
+      finally {
+         blockOnIO();
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index fa39c22..6645d36 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -121,6 +121,8 @@ public interface Queue extends Bindable {
 
    long getMessagesAcknowledged();
 
+   long getMessagesExpired();
+
    MessageReference removeReferenceWithID(long id) throws Exception;
 
    MessageReference getReference(long id) throws ActiveMQException;
@@ -234,6 +236,8 @@ public interface Queue extends Bindable {
 
    void resetMessagesAcknowledged();
 
+   void resetMessagesExpired();
+
    void incrementMesssagesAdded();
 
    /**

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index c809abf..724da5b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -167,6 +167,8 @@ public class QueueImpl implements Queue {
 
    private long messagesAcknowledged;
 
+   private long messagesExpired;
+
    protected final AtomicInteger deliveringCount = new AtomicInteger(0);
 
    private boolean paused;
@@ -962,6 +964,10 @@ public class QueueImpl implements Queue {
 
    @Override
    public void acknowledge(final MessageReference ref) throws Exception {
+      acknowledge(ref, OperationType.NORMAL);
+   }
+
+   private void acknowledge(final MessageReference ref, OperationType type) throws Exception {
       if (ref.isPaged()) {
          pageSubscription.ack((PagedReference) ref);
          postAcknowledge(ref);
@@ -977,12 +983,21 @@ public class QueueImpl implements Queue {
          postAcknowledge(ref);
       }
 
-      messagesAcknowledged++;
+      if (type == OperationType.EXPIRED) {
+         messagesExpired++;
+      }
+      else {
+         messagesAcknowledged++;
+      }
 
    }
 
    @Override
    public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
+      acknowledge(tx, ref, OperationType.NORMAL);
+   }
+
+   private void acknowledge(final Transaction tx, final MessageReference ref, OperationType type) throws Exception {
       if (ref.isPaged()) {
          pageSubscription.ackTx(tx, (PagedReference) ref);
 
@@ -1002,7 +1017,12 @@ public class QueueImpl implements Queue {
          getRefsOperation(tx).addAck(ref);
       }
 
-      messagesAcknowledged++;
+      if (type == OperationType.EXPIRED) {
+         messagesExpired++;
+      }
+      else {
+         messagesAcknowledged++;
+      }
    }
 
    @Override
@@ -1075,13 +1095,13 @@ public class QueueImpl implements Queue {
          if (logger.isTraceEnabled()) {
             logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
          }
-         move(null, expiryAddress, ref, true, false);
+         move(null, expiryAddress, ref, true, false, OperationType.EXPIRED);
       }
       else {
          if (logger.isTraceEnabled()) {
             logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
          }
-         acknowledge(ref);
+         acknowledge(ref, OperationType.EXPIRED);
       }
    }
 
@@ -1128,6 +1148,11 @@ public class QueueImpl implements Queue {
    }
 
    @Override
+   public long getMessagesExpired() {
+      return messagesExpired;
+   }
+
+   @Override
    public int deleteAllReferences() throws Exception {
       return deleteAllReferences(DEFAULT_FLUSH_LIMIT);
    }
@@ -1508,7 +1533,7 @@ public class QueueImpl implements Queue {
                refRemoved(ref);
                incDelivering();
                try {
-                  move(null, toAddress, ref, false, rejectDuplicate);
+                  move(null, toAddress, ref, false, rejectDuplicate, OperationType.NORMAL);
                }
                catch (Exception e) {
                   decDelivering();
@@ -2353,7 +2378,7 @@ public class QueueImpl implements Queue {
          }
          else {
             ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
-            move(tx, deadLetterAddress, ref, false, false);
+            move(tx, deadLetterAddress, ref, false, false, OperationType.NORMAL);
          }
       }
       else {
@@ -2367,7 +2392,8 @@ public class QueueImpl implements Queue {
                      final SimpleString address,
                      final MessageReference ref,
                      final boolean expiry,
-                     final boolean rejectDuplicate) throws Exception {
+                     final boolean rejectDuplicate,
+                     final OperationType type) throws Exception {
       Transaction tx;
 
       if (originalTX != null) {
@@ -2384,7 +2410,7 @@ public class QueueImpl implements Queue {
 
       postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
 
-      acknowledge(tx, ref);
+      acknowledge(tx, ref, type);
 
       if (originalTX == null) {
          tx.commit();
@@ -2634,6 +2660,11 @@ public class QueueImpl implements Queue {
    }
 
    @Override
+   public synchronized void resetMessagesExpired() {
+      messagesExpired = 0;
+   }
+
+   @Override
    public float getRate() {
       float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
       if (timeSlice == 0) {
@@ -2988,5 +3019,9 @@ public class QueueImpl implements Queue {
          }
       }
    }
+
+   private enum OperationType {
+      EXPIRED, NORMAL
+   }
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index c96606d..301833e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1047,6 +1047,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getMessagesExpired() {
+         return 0;
+      }
+
+      @Override
       public MessageReference removeReferenceWithID(long id) throws Exception {
          return null;
       }
@@ -1256,6 +1261,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void resetMessagesExpired() {
+
+      }
+
+      @Override
       public void incrementMesssagesAdded() {
 
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index 2a966cf..8c14ff2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -120,6 +120,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
          }
 
          @Override
+         public long getMessagesExpired() {
+            return ((Number) proxy.retrieveAttributeValue("getMessagesExpired")).longValue();
+         }
+
+         @Override
          public String getDeadLetterAddress() {
             return (String) proxy.retrieveAttributeValue("deadLetterAddress");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
index 0073c8d..693b556 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java
@@ -1984,6 +1984,46 @@ public class QueueControlTest extends ManagementTestBase {
       session.deleteQueue(queue);
    }
 
+   @Test
+   public void testResetMessagesExpired() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString queue = RandomUtil.randomSimpleString();
+
+      session.createQueue(address, queue, null, false);
+
+      QueueControl queueControl = createManagementControl(address, queue);
+      Assert.assertEquals(0, queueControl.getMessagesExpired());
+
+      ClientProducer producer = session.createProducer(address);
+      ClientMessage message = session.createMessage(false);
+      producer.send(message);
+
+      // the message IDs are set on the server
+      Map<String, Object>[] messages = queueControl.listMessages(null);
+      Assert.assertEquals(1, messages.length);
+      long messageID = (Long) messages[0].get("messageID");
+
+      queueControl.expireMessage(messageID);
+      Assert.assertEquals(1, queueControl.getMessagesExpired());
+
+      message = session.createMessage(false);
+      producer.send(message);
+
+      // the message IDs are set on the server
+      messages = queueControl.listMessages(null);
+      Assert.assertEquals(1, messages.length);
+      messageID = (Long) messages[0].get("messageID");
+
+      queueControl.expireMessage(messageID);
+      Assert.assertEquals(2, queueControl.getMessagesExpired());
+
+      queueControl.resetMessagesExpired();
+
+      Assert.assertEquals(0, queueControl.getMessagesExpired());
+
+      session.deleteQueue(queue);
+   }
+
    //make sure notifications are always received no matter whether
    //a Queue is created via QueueControl or by JMSServerManager directly.
    @Test

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 68dfd48..f27eaf1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -121,6 +121,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public long getMessagesExpired() {
+            return ((Number) proxy.retrieveAttributeValue("messagesExpired")).longValue();
+         }
+
+         @Override
          public void resetMessagesAdded() throws Exception {
             proxy.invokeOperation("resetMessagesAdded");
          }
@@ -131,6 +136,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public void resetMessagesExpired() throws Exception {
+            proxy.invokeOperation("resetMessagesExpired");
+         }
+
+         @Override
          public String getName() {
             return (String) proxy.retrieveAttributeValue("name");
          }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/32abe618/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 250d211..0633bfb 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -312,6 +312,12 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public long getMessagesExpired() {
+      // no-op
+      return 0;
+   }
+
+   @Override
    public void resetMessagesAdded() {
       // no-op
 
@@ -324,6 +330,12 @@ public class FakeQueue implements Queue {
    }
 
    @Override
+   public void resetMessagesExpired() {
+      // no-op
+
+   }
+
+   @Override
    public void incrementMesssagesAdded() {
 
    }


Mime
View raw message