Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D9543200BFE for ; Mon, 16 Jan 2017 17:37:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D7FC7160B28; Mon, 16 Jan 2017 16:37:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id D795D160B4D for ; Mon, 16 Jan 2017 17:37:25 +0100 (CET) Received: (qmail 9704 invoked by uid 500); 16 Jan 2017 16:37:25 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 9597 invoked by uid 99); 16 Jan 2017 16:37:25 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 16 Jan 2017 16:37:25 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id ED91EDFCAB; Mon, 16 Jan 2017 16:37:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: clebertsuconic@apache.org To: commits@activemq.apache.org Date: Mon, 16 Jan 2017 16:37:26 -0000 Message-Id: <7bfa431609994d058c49dbab75a4241c@git.apache.org> In-Reply-To: <34ed0048fc144ee88d94ba5bc4fa7cea@git.apache.org> References: <34ed0048fc144ee88d94ba5bc4fa7cea@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] activemq-artemis git commit: ARTEMIS-921 Consumers killed as slow even if overall consuming rate is above threshold archived-at: Mon, 16 Jan 2017 16:37:27 -0000 ARTEMIS-921 Consumers killed as slow even if overall consuming rate is above threshold Project: 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/90cf2398 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/90cf2398 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/90cf2398 Branch: refs/heads/master Commit: 90cf2398296d9de83145043c6dc0d8f6a6d4c068 Parents: 490bec9 Author: Howard Gao Authored: Mon Jan 16 22:00:05 2017 +0800 Committer: Clebert Suconic Committed: Mon Jan 16 11:37:12 2017 -0500 ---------------------------------------------------------------------- .../artemis/core/server/impl/QueueImpl.java | 3 + .../client/MultipleSlowConsumerTest.java | 261 +++++++++++++++++++ 2 files changed, 264 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90cf2398/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index a74b0fe..87a5bd7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -3159,6 +3159,9 @@ public class QueueImpl implements Queue { connection.killMessage(server.getNodeID()); remotingService.removeConnection(connection.getID()); connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress())); + //break once a consumer gets killed. This can prevent all + //consumers to this queue get killed all at once. 
+ break; } else if (policy.equals(SlowConsumerPolicy.NOTIFY)) { TypedProperties props = new TypedProperties(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90cf2398/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java new file mode 100644 index 0000000..37ae528 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.artemis.tests.integration.client; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientConsumer; +import org.apache.activemq.artemis.api.core.client.ClientMessage; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ConcurrentHashSet; +import org.apache.activemq.artemis.utils.TimeUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Set; + +public class MultipleSlowConsumerTest extends ActiveMQTestBase { + + private int checkPeriod = 3; + private int threshold = 1; + + private ActiveMQServer server; + + private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue"); + + private ServerLocator locator; + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + + server = createServer(true, true); + + AddressSettings addressSettings = new AddressSettings(); + addressSettings.setSlowConsumerCheckPeriod(checkPeriod); + addressSettings.setSlowConsumerThreshold(threshold); + addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL); + + server.start(); + + server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings); + + server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false); + + locator = createFactory(true); + } + 
+ /** + * This test creates 3 consumers on one queue. A producer sends + * messages at a rate of 2 messages per second. Each consumer + * consumes messages at rate of 1 message per second. The slow + * consumer threshold is 1 message per second. + * Based on the above settings, at least one of the consumers + * will be removed during the test, but at least one of the + * consumers will remain and all messages will be received. + */ + @Test + public void testMultipleConsumersOneQueue() throws Exception { + locator.setAckBatchSize(0); + + ClientSessionFactory sf1 = createSessionFactory(locator); + ClientSessionFactory sf2 = createSessionFactory(locator); + ClientSessionFactory sf3 = createSessionFactory(locator); + ClientSessionFactory sf4 = createSessionFactory(locator); + + final int messages = 10; + + FixedRateProducer producer = new FixedRateProducer(sf1, QUEUE, messages); + + final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>(); + final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>(); + + consumers.add(new FixedRateConsumer(sf2, QUEUE, consumers, receivedMessages, 1)); + consumers.add(new FixedRateConsumer(sf3, QUEUE, consumers, receivedMessages, 2)); + consumers.add(new FixedRateConsumer(sf4, QUEUE, consumers, receivedMessages, 3)); + + try { + producer.start(threshold * 1000 / 2); + + for (FixedRateConsumer consumer : consumers) { + consumer.start(threshold * 1000); + } + + //check at least one consumer is killed + //but at least one survived + //and all messages are received. 
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() < 3)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages)); + assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0)); + } finally { + producer.stopRunning(); + for (FixedRateConsumer consumer : consumers) { + consumer.stopRunning(); + } + System.out.println("***report messages received: " + receivedMessages.size()); + System.out.println("***consumers left: " + consumers.size()); + } + } + + private class FixedRateProducer extends FixedRateClient { + + int messages; + ClientProducer producer; + + FixedRateProducer(ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException { + super(sf, queue); + this.messages = messages; + } + + @Override + protected void prepareWork() throws ActiveMQException { + super.prepareWork(); + this.producer = session.createProducer(queue); + } + + @Override + protected void doWork(int count) throws Exception { + + if (count < messages) { + ClientMessage m = createTextMessage(session, "msg" + count); + producer.send(m); + System.out.println("producer sent a message " + count); + } else { + stopRunning(); + } + } + } + + private class FixedRateConsumer extends FixedRateClient { + + Set<FixedRateConsumer> consumers; + ClientConsumer consumer; + Set<ClientMessage> receivedMessages; + int id; + + FixedRateConsumer(ClientSessionFactory sf, SimpleString queue, + Set<FixedRateConsumer> consumers, Set<ClientMessage> receivedMessages, + int id) throws ActiveMQException { + super(sf, queue); + this.consumers = consumers; + this.receivedMessages = receivedMessages; + this.id = id; + } + + @Override + protected void prepareWork() throws ActiveMQException { + super.prepareWork(); + this.consumer = session.createConsumer(queue); + this.session.start(); + } + + @Override + protected void doWork(int count) throws Exception { + ClientMessage m = this.consumer.receive(rate); 
System.out.println("consumer " + id + " got m: " + m); + if (m != null) { + receivedMessages.add(m); + m.acknowledge(); + System.out.println("acked " + m.getClass().getName() + "now total received: " + receivedMessages.size()); + } + } + + @Override + protected void handleError(int count, Exception e) { + System.err.println("Got error receiving message " + count + " remove self " + this.id); + consumers.remove(this); + e.printStackTrace(); + } + + } + + private abstract class FixedRateClient extends Thread { + + protected ClientSessionFactory sf; + protected SimpleString queue; + protected ClientSession session; + protected int rate; + protected volatile boolean working; + + FixedRateClient(ClientSessionFactory sf, SimpleString queue) throws ActiveMQException { + this.sf = sf; + this.queue = queue; + } + + public void start(int rate) { + this.rate = rate; + working = true; + start(); + } + + protected void prepareWork() throws ActiveMQException { + this.session = addClientSession(sf.createSession(true, true)); + } + + @Override + public void run() { + try { + prepareWork(); + } catch (ActiveMQException e) { + System.out.println("got error in prepareWork(), aborting..."); + e.printStackTrace(); + return; + } + int count = 0; + while (working) { + try { + doWork(count); + Thread.sleep(rate); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (Exception e) { + System.err.println(this + " got exception "); + e.printStackTrace(); + handleError(count, e); + working = false; + } finally { + count++; + } + } + } + + protected abstract void doWork(int count) throws Exception; + + protected void handleError(int count, Exception e) { + } + + public void stopRunning() { + working = false; + interrupt(); + try { + join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +}