activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-1940
Date Tue, 24 Jan 2017 13:48:34 GMT
Repository: activemq
Updated Branches:
  refs/heads/activemq-5.14.x f947a74f4 -> 1811d191a


https://issues.apache.org/jira/browse/AMQ-1940

Queue purge now acquires the sendLock to prevent new messages from
coming in while purging.  The statistics are no longer zeroed out as
they should properly decrement as messages are removed.  These changes
should prevent the statistics from going negative.

(cherry picked from commit 56bb079c8227a2beee609b205c001d66597db98a)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/1811d191
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/1811d191
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/1811d191

Branch: refs/heads/activemq-5.14.x
Commit: 1811d191afa91d15e6a37d13dbfd7fd8ef32690e
Parents: f947a74
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Jan 24 08:46:15 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Jan 24 08:48:28 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 48 ++++++++++---------
 .../activemq/broker/region/QueuePurgeTest.java  | 50 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/1811d191/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index b841b89..6283232 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1237,33 +1237,35 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
     public void purge() throws Exception {
         ConnectionContext c = createConnectionContext();
         List<MessageReference> list = null;
-        long originalMessageCount = this.destinationStatistics.getMessages().getCount();
-        do {
-            doPageIn(true, false, getMaxPageSize());  // signal no expiry processing needed.
-            pagedInMessagesLock.readLock().lock();
-            try {
-                list = new ArrayList<MessageReference>(pagedInMessages.values());
-            }finally {
-                pagedInMessagesLock.readLock().unlock();
-            }
-
-            for (MessageReference ref : list) {
+        try {
+            sendLock.lock();
+            long originalMessageCount = this.destinationStatistics.getMessages().getCount();
+            do {
+                doPageIn(true, false, getMaxPageSize());  // signal no expiry processing needed.
+                pagedInMessagesLock.readLock().lock();
                 try {
-                    QueueMessageReference r = (QueueMessageReference) ref;
-                    removeMessage(c, r);
-                } catch (IOException e) {
+                    list = new ArrayList<MessageReference>(pagedInMessages.values());
+                }finally {
+                    pagedInMessagesLock.readLock().unlock();
                 }
-            }
-            // don't spin/hang if stats are out and there is nothing left in the
-            // store
-        } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
 
-        if (this.destinationStatistics.getMessages().getCount() > 0) {
-            LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
+                for (MessageReference ref : list) {
+                    try {
+                        QueueMessageReference r = (QueueMessageReference) ref;
+                        removeMessage(c, r);
+                    } catch (IOException e) {
+                    }
+                }
+                // don't spin/hang if stats are out and there is nothing left in the
+                // store
+            } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
+
+            if (this.destinationStatistics.getMessages().getCount() > 0) {
+                LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
+            }
+        } finally {
+            sendLock.unlock();
         }
-        gc();
-        this.destinationStatistics.getMessages().setCount(0);
-        getMessages().clear();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/1811d191/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
index 85faeab..30b2cb2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -17,6 +17,8 @@
 package org.apache.activemq.broker.region;
 
 import java.io.File;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
@@ -178,6 +180,54 @@ public class QueuePurgeTest extends CombinationTestSupport {
         testPurgeLargeQueueWithConsumer(true);
     }
 
+    public void testConcurrentPurgeAndSend() throws Exception {
+        testConcurrentPurgeAndSend(false);
+    }
+
+    public void testConcurrentPurgeAndSendPrioritizedMessages() throws Exception {
+        testConcurrentPurgeAndSend(true);
+    }
+
+    private void testConcurrentPurgeAndSend(boolean prioritizedMessages) throws Exception {
+        applyBrokerSpoolingPolicy(false);
+        createProducerAndSendMessages(NUM_TO_SEND / 2);
+        QueueViewMBean proxy = getProxyToQueueViewMBean();
+        createConsumer();
+        long start = System.currentTimeMillis();
+        ExecutorService service = Executors.newFixedThreadPool(1);
+        try {
+            LOG.info("purging..");
+            service.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    try {
+                        proxy.purge();
+                    } catch (Exception e) {
+                        fail(e.getMessage());
+                    }
+                    LOG.info("purge done: " + (System.currentTimeMillis() - start) + "ms");
+                }
+            });
+
+            //send should get blocked while purge is running
+            //which should ensure the metrics are correct
+            createProducerAndSendMessages(NUM_TO_SEND / 2);
+
+            Message msg;
+            do {
+                msg = consumer.receive(1000);
+                if (msg != null) {
+                    msg.acknowledge();
+                }
+            } while (msg != null);
+            assertEquals("Queue size not valid", 0, proxy.getQueueSize());
+            assertEquals("Found messages when browsing", 0, proxy.browseMessages().size());
+        } finally {
+            service.shutdownNow();
+        }
+    }
+
     private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception {
         applyBrokerSpoolingPolicy(prioritizedMessages);
         createProducerAndSendMessages(NUM_TO_SEND);


Mime
View raw message