activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6158
Date Thu, 04 Feb 2016 15:18:03 GMT
Repository: activemq
Updated Branches:
  refs/heads/master c17b7fdc7 -> 25ff5699f


https://issues.apache.org/jira/browse/AMQ-6158

Computing messageSize for a durable subscription in KahaDB now runs much
faster (n vs n^2), which is noticeable when there are a large number of
pending messages for a durable subscription.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25ff5699
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25ff5699
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25ff5699

Branch: refs/heads/master
Commit: 25ff5699f1fb7c73668d5da2eacb53ad2ef14289
Parents: c17b7fd
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Thu Feb 4 15:16:11 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Thu Feb 4 15:17:49 2016 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 25 ++++++--------
 .../AbstractPendingMessageCursorTest.java       |  6 ++++
 .../cursors/KahaDBPendingMessageCursorTest.java | 36 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/25ff5699/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index c7e1a40..931a18b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -2588,31 +2588,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     }
 
     public long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
+        //grab the messages attached to this subscription
         SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+
         long locationSize = 0;
         if (messageSequences != null) {
-            Iterator<Long> sequences = messageSequences.iterator();
+            Sequence head = messageSequences.getHead();
+            if (head != null) {
+                //get an iterator over the order index starting at the first unacked message
+                //and go over each message to add up the size
+                Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
+                        new MessageOrderCursor(head.getFirst()));
 
-            while (sequences.hasNext()) {
-                Long sequenceId = sequences.next();
-                //the last item is the next marker
-                if (!sequences.hasNext()) {
-                    break;
-                }
-                Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx);
                 while (iterator.hasNext()) {
-                    Entry<Location, Long> entry = iterator.next();
-                    if (entry.getValue() == sequenceId - 1) {
-                        locationSize += entry.getKey().getSize();
-                        break;
-                    }
-
+                    Entry<Long, MessageKeys> entry = iterator.next();
+                    locationSize += entry.getValue().location.getSize();
                 }
             }
         }
 
         return locationSize;
     }
+
     protected String key(KahaDestination destination) {
         return destination.getType().getNumber() + ":" + destination.getName();
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/25ff5699/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
index f496806..a31f402 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.atomic.AtomicLong;
@@ -276,6 +278,10 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat
         verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
         verifyStoreStats(dest, 200, publishedMessageSize.get());
 
+        //should be equal in this case
+        assertEquals(dest.getDurableTopicSubs().get(subKey).getPendingMessageSize(),
+                dest.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
+
         //consume all messages
         consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/25ff5699/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
index e0921dc..8a2c287 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.activemq.broker.region.cursors;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicLong;
@@ -84,6 +86,10 @@ public class KahaDBPendingMessageCursorTest extends
         verifyPendingStats(topic, subKey, 200, publishedMessageSize.get());
         verifyStoreStats(topic, 200, publishedMessageSize.get());
 
+        //should be equal in this case
+        assertEquals(topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(),
+                topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
+
         // stop, restart broker and publish more messages
         stopBroker();
         this.setUpBroker(false);
@@ -101,6 +107,36 @@ public class KahaDBPendingMessageCursorTest extends
 
     }
 
+    @Test(timeout=60000)
+    public void testMessageSizeTwoDurablesPartialConsumption() throws Exception {
+        AtomicLong publishedMessageSize = new AtomicLong();
+
+        Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection();
+        connection.setClientID("clientId");
+        connection.start();
+
+        SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");
+        SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2");
+        org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable(
+                connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT);
+
+        //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch
+        verifyPendingStats(dest, subKey, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        //consume all messages
+        consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize);
+
+        //150 should be left
+        verifyPendingStats(dest, subKey, 150, publishedMessageSize.get());
+
+        //200 should be left
+        verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get());
+        verifyStoreStats(dest, 200, publishedMessageSize.get());
+
+        connection.close();
+    }
+
     /**
      * Test that the the counter restores size and works after restart and more
      * messages are published


Mime
View raw message