activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject [3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6097
Date Tue, 22 Dec 2015 13:14:58 GMT
https://issues.apache.org/jira/browse/AMQ-6097

Fixing KahaDB so that the correct marshaller is used for the message
keys inside of the message order index.  This will ensure that message
size metrics are accurate.

(cherry picked from commit 246ccb8e04515c6e85544b32035537f60d92b5b9)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/51aa70ea
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/51aa70ea
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/51aa70ea

Branch: refs/heads/activemq-5.13.x
Commit: 51aa70eaae69f7fa9941c40d2f31b90c7f5a81ab
Parents: 871f0a6
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Fri Dec 18 13:51:37 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Dec 22 13:14:44 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 22 ++++++----
 .../AbstractKahaDBMessageStoreSizeTest.java     |  9 ++++
 .../kahadb/KahaDBMessageStoreSizeTest.java      | 44 ++++++++++++++++++++
 3 files changed, 68 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/51aa70ea/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 6a98cce..5c0801b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1861,18 +1861,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         }
     }
 
-    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
-        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
+    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
+        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
 
         @Override
         public MessageKeys readPayload(DataInput dataIn) throws IOException {
-            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
+            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
         }
 
         @Override
         public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException {
             dataOut.writeUTF(object.messageId);
-            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+            locationSizeMarshaller.writePayload(object.location, dataOut);
         }
     }
 
@@ -2178,6 +2178,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
                 // modify so it is upgraded
                 rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
             }
+            //upgrade the order index
+            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) {
+                Entry<Long, MessageKeys> entry = iterator.next();
+                //call get so that the last priority is updated
+                rc.orderIndex.get(tx, entry.getKey());
+                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
+            }
         }
 
         // If it was a topic...
@@ -3062,6 +3069,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         Long lastLowKey;
         byte lastGetPriority;
         final List<Long> pendingAdditions = new LinkedList<Long>();
+        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
 
         MessageKeys remove(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.remove(tx, key);
@@ -3076,13 +3084,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
         void load(Transaction tx) throws IOException {
             defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
             defaultPriorityIndex.load(tx);
             lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
             lowPriorityIndex.load(tx);
             highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
             highPriorityIndex.load(tx);
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/51aa70ea/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
index 7d53cbd..5ba75fd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
@@ -20,10 +20,19 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.store.AbstractMessageStoreSizeTest;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;

http://git-wip-us.apache.org/repos/asf/activemq/blob/51aa70ea/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
index 43dc2f6..4513856 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
@@ -16,9 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
 
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.junit.Test;
 
 /**
  * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
@@ -39,6 +48,41 @@ public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTe
         messageStore.start();
     }
 
+    /**
+     * Make sure that the sizes stored in the KahaDB location index are the same as in
+     * the message order index.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLocationIndexMatchesOrderIndex() throws Exception {
+        final KahaDBStore kahaDbStore = (KahaDBStore) store;
+        writeMessages();
+
+        //Iterate over the order index and add up the size of the messages to compare
+        //to the location index
+        kahaDbStore.indexLock.readLock().lock();
+        try {
+            long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
+                @Override
+                public Long execute(Transaction tx) throws IOException {
+                    long size = 0;
+
+                    // Iterate through all index entries to get the size of each message
+                    StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(destination), tx);
+                    for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
+                        size += iterator.next().getValue().location.getSize();
+                    }
+                   return size;
+                }
+            });
+            assertEquals("Order index size values don't match message size",
+                    size, messageStore.getMessageSize());
+        } finally {
+            kahaDbStore.indexLock.readLock().unlock();
+        }
+    }
+
     @Override
     protected String getVersion5Dir() {
         return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";


Mime
View raw message