activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [activemq] branch master updated: AMQ-7136 - Improve recovery of durable subscription metrics in KahaDB
Date Tue, 15 Jan 2019 19:24:09 GMT
This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new c371445  AMQ-7136 - Improve recovery of durable subscription metrics in KahaDB
c371445 is described below

commit c3714457f11633231a6e925f09028686db04e423
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
AuthorDate: Tue Jan 15 14:10:11 2019 -0500

    AMQ-7136 - Improve recovery of durable subscription metrics in KahaDB
    
    Updated metrics recovery to only have to iterate over the order index 1
    time to recover the pending metrics for the subscriptions instead of
    making a pass over the index once per subscription
---
 .../apache/activemq/store/kahadb/KahaDBStore.java  | 20 ++++++--
 .../activemq/store/kahadb/MessageDatabase.java     | 56 +++++++++++++++++++++-
 .../kahadb/KahaDBDurableMessageRecoveryTest.java   | 30 +++++++-----
 3 files changed, 87 insertions(+), 19 deletions(-)

diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 37f3b90..47285ea 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.broker.ConnectionContext;
@@ -1050,7 +1051,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
 
         protected void recoverMessageStoreSubMetrics() throws IOException {
             if (isEnableSubscriptionStatistics()) {
-
                 final MessageStoreSubscriptionStatistics statistics = getMessageStoreSubStatistics();
                 indexLock.writeLock().lock();
                 try {
@@ -1058,19 +1058,29 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
                         @Override
                         public void execute(Transaction tx) throws IOException {
                             StoredDestination sd = getStoredDestination(dest, tx);
+
+                            List<String> subscriptionKeys = new ArrayList<>();
                             for (Iterator<Entry<String, KahaSubscriptionCommand>> iterator = sd.subscriptions
                                     .iterator(tx); iterator.hasNext();) {
                                 Entry<String, KahaSubscriptionCommand> entry = iterator.next();
 
-                                String subscriptionKey = entry.getKey();
-                                LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
+                                final String subscriptionKey = entry.getKey();
+                                final LastAck cursorPos = getLastAck(tx, sd, subscriptionKey);
                                 if (cursorPos != null) {
-                                    long size = getStoredMessageSize(tx, sd, subscriptionKey);
+                                    //add the subscriptions to a list for recovering pending sizes below
+                                    subscriptionKeys.add(subscriptionKey);
+                                    //recover just the count here as that is fast
                                     statistics.getMessageCount(subscriptionKey)
                                             .setCount(getStoredMessageCount(tx, sd, subscriptionKey));
-                                    statistics.getMessageSize(subscriptionKey).addSize(size > 0 ? size : 0);
                                 }
                             }
+
+                            //Recover the message sizes for each subscription by iterating only 1 time over the order index
+                            //to speed up recovery
+                            final Map<String, AtomicLong> subPendingMessageSizes = getStoredMessageSize(tx, sd, subscriptionKeys);
+                            subPendingMessageSizes.forEach((k,v) -> {
+                                statistics.getMessageSize(k).addSize(v.get() > 0 ? v.get() : 0);
+                            });
                         }
                     });
                 } finally {
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 78d2bfa..8d0693d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -46,7 +46,6 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -3154,6 +3153,61 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return 0;
     }
 
