activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6069
Date Wed, 02 Dec 2015 13:42:54 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 95fc59397 -> db87a051c


https://issues.apache.org/jira/browse/AMQ-6069

Fixed the contains method in PrioritizedPendingList, which was not
returning the correct result.  This was causing messages to not be removed
from the dispatchPendingList when purge was called inside a Queue, leading
to an eventual OOM error if enough messages were purged. This fix also
improves performance of the contains method.

(cherry picked from commit 8363c99b51a98eb176e6baea82fcafce3225ba2c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/db87a051
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/db87a051
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/db87a051

Branch: refs/heads/activemq-5.13.x
Commit: db87a051ca22abec6ce3e296e0c7f6935becd803
Parents: 95fc593
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Dec 1 19:33:53 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Wed Dec 2 13:42:42 2015 +0000

----------------------------------------------------------------------
 .../region/cursors/PrioritizedPendingList.java  |  5 ++-
 .../activemq/broker/region/QueuePurgeTest.java  | 38 +++++++++++++++-----
 2 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/db87a051/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 70eaa53..da2ccaf 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -156,10 +156,9 @@ public class PrioritizedPendingList implements PendingList {
 
     @Override
     public boolean contains(MessageReference message) {
-        if (map.values().contains(message)) {
-            return true;
+        if (message != null) {
+            return this.map.containsKey(message.getMessageId());
         }
-
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/db87a051/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
index a121619..85faeab 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -30,6 +30,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -57,6 +58,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
     Queue queue;
     MessageConsumer consumer;
 
+    @Override
     protected void setUp() throws Exception {
         setMaxTestTime(10*60*1000); // 10 mins
         setAutoFail(true);
@@ -78,6 +80,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
         connection.start();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
         if (consumer != null) {
@@ -90,7 +93,15 @@ public class QueuePurgeTest extends CombinationTestSupport {
     }
 
     public void testPurgeLargeQueue() throws Exception {
-        applyBrokerSpoolingPolicy();
+        testPurgeLargeQueue(false);
+    }
+
+    public void testPurgeLargeQueuePrioritizedMessages() throws Exception {
+        testPurgeLargeQueue(true);
+    }
+
+    private void testPurgeLargeQueue(boolean prioritizedMessages) throws Exception {
+        applyBrokerSpoolingPolicy(prioritizedMessages);
         createProducerAndSendMessages(NUM_TO_SEND);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         LOG.info("purging..");
@@ -127,10 +138,11 @@ public class QueuePurgeTest extends CombinationTestSupport {
                 proxy.getQueueSize());
         assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled());
         assertTrue("got expected info purge log message", gotPurgeLogMessage.get());
+        assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
     }
 
-    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {       
-        applyBrokerSpoolingPolicy();
+    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
+        applyBrokerSpoolingPolicy(false);
         final int expiryPeriod = 500;
         applyExpiryDuration(expiryPeriod);
         createProducerAndSendMessages(NUM_TO_SEND);
@@ -140,15 +152,16 @@ public class QueuePurgeTest extends CombinationTestSupport {
         assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND,
                 proxy.getQueueSize());
     }
-    
+
 
     private void applyExpiryDuration(int i) {
         broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
     }
 
-    private void applyBrokerSpoolingPolicy() {
+    private void applyBrokerSpoolingPolicy(boolean prioritizedMessages) {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPrioritizedMessages(prioritizedMessages);
         defaultEntry.setProducerFlowControl(false);
         PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy();
         defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
@@ -156,9 +169,17 @@ public class QueuePurgeTest extends CombinationTestSupport {
         broker.setDestinationPolicy(policyMap);
     }
 
-    
-    public void testPurgeLargeQueueWithConsumer() throws Exception {       
-        applyBrokerSpoolingPolicy();
+
+    public void testPurgeLargeQueueWithConsumer() throws Exception {
+        testPurgeLargeQueueWithConsumer(false);
+    }
+
+    public void testPurgeLargeQueueWithConsumerPrioritizedMessages() throws Exception {
+        testPurgeLargeQueueWithConsumer(true);
+    }
+
+    private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception {
+        applyBrokerSpoolingPolicy(prioritizedMessages);
         createProducerAndSendMessages(NUM_TO_SEND);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         createConsumer();
@@ -177,6 +198,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
             }
         } while (msg != null);
         assertEquals("Queue size not valid", 0, proxy.getQueueSize());
+        assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
     }
 
     private QueueViewMBean getProxyToQueueViewMBean()


Mime
View raw message