Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DCD2E10413 for ; Mon, 12 Jan 2015 12:01:11 +0000 (UTC) Received: (qmail 69200 invoked by uid 500); 12 Jan 2015 12:01:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 69158 invoked by uid 500); 12 Jan 2015 12:01:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 69145 invoked by uid 99); 12 Jan 2015 12:01:13 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jan 2015 12:01:13 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 2C27B9AB807; Mon, 12 Jan 2015 12:01:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: gtully@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-4107 - apply patch - resolved intermittent failure of TwoMulticastDiscoveryBrokerTopicSendReceiveTest Date: Mon, 12 Jan 2015 12:01:13 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/trunk 3e007d89a -> 8dbb48a23 https://issues.apache.org/jira/browse/AMQ-4107 - apply patch - resolved intermittent failure of TwoMulticastDiscoveryBrokerTopicSendReceiveTest Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8dbb48a2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8dbb48a2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8dbb48a2 Branch: refs/heads/trunk Commit: 8dbb48a23fc6170afd5c3c855e8d385297042e57 Parents: 3e007d8 Author: gtully Authored: Mon Jan 12 11:59:51 2015 +0000 Committer: gtully Committed: Mon Jan 12 12:00:42 2015 +0000 ---------------------------------------------------------------------- .../broker/region/TopicSubscription.java | 41 +++++++++----------- ...castDiscoveryBrokerTopicSendReceiveTest.java | 2 +- 2 files changed, 20 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8dbb48a2/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 6b61379..8db7c62 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -106,26 +106,26 @@ public class TopicSubscription extends AbstractSubscription { // locator /w the message. node = new IndirectMessageReference(node.getMessage()); enqueueCounter.incrementAndGet(); - if (!isFull() && matched.isEmpty()) { - // if maximumPendingMessages is set we will only discard messages which - // have not been dispatched (i.e. we allow the prefetch buffer to be filled) - dispatch(node); - setSlowConsumer(false); - } else { - if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { - // Slow consumers should log and set their state as such. - if (!isSlowConsumer()) { - LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); - setSlowConsumer(true); - for (Destination dest: destinations) { - dest.slowConsumer(getContext(), this); + synchronized (matchedListMutex) { + if (!isFull() && matched.isEmpty()) { + // if maximumPendingMessages is set we will only discard messages which + // have not been dispatched (i.e. we allow the prefetch buffer to be filled) + dispatch(node); + setSlowConsumer(false); + } else { + if (info.getPrefetchSize() > 1 && matched.size() > info.getPrefetchSize()) { + // Slow consumers should log and set their state as such. + if (!isSlowConsumer()) { + LOG.warn("{}: has twice its prefetch limit pending, without an ack; it appears to be slow", toString()); + setSlowConsumer(true); + for (Destination dest: destinations) { + dest.slowConsumer(getContext(), this); + } } } - } - if (maximumPendingMessages != 0) { - boolean warnedAboutWait = false; - while (active) { - synchronized (matchedListMutex) { + if (maximumPendingMessages != 0) { + boolean warnedAboutWait = false; + while (active) { while (matched.isFull()) { if (getContext().getStopping().get()) { LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId()); @@ -150,9 +150,6 @@ public class TopicSubscription extends AbstractSubscription { break; } } - } - synchronized (matchedListMutex) { - // NOTE - be careful about the slaveBroker! if (maximumPendingMessages > 0) { // calculate the high water mark from which point we // will eagerly evict expired messages @@ -195,8 +192,8 @@ public class TopicSubscription extends AbstractSubscription { } } } + dispatchMatched(); } - dispatchMatched(); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/8dbb48a2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java index 8b8643a..f05f993 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoMulticastDiscoveryBrokerTopicSendReceiveTest.java @@ -21,7 +21,7 @@ import javax.jms.JMSException; import org.apache.activemq.ActiveMQConnectionFactory; /** - * + * reproduced: https://issues.apache.org/jira/browse/AMQ-4107 */ public class TwoMulticastDiscoveryBrokerTopicSendReceiveTest extends TwoBrokerTopicSendReceiveTest {