Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3F896181F4 for ; Tue, 8 Dec 2015 20:12:13 +0000 (UTC) Received: (qmail 43619 invoked by uid 500); 8 Dec 2015 20:12:13 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 43578 invoked by uid 500); 8 Dec 2015 20:12:13 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 43568 invoked by uid 99); 8 Dec 2015 20:12:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 Dec 2015 20:12:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EB440E0593; Tue, 8 Dec 2015 20:12:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cshannon@apache.org To: commits@activemq.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: activemq git commit: https://issues.apache.org/jira/browse/AMQ-6082 Date: Tue, 8 Dec 2015 20:12:12 +0000 (UTC) Repository: activemq Updated Branches: refs/heads/master 5772e7bed -> 7a7c70ad7 https://issues.apache.org/jira/browse/AMQ-6082 Propertly re-setting the storeOpenWireVersion from the BrokerService on the KahaDB Metadata if a corrupted index is detected and the Metadata has to be recreated. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7a7c70ad Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a7c70ad Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a7c70ad Branch: refs/heads/master Commit: 7a7c70ad7524594339c4388b897fa1eac6988928 Parents: 5772e7b Author: Christopher L. Shannon (cshannon) Authored: Tue Dec 8 20:10:11 2015 +0000 Committer: Christopher L. Shannon (cshannon) Committed: Tue Dec 8 20:10:11 2015 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/KahaDBStore.java | 12 +++- .../activemq/store/kahadb/MessageDatabase.java | 5 ++ .../kahadb/KahaDBStoreOpenWireVersionTest.java | 65 ++++++++++++++++++++ 3 files changed, 81 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/7a7c70ad/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 3e0968e..fa4672b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -66,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TransactionIdTransformer; import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand; import org.apache.activemq.store.kahadb.data.KahaDestination; import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType; @@ -192,8 +193,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { this.maxAsyncJobs = maxAsyncJobs; } + @Override - public void doStart() throws Exception { + protected void configureMetadata() { if (brokerService != null) { metadata.openwireVersion = brokerService.getStoreOpenWireVersion(); wireFormat.setVersion(metadata.openwireVersion); @@ -203,6 +205,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } } + } + + @Override + public void doStart() throws Exception { + //configure the metadata before start, right now + //this is just the open wire version + configureMetadata(); + super.doStart(); if (brokerService != null) { http://git-wip-us.apache.org/repos/asf/activemq/blob/7a7c70ad/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index cd9067d..936e047 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -399,6 +399,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe pageFile.delete(); } metadata = createMetadata(); + //The metadata was recreated after a detect corruption so we need to + //reconfigure anything that was configured on the old metadata on startup + configureMetadata(); pageFile = null; loadPageFile(); } @@ -2727,6 +2730,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return md; } + protected abstract void configureMetadata(); + public int getJournalMaxWriteBatchSize() { return journalMaxWriteBatchSize; } http://git-wip-us.apache.org/repos/asf/activemq/blob/7a7c70ad/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java index 5b272db..e59767a 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java @@ -20,6 +20,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.Iterator; +import java.util.Map.Entry; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -34,7 +39,15 @@ import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.kahadb.MessageDatabase.Metadata; +import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.store.kahadb.disk.journal.Journal; +import org.apache.activemq.store.kahadb.disk.page.PageFile; +import org.apache.activemq.store.kahadb.disk.page.Transaction; +import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.RecoverableRandomAccessFile; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -61,6 +74,8 @@ public class KahaDBStoreOpenWireVersionTest { broker.setAdvisorySupport(false); broker.setDataDirectory(storeDir); broker.setStoreOpenWireVersion(storeOpenWireVersion); + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruptJournalFiles(true); + broker.start(); broker.waitUntilStarted(); @@ -119,6 +134,56 @@ public class KahaDBStoreOpenWireVersionTest { doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION); } + /** + * This test shows that a corrupted index/rebuild will still + * honor the storeOpenWireVersion set on the BrokerService. + * This wasn't the case before AMQ-6082 + */ + @Test(timeout = 60000) + public void testStoreVersionCorrupt() throws Exception { + final int create = 6; + final int reload = 6; + + createBroker(create); + populateStore(); + + //blow up the index so it has to be recreated + corruptIndex(); + stopBroker(); + + createBroker(reload); + assertEquals(create, broker.getStoreOpenWireVersion()); + assertStoreIsUsable(); + } + + + private void corruptIndex() throws IOException { + KahaDBStore store = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore(); + final PageFile pageFile = store.getPageFile(); + final Metadata metadata = store.metadata; + + //blow up the index + try { + store.indexLock.writeLock().lock(); + pageFile.tx().execute(new Transaction.Closure() { + @Override + public void execute(Transaction tx) throws IOException { + for (Iterator> iterator = metadata.destinations.iterator(tx); iterator + .hasNext();) { + Entry entry = iterator.next(); + entry.getValue().orderIndex.nextMessageId = -100; + entry.getValue().orderIndex.defaultPriorityIndex.clear(tx); + entry.getValue().orderIndex.lowPriorityIndex.clear(tx); + entry.getValue().orderIndex.highPriorityIndex.clear(tx); + entry.getValue().messageReferences.clear(); + } + } + }); + } finally { + store.indexLock.writeLock().unlock(); + } + } + private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception { createBroker(create); populateStore();