Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id A3D5118089 for ; Tue, 22 Dec 2015 13:26:45 +0000 (UTC) Received: (qmail 10185 invoked by uid 500); 22 Dec 2015 13:26:23 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 10100 invoked by uid 500); 22 Dec 2015 13:26:22 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 9960 invoked by uid 99); 22 Dec 2015 13:26:21 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 22 Dec 2015 13:26:21 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 40DB7E0454; Tue, 22 Dec 2015 13:26:21 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Date: Tue, 22 Dec 2015 13:26:23 -0000 Message-Id: In-Reply-To: <8ff134c780da45e8ab926f5585bc545c@git.apache.org> References: <8ff134c780da45e8ab926f5585bc545c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6097 https://issues.apache.org/jira/browse/AMQ-6097 Fixing KahaDB so that the correct marshaller is used for the message keys inside of the message order index. This will ensure that message size metrics are accurate. (cherry picked from commit 246ccb8e04515c6e85544b32035537f60d92b5b9) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/28f711e5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/28f711e5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/28f711e5 Branch: refs/heads/activemq-5.12.x Commit: 28f711e50e94cda4d1235da8290c148719487e40 Parents: 819e237 Author: Christopher L. Shannon (cshannon) Authored: Fri Dec 18 13:51:37 2015 +0000 Committer: Christopher L. Shannon (cshannon) Committed: Tue Dec 22 13:26:14 2015 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 22 ++++++---- .../AbstractKahaDBMessageStoreSizeTest.java | 9 ++++ .../kahadb/KahaDBMessageStoreSizeTest.java | 44 ++++++++++++++++++++ 3 files changed, 68 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/28f711e5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 3379333..3188cf6 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1820,18 +1820,18 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - static protected class MessageKeysMarshaller extends VariableMarshaller { - static final MessageKeysMarshaller INSTANCE = new MessageKeysMarshaller(); + protected class MessageKeysMarshaller extends VariableMarshaller { + final LocationSizeMarshaller locationSizeMarshaller = new LocationSizeMarshaller(); @Override public MessageKeys readPayload(DataInput dataIn) throws IOException { - return new MessageKeys(dataIn.readUTF(), LocationMarshaller.INSTANCE.readPayload(dataIn)); + return new MessageKeys(dataIn.readUTF(), locationSizeMarshaller.readPayload(dataIn)); } @Override public void writePayload(MessageKeys object, DataOutput dataOut) throws IOException { dataOut.writeUTF(object.messageId); - LocationMarshaller.INSTANCE.writePayload(object.location, dataOut); + locationSizeMarshaller.writePayload(object.location, dataOut); } } @@ -2137,6 +2137,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // modify so it is upgraded rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); } + //upgrade the order index + for (Iterator> iterator = rc.orderIndex.iterator(tx); iterator.hasNext(); ) { + Entry entry = iterator.next(); + //call get so that the last priority is updated + rc.orderIndex.get(tx, entry.getKey()); + rc.orderIndex.put(tx, rc.orderIndex.lastGetPriority(), entry.getKey(), entry.getValue()); + } } // If it was a topic... @@ -2993,6 +3000,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Long lastLowKey; byte lastGetPriority; final List pendingAdditions = new LinkedList(); + final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); MessageKeys remove(Transaction tx, Long key) throws IOException { MessageKeys result = defaultPriorityIndex.remove(tx, key); @@ -3007,13 +3015,13 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe void load(Transaction tx) throws IOException { defaultPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - defaultPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + defaultPriorityIndex.setValueMarshaller(messageKeysMarshaller); defaultPriorityIndex.load(tx); lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + lowPriorityIndex.setValueMarshaller(messageKeysMarshaller); lowPriorityIndex.load(tx); highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE); - highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE); + highPriorityIndex.setValueMarshaller(messageKeysMarshaller); highPriorityIndex.load(tx); } http://git-wip-us.apache.org/repos/asf/activemq/blob/28f711e5/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java index 7d53cbd..5ba75fd 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java @@ -20,10 +20,19 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.Map.Entry; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.store.AbstractMessageStoreSizeTest; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.MessageStoreStatistics; import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.journal.Location; +import org.apache.activemq.store.kahadb.disk.page.Transaction; import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.TrueFileFilter; http://git-wip-us.apache.org/repos/asf/activemq/blob/28f711e5/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java index 43dc2f6..4513856 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java @@ -16,9 +16,18 @@ */ package org.apache.activemq.store.kahadb; +import static org.junit.Assert.assertEquals; + import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.Map.Entry; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.junit.Test; /** * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly @@ -39,6 +48,41 @@ public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTe messageStore.start(); } + /** + * Make sure that the sizes stored in the KahaDB location index are the same as in + * the message order index. + * + * @throws Exception + */ + @Test + public void testLocationIndexMatchesOrderIndex() throws Exception { + final KahaDBStore kahaDbStore = (KahaDBStore) store; + writeMessages(); + + //Iterate over the order index and add up the size of the messages to compare + //to the location index + kahaDbStore.indexLock.readLock().lock(); + try { + long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure() { + @Override + public Long execute(Transaction tx) throws IOException { + long size = 0; + + // Iterate through all index entries to get the size of each message + StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(destination), tx); + for (Iterator> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) { + size += iterator.next().getValue().location.getSize(); + } + return size; + } + }); + assertEquals("Order index size values don't match message size", + size, messageStore.getMessageSize()); + } finally { + kahaDbStore.indexLock.readLock().unlock(); + } + } + @Override protected String getVersion5Dir() { return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";