activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6097
Date Fri, 18 Dec 2015 19:46:42 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 9ee92a1e1 -> 246ccb8e0


https://issues.apache.org/jira/browse/AMQ-6097

Fixing KahaDB so that the correct marshaller is used for the message
keys inside of the message order index.  This will ensure that message
size metrics are accurate.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/246ccb8e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/246ccb8e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/246ccb8e

Branch: refs/heads/master
Commit: 246ccb8e04515c6e85544b32035537f60d92b5b9
Parents: 9ee92a1
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Fri Dec 18 13:51:37 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Fri Dec 18 19:26:24 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 22 ++++++----
 .../AbstractKahaDBMessageStoreSizeTest.java     |  9 ++++
 .../kahadb/KahaDBMessageStoreSizeTest.java      | 44 ++++++++++++++++++++
 3 files changed, 68 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/246ccb8e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 6a98cce..5c0801b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1861,18 +1861,18 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         }
     }
 
-    static protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys>
{
-        static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller();
+    protected class MessageKeysMarshaller extends VariableMarshaller<MessageKeys> {
+        final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller();
 
         @Override
         public MessageKeys readPayload(DataInput dataIn) throws IOException {
-            return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn));
+            return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn));
         }
 
         @Override
         public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException
{
             dataOut.writeUTF(object.messageId);
-            LocationMarshaller.INSTANCE.writePayload(object.location, dataOut);
+            locationSizeMarshaller.writePayload(object.location, dataOut);
         }
     }
 
@@ -2178,6 +2178,13 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                 // modify so it is upgraded
                 rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
             }
+            //upgrade the order index
+            for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx);
iterator.hasNext(); ) {
+                Entry<Long, MessageKeys> entry = iterator.next();
+                //call get so that the last priority is updated
+                rc.orderIndex.get(tx, entry.getKey());
+                rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue());
+            }
         }
 
         // If it was a topic...
@@ -3062,6 +3069,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         Long lastLowKey;
         byte lastGetPriority;
         final List<Long> pendingAdditions = new LinkedList<Long>();
+        final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
 
         MessageKeys remove(Transaction tx, Long key) throws IOException {
             MessageKeys result = defaultPriorityIndex.remove(tx, key);
@@ -3076,13 +3084,13 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
 
         void load(Transaction tx) throws IOException {
             defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller);
             defaultPriorityIndex.load(tx);
             lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
             lowPriorityIndex.load(tx);
             highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
-            highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
+            highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
             highPriorityIndex.load(tx);
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/246ccb8e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
index 7d53cbd..5ba75fd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
@@ -20,10 +20,19 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 
+import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.store.AbstractMessageStoreSizeTest;
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.MessageStoreStatistics;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.filefilter.TrueFileFilter;

http://git-wip-us.apache.org/repos/asf/activemq/blob/246ccb8e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
index 43dc2f6..4513856 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
@@ -16,9 +16,18 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import static org.junit.Assert.assertEquals;
+
 import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map.Entry;
 
 import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.junit.Test;
 
 /**
  * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
@@ -39,6 +48,41 @@ public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTe
         messageStore.start();
     }
 
+    /**
+     * Make sure that the sizes stored in the KahaDB location index are the same as in
+     * the message order index.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLocationIndexMatchesOrderIndex() throws Exception {
+        final KahaDBStore kahaDbStore = (KahaDBStore) store;
+        writeMessages();
+
+        //Iterate over the order index and add up the size of the messages to compare
+        //to the location index
+        kahaDbStore.indexLock.readLock().lock();
+        try {
+            long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure<Long,
IOException>() {
+                @Override
+                public Long execute(Transaction tx) throws IOException {
+                    long size = 0;
+
+                    // Iterate through all index entries to get the size of each message
+                    StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(destination),
tx);
+                    for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx);
iterator.hasNext();) {
+                        size += iterator.next().getValue().location.getSize();
+                    }
+                   return size;
+                }
+            });
+            assertEquals("Order index size values don't match message size",
+                    size, messageStore.getMessageSize());
+        } finally {
+            kahaDbStore.indexLock.readLock().unlock();
+        }
+    }
+
     @Override
     protected String getVersion5Dir() {
         return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";


Mime
View raw message