activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gtu...@apache.org
Subject activemq git commit: AMQ-6378 force recovery on corrupt metadata location info, relates to AMQ-6376
Date Wed, 27 Jul 2016 14:59:41 GMT
Repository: activemq
Updated Branches:
  refs/heads/master 51b413309 -> 822e2be90


AMQ-6378 force recovery on corrupt metadata location info, relates to AMQ-6376


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/822e2be9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/822e2be9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/822e2be9

Branch: refs/heads/master
Commit: 822e2be90e393a95cbdb998d2407b65e03bdd146
Parents: 51b4133
Author: gtully <gary.tully@gmail.com>
Authored: Wed Jul 27 15:56:28 2016 +0100
Committer: gtully <gary.tully@gmail.com>
Committed: Wed Jul 27 15:56:28 2016 +0100

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  8 ++-
 .../kahadb/disk/journal/DataFileAccessor.java   |  2 +-
 .../JournalCorruptionEofIndexRecoveryTest.java  | 63 ++++++++++++++++++--
 .../JournalCorruptionIndexRecoveryTest.java     | 15 ++---
 .../store/kahadb/JournalFdRecoveryTest.java     | 23 +------
 5 files changed, 73 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/822e2be9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index ef806e3..eb3a5ee 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -751,8 +751,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
 
     private Location recoverProducerAudit() throws IOException {
         if (metadata.producerSequenceIdTrackerLocation != null) {
-            KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
             try {
+                KahaProducerAuditCommand audit = (KahaProducerAuditCommand) load(metadata.producerSequenceIdTrackerLocation);
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAudit().newInput());
                 int maxNumProducers = getMaxFailoverProducersToTrack();
                 int maxAuditDepth = getFailoverProducersAuditDepth();
@@ -773,8 +773,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     @SuppressWarnings("unchecked")
     private Location recoverAckMessageFileMap() throws IOException {
         if (metadata.ackMessageFileMapLocation != null) {
-            KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
             try {
+                KahaAckMessageFileMapCommand audit = (KahaAckMessageFileMapCommand) load(metadata.ackMessageFileMapLocation);
                 ObjectInputStream objectIn = new ObjectInputStream(audit.getAckMessageFileMap().newInput());
                 metadata.ackMessageFileMap = (Map<Integer, Set<Integer>>) objectIn.readObject();
                 return getNextInitializedLocation(metadata.ackMessageFileMapLocation);
@@ -3228,6 +3228,10 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         return journal;
     }
 
+    protected Metadata getMetadata() {
+        return metadata;
+    }
+
     public boolean isFailIfDatabaseIsLocked() {
         return failIfDatabaseIsLocked;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/822e2be9/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index de68cf0..71c2195 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -89,7 +89,7 @@ final class DataFileAccessor {
             return new ByteSequence(data, 0, data.length);
 
         } catch (RuntimeException e) {
-            throw new IOException("Invalid location: " + location + ", : " + e, e);
+            throw new IOException("Invalid location: " + location + " : " + e, e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/822e2be9/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index 6cffe3d..221b087 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -28,6 +28,7 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -42,6 +43,7 @@ import org.apache.activemq.store.kahadb.MessageDatabase.MessageKeys;
 import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.store.kahadb.disk.page.Transaction;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
@@ -232,6 +234,48 @@ public class JournalCorruptionEofIndexRecoveryTest {
         assertEquals("Drain", numToSend, drainQueue(numToSend));
     }
 
+
+    @Test
+    public void testRecoveryAfterProducerAuditLocationCorrupt() throws Exception {
+        doTestRecoveryAfterLocationCorrupt(false);
+    }
+
+    @Test
+    public void testRecoveryAfterAckMapLocationCorrupt() throws Exception {
+        doTestRecoveryAfterLocationCorrupt(true);
+    }
+
+    private void doTestRecoveryAfterLocationCorrupt(boolean aOrB) throws Exception {
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFiles = getNumberOfJournalFiles();
+
+        assertTrue("more than x files: " + numFiles, numFiles > 4);
+
+        KahaDBStore store = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
+        store.checkpointCleanup(true);
+        Location toCorrupt = aOrB ? store.getMetadata().ackMessageFileMapLocation : store.getMetadata().producerSequenceIdTrackerLocation;
+        corruptLocation(toCorrupt);
+
+        restartBroker(false, false);
+
+        assertEquals("missing no message", 50, broker.getAdminView().getTotalMessageCount());
+        assertEquals("Drain", 50, drainQueue(50));
+    }
+
+    private void corruptLocation(Location toCorrupt) throws IOException {
+
+        DataFile dataFile = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().get(new Integer(toCorrupt.getDataFileId()));
+
+        RecoverableRandomAccessFile randomAccessFile = dataFile.openRandomAccessFile();
+
+        randomAccessFile.seek(toCorrupt.getOffset());
+        randomAccessFile.writeInt(3);
+        dataFile.closeRandomAccessFile(randomAccessFile);
+    }
+
     private void corruptBatchCheckSumSplash(int id) throws Exception{
         Collection<DataFile> files =
                 ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values();
@@ -367,19 +411,30 @@ public class JournalCorruptionEofIndexRecoveryTest {
     }
 
     private int drainQueue(int max) throws Exception {
+        return drain(cf, destination, max);
+    }
+
+    public static int drain(ConnectionFactory cf, Destination destination, int max) throws Exception {
         Connection connection = cf.createConnection();
         connection.start();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(destination);
+        MessageConsumer consumer = null;
         int count = 0;
         try {
-            while (count < max && consumer.receive(5000) != null) {
+            consumer = session.createConsumer(destination);
+            while (count < max && consumer.receive(4000) != null) {
                 count++;
             }
         } catch (JMSException ok) {
         } finally {
-            consumer.close();
-            connection.close();
+            if (consumer != null) {
+                try {
+                    consumer.close();
+                } catch (JMSException ok) {}
+            }
+            try {
+                connection.close();
+            } catch (JMSException ok) {}
         }
         return count;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/822e2be9/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
index 2e34686..325357d 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionIndexRecoveryTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.store.kahadb;
 
+import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -36,6 +37,7 @@ import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.store.kahadb.disk.journal.DataFile;
 import org.apache.activemq.store.kahadb.disk.journal.Journal;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.junit.After;
@@ -265,16 +267,7 @@ public class JournalCorruptionIndexRecoveryTest {
     }
 
     private int drainQueue(int max) throws Exception {
-        Connection connection = cf.createConnection();
-        connection.start();
-        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer = session.createConsumer(destination);
-        int count = 0;
-        while (count < max && consumer.receive(5000) != null) {
-            count++;
-        }
-        consumer.close();
-        connection.close();
-        return count;
+        return drain(cf, destination, max);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/822e2be9/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
index 2d398e2..4121e49 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalFdRecoveryTest.java
@@ -43,6 +43,7 @@ import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
@@ -247,26 +248,8 @@ public class JournalFdRecoveryTest {
     }
 
     private int tryConsume(Destination destination, int numToGet) throws Exception {
-        int got = 0;
-        Connection connection = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri()).createConnection();
-        connection.start();
-        try {
-            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            MessageConsumer consumer = session.createConsumer(destination);
-            for (int i = 0; i < numToGet; i++) {
-                if (consumer.receive(4000) == null) {
-                    // give up on timeout or error
-                    break;
-                }
-                got++;
-
-            }
-        } catch (JMSException ok) {
-        } finally {
-            connection.close();
-        }
-
-        return got;
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+        return  drain(cf, destination, numToGet);
     }
 
     private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception {


Mime
View raw message