activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From michaelpea...@apache.org
Subject [2/2] activemq-artemis git commit: ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used
Date Wed, 08 Aug 2018 16:38:13 GMT
ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used

The deliver loop won't give up trying to deliver messages when
back-pressure kicks in (credits and/or TCP) if msg grouping is used and
there are many consumers registered: this change will allow the loop
to exit by instructing the logic that the group consumer is the only
consumer to check.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8dd0e947
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8dd0e947
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8dd0e947

Branch: refs/heads/master
Commit: 8dd0e9472fc9eaf08c3d64c3935aeafbf04a422a
Parents: 99469b1
Author: Francesco Nigro <nigro.fra@gmail.com>
Authored: Tue Jul 31 11:16:26 2018 +0200
Committer: Michael Andre Pearce <michael.andre.pearce@me.com>
Committed: Wed Aug 8 17:37:58 2018 +0100

----------------------------------------------------------------------
 .../unit/core/server/impl/QueueImplTest.java    | 60 ++++++++++++++++++++
 1 file changed, 60 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8dd0e947/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 0aa6e5c..b0987aa 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -1289,6 +1290,65 @@ public class QueueImplTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testGroupMessageWithManyConsumers() throws Exception {
+      final CountDownLatch firstMessageHandled = new CountDownLatch(1);
+      final CountDownLatch finished = new CountDownLatch(2);
+      final Consumer groupConsumer = new FakeConsumer() {
+
+         int count = 0;
+
+         @Override
+         public synchronized HandleStatus handle(MessageReference reference) {
+            if (count == 0) {
+               //the first message is handled and will be used to determine this consumer
+               //to be the group consumer
+               count++;
+               firstMessageHandled.countDown();
+               return HandleStatus.HANDLED;
+            } else if (count <= 2) {
+               //the next two attempts to send the second message will be done
+               //attempting a direct delivery and an async one after that
+               count++;
+               finished.countDown();
+               return HandleStatus.BUSY;
+            } else {
+               //this shouldn't happen, because the last attempt to deliver
+               //the second message should have stop the delivery loop:
+               //it will succeed just to let the message being handled and
+               //reduce the message count to 0
+               return HandleStatus.HANDLED;
+            }
+         }
+      };
+      final Consumer noConsumer = new FakeConsumer() {
+         @Override
+         public synchronized HandleStatus handle(MessageReference reference) {
+            Assert.fail("this consumer isn't allowed to consume any message");
+            throw new AssertionError();
+         }
+      };
+      final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
+                                            null, null, false, true, false,
+                                            scheduledExecutor, null, null, null,
+                                            ArtemisExecutor.delegate(executor), null, null);
+      queue.addConsumer(groupConsumer);
+      queue.addConsumer(noConsumer);
+      final MessageReference firstMessageReference = generateReference(queue, 1);
+      final SimpleString groupName = SimpleString.toSimpleString("group");
+      firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
+      final MessageReference secondMessageReference = generateReference(queue, 2);
+      secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
+      queue.addTail(firstMessageReference, true);
+      Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
+      Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName));
+      queue.addTail(secondMessageReference, true);
+      final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS);
+      Assert.assertTrue(atLeastTwoDeliverAttempts);
+      Thread.sleep(1000);
+      Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount());
+   }
+
    private QueueImpl getNonDurableQueue() {
       return getQueue(QueueImplTest.queue1, false, false, null);
    }


Mime
View raw message