Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DC3DE18723 for ; Fri, 18 Dec 2015 19:46:42 +0000 (UTC) Received: (qmail 89380 invoked by uid 500); 18 Dec 2015 19:46:42 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 89346 invoked by uid 500); 18 Dec 2015 19:46:42 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 89337 invoked by uid 99); 18 Dec 2015 19:46:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 18 Dec 2015 19:46:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8BDA1E0441; Fri, 18 Dec 2015 19:46:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: <6722d0099ca143b6921a6114ecddbed6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6097 Date: Fri, 18 Dec 2015 19:46:42 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 9ee92a1e1 -> 246ccb8e0 https://issues.apache.org/jira/browse/AMQ-6097 Fixing KahaDB so that the correct marshaller is used for the message keys inside of the message order index. This will ensure that message size metrics are accurate. 
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/246ccb8e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/246ccb8e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/246ccb8e Branch: refs/heads/master Commit: 246ccb8e04515c6e85544b32035537f60d92b5b9 Parents: 9ee92a1 Author: Christopher L. Shannon (cshannon) Authored: Fri Dec 18 13:51:37 2015 +0000 Committer: Christopher L. Shannon (cshannon) Committed: Fri Dec 18 19:26:24 2015 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 22 ++++++---- .../AbstractKahaDBMessageStoreSizeTest.java | 9 ++++ .../kahadb/KahaDBMessageStoreSizeTest.java | 44 ++++++++++++++++++++ 3 files changed, 68 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/246ccb8e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 6a98cce..5c0801b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1861,18 +1861,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - static protected class MessageKeysMarshaller extends VariableMarshaller { - static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); + protected class MessageKeysMarshaller extends VariableMarshaller { + final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); @Override public MessageKeys 
readPayload(DataInput dataIn) throws IOException { - return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); + return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); } @Override public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { dataOut.writeUTF(object.messageId); - LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); + locationSizeMarshaller.writePayload(object.location, dataOut); } } @@ -2178,6 +2178,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // modify so it is upgraded rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); } + //upgrade the order index + for (Iterator<Entry<Long, MessageKeys>> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { + Entry<Long, MessageKeys> entry = iterator.next(); + //call get so that the last priority is updated + rc.orderIndex.get(tx, entry.getKey()); + rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); + } } // If it was a topic... 
@@ -3062,6 +3069,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Long lastLowKey; byte lastGetPriority; final List pendingAdditions = new LinkedList(); + final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); MessageKeys remove(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.remove(tx, key); @@ -3076,13 +3084,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe void load(Transaction tx) throws IOException { defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller); defaultPriorityIndex.load(tx); lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); lowPriorityIndex.load(tx); highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + highPriorityIndex.setValueMarshaller(messageKeysMarshaller); highPriorityIndex.load(tx); } http://git-wip-us.apache.org/repos/asf/activemq/blob/246ccb8e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java index 7d53cbd..5ba75fd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java @@ -20,10 +20,19 @@ import static 
org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.store.AbstractMessageStoreSizeTest; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; http://git-wip-us.apache.org/repos/asf/activemq/blob/246ccb8e/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java index 43dc2f6..4513856 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java @@ -16,9 +16,18 @@ */ package org.apache.activemq.store.kahadb; +import static org.junit.Assert.assertEquals; + import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import 
org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.junit.Test; /** * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly @@ -39,6 +48,41 @@ public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTe messageStore.start(); } + /** + * Make sure that the sizes stored in the KahaDB location index are the same as in + * the message order index. + * + * @throws Exception + */ + @Test + public void testLocationIndexMatchesOrderIndex() throws Exception { + final KahaDBStore kahaDbStore = (KahaDBStore) store; + writeMessages(); + + //Iterate over the order index and add up the size of the messages to compare + //to the location index + kahaDbStore.indexLock.readLock().lock(); + try { + long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() { + @Override + public Long execute(Transaction tx) throws IOException { + long size = 0; + + // Iterate through all index entries to get the size of each message + StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(destination), tx); + for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + size += iterator.next().getValue().location.getSize(); + } + return size; + } + }); + assertEquals("Order index size values don't match message size", + size, messageStore.getMessageSize()); + } finally { + kahaDbStore.indexLock.readLock().unlock(); + } + } + @Override protected String getVersion5Dir() { return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";