activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cshan...@apache.org
Subject activemq git commit: https://issues.apache.org/jira/browse/AMQ-6082
Date Tue, 08 Dec 2015 20:12:12 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 5772e7bed -> 7a7c70ad7


https://issues.apache.org/jira/browse/AMQ-6082

Properly re-setting the storeOpenWireVersion from the BrokerService
on the KahaDB Metadata if a corrupted index is detected and the
Metadata has to be recreated.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7a7c70ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7a7c70ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7a7c70ad

Branch: refs/heads/master
Commit: 7a7c70ad7524594339c4388b897fa1eac6988928
Parents: 5772e7b
Author: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Authored: Tue Dec 8 20:10:11 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shannon@gmail.com>
Committed: Tue Dec 8 20:10:11 2015 +0000

----------------------------------------------------------------------
 .../activemq/store/kahadb/KahaDBStore.java      | 12 +++-
 .../activemq/store/kahadb/MessageDatabase.java  |  5 ++
 .../kahadb/KahaDBStoreOpenWireVersionTest.java  | 65 ++++++++++++++++++++
 3 files changed, 81 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7a7c70ad/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index 3e0968e..fa4672b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -66,6 +66,7 @@ import org.apache.activemq.store.PersistenceAdapter;
 import org.apache.activemq.store.TopicMessageStore;
 import org.apache.activemq.store.TransactionIdTransformer;
 import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
 import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
 import org.apache.activemq.store.kahadb.data.KahaDestination;
 import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
@@ -192,8 +193,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
         this.maxAsyncJobs = maxAsyncJobs;
     }
 
+
     @Override
-    public void doStart() throws Exception {
+    protected void configureMetadata() {
         if (brokerService != null) {
             metadata.openwireVersion = brokerService.getStoreOpenWireVersion();
             wireFormat.setVersion(metadata.openwireVersion);
@@ -203,6 +205,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter
{
             }
 
         }
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        //configure the metadata before start, right now
+        //this is just the open wire version
+        configureMetadata();
+
         super.doStart();
 
         if (brokerService != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/7a7c70ad/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index cd9067d..936e047 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -399,6 +399,9 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                     pageFile.delete();
                 }
                 metadata = createMetadata();
+                //The metadata was recreated after a detected corruption so we need to
+                //reconfigure anything that was configured on the old metadata on startup
+                configureMetadata();
                 pageFile = null;
                 loadPageFile();
             }
@@ -2727,6 +2730,8 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         return md;
     }
 
+    protected abstract void configureMetadata();
+
     public int getJournalMaxWriteBatchSize() {
         return journalMaxWriteBatchSize;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/7a7c70ad/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
index 5b272db..e59767a 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java
@@ -20,6 +20,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map.Entry;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -34,7 +39,15 @@ import javax.jms.Topic;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.kahadb.MessageDatabase.Metadata;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.disk.journal.DataFile;
+import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.page.PageFile;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -61,6 +74,8 @@ public class KahaDBStoreOpenWireVersionTest {
         broker.setAdvisorySupport(false);
         broker.setDataDirectory(storeDir);
         broker.setStoreOpenWireVersion(storeOpenWireVersion);
+        ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).setCheckForCorruptJournalFiles(true);
+
         broker.start();
         broker.waitUntilStarted();
 
@@ -119,6 +134,56 @@ public class KahaDBStoreOpenWireVersionTest {
         doTestStoreVersionConfigrationOverrides(INITIAL_STORE_VERSION, RELOAD_STORE_VERSION);
     }
 
+    /**
+     * This test shows that a corrupted index/rebuild will still
+     * honor the storeOpenWireVersion set on the BrokerService.
+     * This wasn't the case before AMQ-6082
+     */
+    @Test(timeout = 60000)
+    public void testStoreVersionCorrupt() throws Exception {
+        final int create = 6;
+        final int reload = 6;
+
+        createBroker(create);
+        populateStore();
+
+        //blow up the index so it has to be recreated
+        corruptIndex();
+        stopBroker();
+
+        createBroker(reload);
+        assertEquals(create, broker.getStoreOpenWireVersion());
+        assertStoreIsUsable();
+    }
+
+
+    private void corruptIndex() throws IOException {
+        KahaDBStore store = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
+        final PageFile pageFile = store.getPageFile();
+        final Metadata metadata = store.metadata;
+
+        //blow up the index
+        try {
+            store.indexLock.writeLock().lock();
+            pageFile.tx().execute(new Transaction.Closure<IOException>() {
+                @Override
+                public void execute(Transaction tx) throws IOException {
+                    for (Iterator<Entry<String, StoredDestination>> iterator
= metadata.destinations.iterator(tx); iterator
+                            .hasNext();) {
+                        Entry<String, StoredDestination> entry = iterator.next();
+                        entry.getValue().orderIndex.nextMessageId = -100;
+                        entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
+                        entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
+                        entry.getValue().orderIndex.highPriorityIndex.clear(tx);
+                        entry.getValue().messageReferences.clear();
+                    }
+                }
+            });
+        } finally {
+            store.indexLock.writeLock().unlock();
+        }
+    }
+
     private void doTestStoreVersionConfigrationOverrides(int create, int reload) throws Exception
{
         createBroker(create);
         populateStore();


Mime
View raw message