activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject [2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6083 - ignoreMissingJournalfiles false - check for missing acks and corruption anywhere and error out so that corruption does not go unnoticed - fix and test
Date Wed, 09 Dec 2015 14:21:12 GMT
https://issues.apache.org/jira/browse/AMQ-6083 - when ignoreMissingJournalfiles is false, also check
for missing journal files referenced by acks and for corruption anywhere in the journal, and fail
recovery with an error so that corruption does not go unnoticed - fix and test

(cherry picked from commit 5db5f3e39a682640b64bc904f30cbd2e0e3de4da)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/99fce5ba
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/99fce5ba
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/99fce5ba

Branch: refs/heads/activemq-5.13.x
Commit: 99fce5bae986fdd70af0b1af9f24abc6dc4a2b07
Parents: 0952866
Author: gtully <gary.tully@gmail.com>
Authored: Wed Dec 9 12:30:03 2015 +0000
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Wed Dec 9 09:20:48 2015 -0500

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  | 45 ++++++++--
 .../store/kahadb/disk/index/BTreeVisitor.java   |  2 +-
 .../JournalCorruptionEofIndexRecoveryTest.java  | 32 ++++++-
 .../org/apache/activemq/bugs/AMQ2832Test.java   | 87 +++++++++++++-------
 .../org/apache/activemq/bugs/AMQ4212Test.java   |  2 +-
 5 files changed, 130 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/99fce5ba/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 936e047..2e17a69 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -825,14 +825,21 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
         while (!ss.isEmpty()) {
             missingJournalFiles.add((int) ss.removeFirst());
         }
+
+        for (Entry<Integer, Set<Integer>> entry : metadata.ackMessageFileMap.entrySet())
{
+            missingJournalFiles.add(entry.getKey());
+            for (Integer i : entry.getValue()) {
+                missingJournalFiles.add(i);
+            }
+        }
+
         missingJournalFiles.removeAll(journal.getFileMap().keySet());
 
         if (!missingJournalFiles.isEmpty()) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("Some journal files are missing: " + missingJournalFiles);
-            }
+            LOG.warn("Some journal files are missing: " + missingJournalFiles);
         }
 
+        ArrayList<BTreeVisitor.Predicate<Location>> knownCorruption = new ArrayList<BTreeVisitor.Predicate<Location>>();
         ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
         for (Integer missing : missingJournalFiles) {
             missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new
Location(missing, 0), new Location(missing + 1, 0)));
@@ -842,10 +849,13 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
             Collection<DataFile> dataFiles = journal.getFileMap().values();
             for (DataFile dataFile : dataFiles) {
                 int id = dataFile.getDataFileId();
+                // eof to next file id
                 missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new
Location(id, dataFile.getLength()), new Location(id + 1, 0)));
                 Sequence seq = dataFile.getCorruptedBlocks().getHead();
                 while (seq != null) {
-                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new
Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1)));
+                    BTreeVisitor.BetweenVisitor visitor = new BTreeVisitor.BetweenVisitor<Location,
Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
+                    missingPredicates.add(visitor);
+                    knownCorruption.add(visitor);
                     seq = seq.getNext();
                 }
             }
@@ -862,7 +872,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                     }
                 });
 
-                // If somes message references are affected by the missing data files...
+                // If some message references are affected by the missing data files...
                 if (!matches.isEmpty()) {
 
                     // We either 'gracefully' recover dropping the missing messages or
@@ -879,12 +889,25 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                             // TODO: do we need to modify the ack positions for the pub sub
case?
                         }
                     } else {
-                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+"
messages affected.");
+                        LOG.error("[" + sdEntry.getKey() + "] references corrupt locations.
" + matches.size() + " messages affected.");
+                        throw new IOException("Detected missing/corrupt journal files referenced
by:[" + sdEntry.getKey() + "] " +matches.size()+" messages affected.");
                     }
                 }
             }
         }
 
