activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [activemq] branch activemq-5.15.x updated: AMQ-7129 - fix durable message size statistics with individual ack
Date Fri, 11 Jan 2019 14:57:56 GMT
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 0b88dab  AMQ-7129 - fix durable message size statistics with individual ack
0b88dab is described below

commit 0b88dabb40c7c29a1b86d9413a701cf31cc2ceda
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
AuthorDate: Fri Jan 11 09:56:03 2019 -0500

    AMQ-7129 - fix durable message size statistics with individual ack
    
    Make sure that the pending message size for a durable sub only includes
    messages that are part of the ack range
    
    (cherry picked from commit fa2daa25e9acd3f37bb1ee0d37717d2383e67a85)
---
 .../apache/activemq/store/kahadb/MessageDatabase.java   | 13 ++++++++++---
 .../store/kahadb/KahaDBDurableMessageRecoveryTest.java  | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 8bb902d..8030bc4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -3011,16 +3011,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
 
             if (messageSequences != null) {
-                Sequence head = messageSequences.getHead();
-                if (head != null) {
+                if (!messageSequences.isEmpty()) {
+                    final Sequence head = messageSequences.getHead();
+
                     //get an iterator over the order index starting at the first unacked message
                     //and go over each message to add up the size
                     Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
                             new MessageOrderCursor(head.getFirst()));
 
+                    final boolean contiguousRange = messageSequences.size() == 1;
                     while (iterator.hasNext()) {
                         Entry<Long, MessageKeys> entry = iterator.next();
-                        locationSize += entry.getValue().location.getSize();
+                        //Verify sequence contains the key
+                        //if contiguous we just add all starting with the first but if not
+                        //we need to check if the id is part of the range - could happen if individual ack mode was used
+                        if (contiguousRange || messageSequences.contains(entry.getKey())) {
+                            locationSize += entry.getValue().location.getSize();
+                        }
                     }
                 }
             }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
index a44e8c0..519648e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -209,6 +209,12 @@ public class KahaDBDurableMessageRecoveryTest {
         // Verify there are 8 messages left still and restart broker
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
+
+        //Verify the pending size is less for sub1
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
+
         subscriber1.close();
         subscriber2.close();
         restartBroker(recoverIndex);
@@ -217,6 +223,11 @@ public class KahaDBDurableMessageRecoveryTest {
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
 
+        //Verify the pending size is less for sub1
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
+
         // Recreate subscriber and try and receive the other 8 messages
         session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
         subscriber1 = session.createDurableSubscriber(topic, "sub1");
@@ -347,4 +358,10 @@ public class KahaDBDurableMessageRecoveryTest {
         final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
         return store.getMessageCount(clientId, subId);
     }
+
+    protected long getPendingMessageSize(ActiveMQTopic topic, String clientId, String subId) throws Exception {
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
+        return store.getMessageSize(clientId, subId);
+    }
 }


Mime
View raw message