+    /**
+     * Recovers durable subscription pending message size with only 1 pass over the order index on recovery
+     * instead of iterating over the index once per subscription
+     *
+     * @param tx
+     * @param sd
+     * @param subscriptionKeys
+     * @return
+     * @throws IOException
+     */
+    protected Map<String, AtomicLong> getStoredMessageSize(Transaction tx, StoredDestination sd, List<String> subscriptionKeys) throws IOException {
+
+        final Map<String, AtomicLong> subPendingMessageSizes = new HashMap<>();
+        final Map<String, SequenceSet> messageSequencesMap = new HashMap<>();
+
+        if (sd.ackPositions != null) {
+            Long recoveryPosition = null;
+            //Go through each subscription and find matching ackPositions and their first
+            //position to find the initial recovery position which is the first message across all subs
+            //that needs to still be acked
+            for (String subscriptionKey : subscriptionKeys) {
+                subPendingMessageSizes.put(subscriptionKey, new AtomicLong());
+                final SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey);
+                if (messageSequences != null && !messageSequences.isEmpty()) {
+                    final long head = messageSequences.getHead().getFirst();
+                    recoveryPosition = recoveryPosition != null ? Math.min(recoveryPosition, head) : head;
+                    //cache the SequenceSet to speed up recovery of metrics below and avoid a second index hit
+                    messageSequencesMap.put(subscriptionKey, messageSequences);
+                }
+            }
+            recoveryPosition = recoveryPosition != null ? recoveryPosition : 0;
+
+            final Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx,
+                    new MessageOrderCursor(recoveryPosition));
+
+            //iterate through all messages starting at the recovery position to recover metrics
+            while (iterator.hasNext()) {
+                final Entry<Long, MessageKeys> messageEntry = iterator.next();
+
+                //For each message in the index check if each subscription needs to ack the message still
+                //if the ackPositions SequenceSet contains the message then it has not been acked and should be
+                //added to the pending metrics for that subscription
+                for (Entry<String, SequenceSet> seqEntry : messageSequencesMap.entrySet()) {
+                    final String subscriptionKey = seqEntry.getKey();
+                    final SequenceSet messageSequences = messageSequencesMap.get(subscriptionKey);
+                    if (messageSequences.contains(messageEntry.getKey())) {
+                        subPendingMessageSizes.get(subscriptionKey).addAndGet(messageEntry.getValue().location.getSize());
+                    }
+                }
+            }
+        }
+
+        return subPendingMessageSizes;
+    }
+
     protected long getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
         long locationSize = 0;
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
index 519648e..cf22b27 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -55,9 +55,9 @@ import org.junit.runners.Parameterized.Parameters;
 @RunWith(Parameterized.class)
 public class KahaDBDurableMessageRecoveryTest {
 
-    @Parameters(name = "{0}")
+    @Parameters(name = "recoverIndex={0},enableSubscriptionStats={1}")
     public static Collection<Object[]> data() {
-        return Arrays.asList(new Object[][] { { false }, { true } });
+        return Arrays.asList(new Object[][] { { false, false }, { false, true }, { true, false }, { true, true } });
     }
 
     @Rule
@@ -66,6 +66,7 @@ public class KahaDBDurableMessageRecoveryTest {
     private URI brokerConnectURI;
 
     private boolean recoverIndex;
+    private boolean enableSubscriptionStats;
 
     @Before
     public void setUpBroker() throws Exception {
@@ -81,9 +82,10 @@ public class KahaDBDurableMessageRecoveryTest {
     /**
      * @param deleteIndex
      */
-    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex) {
+    public KahaDBDurableMessageRecoveryTest(boolean recoverIndex, boolean enableSubscriptionStats) {
         super();
         this.recoverIndex = recoverIndex;
+        this.enableSubscriptionStats = enableSubscriptionStats;
     }
 
     protected void startBroker(boolean recoverIndex) throws Exception {
@@ -105,6 +107,7 @@ public class KahaDBDurableMessageRecoveryTest {
         KahaDBPersistenceAdapter adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter();
 
         adapter.setForceRecoverIndex(forceRecoverIndex);
+        adapter.setEnableSubscriptionStatistics(enableSubscriptionStats);
 
         // set smaller size for test
         adapter.setJournalMaxFileLength(1024 * 20);
@@ -210,10 +213,12 @@ public class KahaDBDurableMessageRecoveryTest {
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
 
-        //Verify the pending size is less for sub1
-        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
-        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
-        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
+        // Verify the pending size is less for sub1
+        final long sub1PendingSizeBeforeRestart = getPendingMessageSize(topic, "clientId1", "sub1");
+        final long sub2PendingSizeBeforeRestart = getPendingMessageSize(topic, "clientId1", "sub2");
+        assertTrue(sub1PendingSizeBeforeRestart > 0);
+        assertTrue(sub2PendingSizeBeforeRestart > 0);
+        assertTrue(sub1PendingSizeBeforeRestart < sub2PendingSizeBeforeRestart);
 
         subscriber1.close();
         subscriber2.close();
@@ -223,10 +228,9 @@ public class KahaDBDurableMessageRecoveryTest {
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, "clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, "clientId1", "sub2"), 3000, 500));
 
-        //Verify the pending size is less for sub1
-        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
-        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
-        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < getPendingMessageSize(topic, "clientId1", "sub2"));
+        // Verify the pending size is less for sub1
+        assertEquals(sub1PendingSizeBeforeRestart, getPendingMessageSize(topic, "clientId1", "sub1"));
+        assertEquals(sub2PendingSizeBeforeRestart, getPendingMessageSize(topic, "clientId1", "sub2"));
 
         // Recreate subscriber and try and receive the other 8 messages
         session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
@@ -293,7 +297,7 @@ public class KahaDBDurableMessageRecoveryTest {
         subscriber2.close();
         restartBroker(recoverIndex);
 
-        //Manually recover subscription and verify proper messages are loaded
+        // Manually recover subscription and verify proper messages are loaded
         final Topic brokerTopic = (Topic) broker.getDestination(topic);
         final TopicMessageStore store = (TopicMessageStore) brokerTopic.getMessageStore();
         final AtomicInteger sub1Recovered = new AtomicInteger();
@@ -348,7 +352,7 @@ public class KahaDBDurableMessageRecoveryTest {
             }
         });
 
-        //Verify proper number of messages are recovered
+        // Verify proper number of messages are recovered
         assertEquals(8, sub1Recovered.get());
         assertEquals(10, sub2Recovered.get());
     }


Mime
View raw message