+        if (!ignoreMissingJournalfiles) {
+            if (!knownCorruption.isEmpty()) {
+                LOG.error("Detected corrupt journal files. " + knownCorruption);
+                throw new IOException("Detected corrupt journal files. " + knownCorruption);
+            }
+
+            if (!missingJournalFiles.isEmpty()) {
+                LOG.error("Detected missing journal files. " + missingJournalFiles);
+                throw new IOException("Detected missing journal files. " + missingJournalFiles);
+            }
+        }
+
         if( undoCounter > 0 ) {
             // The rolledback operations are basically in flight journal writes.  To avoid
getting these the end user
             // should do sync writes to the journal.
@@ -1714,6 +1737,7 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
             // check we are not deleting file with ack for in-use journal files
             if (LOG.isTraceEnabled()) {
                 LOG.trace("gc candidates: " + gcCandidateSet);
+                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
             }
             Iterator<Integer> candidates = gcCandidateSet.iterator();
             while (candidates.hasNext()) {
@@ -1743,6 +1767,15 @@ public abstract class MessageDatabase extends ServiceSupport implements
BrokerSe
                     LOG.debug("Cleanup removing the data files: " + gcCandidateSet);
                 }
                 journal.removeDataFiles(gcCandidateSet);
+                boolean ackMessageFileMapMod = false;
+                for (Integer candidate : gcCandidateSet) {
+                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values())
{
+                        ackMessageFileMapMod |= ackFiles.remove(candidate);
+                    }
+                }
+                if (ackMessageFileMapMod) {
+                    checkpointUpdate(tx, false);
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/99fce5ba/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeVisitor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeVisitor.java
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeVisitor.java
index 6030e61..768de68 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeVisitor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/index/BTreeVisitor.java
@@ -166,7 +166,7 @@ public interface BTreeVisitor<Key,Value> {
 
         @Override
         public String toString() {
-            return first+" <= key < "+last;
+            return first+" >= key < "+last;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/99fce5ba/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
index a39496d..07e5e2e 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.store.kahadb;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 import java.io.File;
 import java.io.IOException;
@@ -40,6 +42,7 @@ import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +56,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
     private BrokerService broker = null;
     private String connectionUri;
     private KahaDBPersistenceAdapter adapter;
+    private boolean ignoreMissingJournalFiles = false;
 
     private final Destination destination = new ActiveMQQueue("Test");
     private final String KAHADB_DIRECTORY = "target/activemq-data/";
@@ -118,7 +122,7 @@ public class JournalCorruptionEofIndexRecoveryTest {
         adapter.setCleanupInterval(5000);
 
         adapter.setCheckForCorruptJournalFiles(true);
-        adapter.setIgnoreMissingJournalfiles(true);
+        adapter.setIgnoreMissingJournalfiles(ignoreMissingJournalFiles);
 
         adapter.setPreallocationStrategy("zeros");
         adapter.setPreallocationScope("entire_journal");
@@ -132,6 +136,32 @@ public class JournalCorruptionEofIndexRecoveryTest {
         }
     }
 
+    @Before
+    public void reset() throws Exception {
+        ignoreMissingJournalFiles = true;
+    }
+
+    @Test
+    public void testNoRestartOnCorruptJournal() throws Exception {
+        ignoreMissingJournalFiles = false;
+
+        startBroker();
+
+        produceMessagesToConsumeMultipleDataFiles(50);
+
+        int numFiles = getNumberOfJournalFiles();
+
+        assertTrue("more than x files: " + numFiles, numFiles > 2);
+
+        corruptBatchEndEof(3);
+
+        try {
+            restartBroker(true);
+            fail("Expect failure to start with corrupt journal");
+        } catch (Exception expected) {
+        }
+    }
+
     @Test
     public void testRecoveryAfterCorruptionEof() throws Exception {
         startBroker();

http://git-wip-us.apache.org/repos/asf/activemq/blob/99fce5ba/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
index 22ad6ab..1b3a4e5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2832Test.java
@@ -20,7 +20,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.TimeUnit;
@@ -131,35 +133,62 @@ public class AMQ2832Test {
     */
     @Test
     public void testAckChain() throws Exception {
-       startBroker();
-
-       StagedConsumer consumer = new StagedConsumer();
-       // file #1
-       produceMessagesToConsumeMultipleDataFiles(5);
-       // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
-       consumer.receive(3);
-
-       // send messages by consuming and acknowledging every message right after sent in
order to get KahadbAdd and Remove command to be saved together
-       // this is necessary in order to get KahaAddMessageCommand to be saved in one db file
and the corresponding KahaRemoveMessageCommand in the next one
-       produceAndConsumeImmediately(20, consumer);
-       consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
-
-       // now we have 3 files written and started with #4
-       consumer.close();
-
-       broker.stop();
-       broker.waitUntilStopped();
-
-       recoverBroker();
-
-       consumer = new StagedConsumer();
-       Message message = consumer.receive(1);
-       assertNotNull("One message stays unacked from db-1.log", message);
-       message.acknowledge();
-       message = consumer.receive(1);
-       assertNull("There should not be any unconsumed messages any more", message);
-       consumer.close();
-   }
+        startBroker();
+
+        makeAckChain();
+
+        broker.stop();
+        broker.waitUntilStopped();
+
+        recoverBroker();
+
+        StagedConsumer consumer = new StagedConsumer();
+        Message message = consumer.receive(1);
+        assertNotNull("One message stays unacked from db-1.log", message);
+        message.acknowledge();
+        message = consumer.receive(1);
+        assertNull("There should not be any unconsumed messages any more", message);
+        consumer.close();
+    }
+
+    private void makeAckChain() throws Exception {
+        StagedConsumer consumer = new StagedConsumer();
+        // file #1
+        produceMessagesToConsumeMultipleDataFiles(5);
+        // acknowledge first 2 messages and leave the 3rd one unacknowledged blocking db-1.log
+        consumer.receive(3);
+
+        // send messages by consuming and acknowledging every message right after sent in
order to get KahadbAdd and Remove command to be saved together
+        // this is necessary in order to get KahaAddMessageCommand to be saved in one db
file and the corresponding KahaRemoveMessageCommand in the next one
+        produceAndConsumeImmediately(20, consumer);
+        consumer.receive(2).acknowledge(); // consume and ack the last 2 unconsumed
+
+        // now we have 3 files written and started with #4
+        consumer.close();
+    }
+
+    @Test
+    public void testNoRestartOnMissingAckDataFile() throws Exception {
+        startBroker();
+
+        // reuse scenario from previous test
+        makeAckChain();
+
+        File dataDir = broker.getPersistenceAdapter().getDirectory();
+        broker.stop();
+        broker.waitUntilStopped();
+
+        File secondLastDataFile = new File(dataDir, "db-3.log");
+        LOG.info("Whacking data file with acks: " + secondLastDataFile);
+        secondLastDataFile.delete();
+
+        try {
+            doStartBroker(false, false);
+            fail("Expect failure to start with corrupt journal");
+        } catch (IOException expected) {
+        }
+    }
+
 
    private void produceAndConsumeImmediately(int numOfMsgs, StagedConsumer consumer) throws
Exception {
       for (int i = 0; i < numOfMsgs; i++) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/99fce5ba/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
index 141a881..1d34174 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java
@@ -108,7 +108,7 @@ public class AMQ4212Test {
     }
 
     @Test
-    public void testDirableSubPrefetchRecovered() throws Exception {
+    public void testDurableSubPrefetchRecovered() throws Exception {
 
         ActiveMQQueue queue = new ActiveMQQueue("MyQueue");
         ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic");


Mime
View raw message