Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 77C37186E8 for ; Wed, 2 Dec 2015 13:26:03 +0000 (UTC) Received: (qmail 47508 invoked by uid 500); 2 Dec 2015 13:26:03 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 47468 invoked by uid 500); 2 Dec 2015 13:26:03 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 47459 invoked by uid 99); 2 Dec 2015 13:26:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Dec 2015 13:26:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id F30FAE03A8; Wed, 2 Dec 2015 13:26:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: <1958e5568ab647eba73921386e511ca6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6069 Date: Wed, 2 Dec 2015 13:26:02 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 5a27bdf07 -> 8363c99b5 https://issues.apache.org/jira/browse/AMQ-6069 Fixed contains method in PrioritizedPendingList which was not returning correctly. This was causing messages to not be removed from the dispatchPendingList when purge was called inside a Queue leading to an eventual OOM error if enough messages were purged. This fix also improves performance of the contains method. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8363c99b Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8363c99b Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8363c99b Branch: refs/heads/master Commit: 8363c99b51a98eb176e6baea82fcafce3225ba2c Parents: 5a27bdf Author: Christopher L. Shannon (cshannon) Authored: Tue Dec 1 19:33:53 2015 +0000 Committer: Christopher L. Shannon (cshannon) Committed: Wed Dec 2 13:24:32 2015 +0000 ---------------------------------------------------------------------- .../region/cursors/PrioritizedPendingList.java | 5 ++- .../activemq/broker/region/QueuePurgeTest.java | 38 +++++++++++++++----- 2 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8363c99b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java index 70eaa53..da2ccaf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -156,10 +156,9 @@ public class PrioritizedPendingList implements PendingList { @Override public boolean contains(MessageReference message) { - if (map.values().contains(message)) { - return true; + if (message != null) { + return this.map.containsKey(message.getMessageId()); } - return false; } http://git-wip-us.apache.org/repos/asf/activemq/blob/8363c99b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java 
---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index a121619..85faeab 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -30,6 +30,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; @@ -57,6 +58,7 @@ public class QueuePurgeTest extends CombinationTestSupport { Queue queue; MessageConsumer consumer; + @Override protected void setUp() throws Exception { setMaxTestTime(10*60*1000); // 10 mins setAutoFail(true); @@ -78,6 +80,7 @@ public class QueuePurgeTest extends CombinationTestSupport { connection.start(); } + @Override protected void tearDown() throws Exception { super.tearDown(); if (consumer != null) { @@ -90,7 +93,15 @@ public class QueuePurgeTest extends CombinationTestSupport { } public void testPurgeLargeQueue() throws Exception { - applyBrokerSpoolingPolicy(); + testPurgeLargeQueue(false); + } + + public void testPurgeLargeQueuePrioritizedMessages() throws Exception { + testPurgeLargeQueue(true); + } + + private void testPurgeLargeQueue(boolean prioritizedMessages) throws Exception { + applyBrokerSpoolingPolicy(prioritizedMessages); createProducerAndSendMessages(NUM_TO_SEND); QueueViewMBean proxy = getProxyToQueueViewMBean(); LOG.info("purging.."); @@ -127,10 +138,11 @@ public class QueuePurgeTest extends CombinationTestSupport { proxy.getQueueSize()); assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled()); assertTrue("got 
expected info purge log message", gotPurgeLogMessage.get()); + assertEquals("Found messages when browsing", 0, proxy.browseMessages().size()); } - public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { - applyBrokerSpoolingPolicy(); + public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { + applyBrokerSpoolingPolicy(false); final int expiryPeriod = 500; applyExpiryDuration(expiryPeriod); createProducerAndSendMessages(NUM_TO_SEND); @@ -140,15 +152,16 @@ public class QueuePurgeTest extends CombinationTestSupport { assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND, proxy.getQueueSize()); } - + private void applyExpiryDuration(int i) { broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); } - private void applyBrokerSpoolingPolicy() { + private void applyBrokerSpoolingPolicy(boolean prioritizedMessages) { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setPrioritizedMessages(prioritizedMessages); defaultEntry.setProducerFlowControl(false); PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy(); defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); @@ -156,9 +169,17 @@ public class QueuePurgeTest extends CombinationTestSupport { broker.setDestinationPolicy(policyMap); } - - public void testPurgeLargeQueueWithConsumer() throws Exception { - applyBrokerSpoolingPolicy(); + + public void testPurgeLargeQueueWithConsumer() throws Exception { + testPurgeLargeQueueWithConsumer(false); + } + + public void testPurgeLargeQueueWithConsumerPrioritizedMessages() throws Exception { + testPurgeLargeQueueWithConsumer(true); + } + + private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception { + applyBrokerSpoolingPolicy(prioritizedMessages); createProducerAndSendMessages(NUM_TO_SEND); QueueViewMBean proxy = getProxyToQueueViewMBean(); createConsumer(); @@ -177,6 +198,7 @@ 
public class QueuePurgeTest extends CombinationTestSupport { } } while (msg != null); assertEquals("Queue size not valid", 0, proxy.getQueueSize()); + assertEquals("Found messages when browsing", 0, proxy.browseMessages().size()); } private QueueViewMBean getProxyToQueueViewMBean()