activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From clebertsuco...@apache.org
Subject [1/3] activemq-artemis git commit: ARTEMIS-1011 adjust slow-consumer detection logic
Date Thu, 09 Mar 2017 22:27:22 GMT
Repository: activemq-artemis
Updated Branches:
  refs/heads/master 4fabbc804 -> 2b7807f7e


ARTEMIS-1011 adjust slow-consumer detection logic

Adjust slow-consumer detection logic to use the number of messages in
the queue and not just the number of messages added since the last
check. This means the getRate() method now returns the rate of messages
which it *could* have dispatched since the last check rather than the
rate at which it received messages. This is a more reliable metric to
ensure the slow-consumer detection logic doesn't flag a consumer as
slow unfairly. Although the reliability will come at a performance cost
since getMessageCount() must lock the queue.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/19ebbfb5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/19ebbfb5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/19ebbfb5

Branch: refs/heads/master
Commit: 19ebbfb5f0a10daca6f2f516efae4755613254fd
Parents: 4fabbc8
Author: Justin Bertram <jbertram@apache.org>
Authored: Thu Mar 9 08:11:00 2017 -0600
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Thu Mar 9 16:59:22 2017 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 20 ++++-----
 docs/user-manual/en/slow-consumers.md           | 19 ++++++++-
 .../integration/client/SlowConsumerTest.java    | 44 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19ebbfb5/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 8bab051..44c5ba4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -232,8 +232,6 @@ public class QueueImpl implements Queue {
 
    private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
 
-   private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
-
    private ScheduledFuture slowConsumerReaperFuture;
 
    private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
@@ -2816,13 +2814,11 @@ public class QueueImpl implements Queue {
 
    @Override
    public float getRate() {
-      long locaMessageAdded = getMessagesAdded();
       float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis()))
/ 1000.0f);
       if (timeSlice == 0) {
-         messagesAddedSnapshot.getAndSet(locaMessageAdded);
          return 0.0f;
       }
-      return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded))
/ timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
+      return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
    }
 
    // Inner classes
@@ -3131,17 +3127,19 @@ public class QueueImpl implements Queue {
 
       @Override
       public void run() {
-         float queueRate = getRate();
-         if (logger.isDebugEnabled()) {
-            logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() +
" consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
-         }
-
          Set<Consumer> consumersSet = getConsumers();
 
          if (consumersSet.size() == 0) {
             logger.debug("There are no consumers, no need to check slow consumer's rate");
             return;
-         } else if (queueRate  < (threshold * consumersSet.size())) {
+         }
+
+         float queueRate = getRate();
+         if (logger.isDebugEnabled()) {
+            logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() +
" consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
+         }
+
+         if (queueRate < (threshold * consumersSet.size())) {
             if (logger.isDebugEnabled()) {
                logger.debug("Insufficient messages received on queue \"" + getName() + "\"
to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19ebbfb5/docs/user-manual/en/slow-consumers.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/slow-consumers.md b/docs/user-manual/en/slow-consumers.md
index b53bac5..5840912 100644
--- a/docs/user-manual/en/slow-consumers.md
+++ b/docs/user-manual/en/slow-consumers.md
@@ -18,8 +18,23 @@ By default the server will not detect slow consumers. If slow consumer
 detection is desired then see [queue attributes chapter](queue-attributes.md)
 for more details.
 
-The calculation to determine whether or not a consumer is slow only
-inspects the number of messages a particular consumer has
+The calculation to determine whether or not a consumer is slow inspects two notable
+metrics:
+
+1. The queue's message count.
+
+2. The number of messages a consumer has acknowledged.
+
+The queue's message count is inspected to ensure that the queue actually has had enough
+messages to actually satisfy the consumer's threshold. For example, it would not be
+fair to mark a consumer as "slow" if the queue received no messages. This is also notable
+because in order to get an accurate message count the queue must be locked which can
+negatively impact performance in high-throughput use-cases. Therefore slow-consumer
+detection is only recommended on queues where it is absolutely necessary and in those
+cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not
+running so often as to negatively impact performance.
+
+Finally, the algorithm inspects the number of messages a particular consumer has
 *acknowledged*. It doesn't take into account whether or not flow control
 has been enabled on the consumer, whether or not the consumer is
 streaming a large message, etc. Keep this in mind when configuring slow

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/19ebbfb5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index c81c24c..4f47ac1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.TimeUtils;
@@ -244,6 +245,49 @@ public class SlowConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
+   public void testSlowConsumerWithBurst() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(true, true));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      final int numMessages = 20;
+
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(createTextMessage(session, "m" + i));
+      }
+
+      assertPaging();
+
+      final Queue queue = server.locateQueue(QUEUE);
+
+      queue.getRate();
+
+      logger.info("Creating consumer...");
+
+      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+      session.start();
+
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            forceGC();
+            return queue.getConsumerCount() == 0;
+         }
+      }, 3000, 100);
+
+      try {
+         consumer.receive(500);
+         fail("Consumer should have been killed since it's slow!");
+      } catch (ActiveMQObjectClosedException e) {
+         // ignore
+      } catch (Exception e) {
+         fail("Wrong exception thrown");
+      }
+   }
+
+   @Test
    public void testFastThenSlowConsumerSpared() throws Exception {
       locator.setAckBatchSize(0);
 


Mime
View raw message