Author: chirino
Date: Wed Sep 23 16:16:39 2009
New Revision: 818155
URL: http://svn.apache.org/viewvc?rev=818155&view=rev
Log:
AMQ-2405: Adding more robust/graceful disk corruption recovery logic.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=818155&r1=818154&r2=818155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Sep 23 16:16:39 2009
@@ -349,4 +349,19 @@
this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
}
+ public boolean isChecksumJournalFiles() {
+ return letter.isChecksumJournalFiles();
+ }
+
+ public boolean isCheckForCorruptJournalFiles() {
+ return letter.isCheckForCorruptJournalFiles();
+ }
+
+ public void setChecksumJournalFiles(boolean checksumJournalFiles) {
+ letter.setChecksumJournalFiles(checksumJournalFiles);
+ }
+
+ public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
+ letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=818155&r1=818154&r2=818155&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Sep 23 16:16:39 2009
@@ -22,16 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +51,7 @@
import org.apache.kahadb.index.BTreeVisitor;
import org.apache.kahadb.journal.Journal;
import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.DataFile;
import org.apache.kahadb.page.Page;
import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.page.Transaction;
@@ -151,6 +143,8 @@
private LockFile lockFile;
private boolean ignoreMissingJournalfiles = false;
private int indexCacheSize = 100;
+ private boolean checkForCorruptJournalFiles = false;
+ private boolean checksumJournalFiles = false;
public MessageDatabase() {
}
@@ -406,6 +400,15 @@
}
}
+ long end = System.currentTimeMillis();
+ if( undoCounter > 0 ) {
+ // The rolledback operations are basically in flight journal writes. To avoid getting these the end user
+ // should do sync writes to the journal.
+ LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
+ }
+
+ undoCounter = 0;
+ start = System.currentTimeMillis();
// Lets be extra paranoid here and verify that all the datafiles being referenced
// by the indexes still exists.
@@ -445,40 +448,64 @@
missingJournalFiles.removeAll( journal.getFileMap().keySet() );
if( !missingJournalFiles.isEmpty() ) {
- if( ignoreMissingJournalfiles ) {
+ LOG.info("Some journal files are missing: "+missingJournalFiles);
+ }
- for (StoredDestination sd : storedDestinations.values()) {
+ ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
+ for (Integer missing : missingJournalFiles) {
+ missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)));
+ }
+
+ if ( checkForCorruptJournalFiles ) {
+ Collection<DataFile> dataFiles = journal.getFileMap().values();
+ for (DataFile dataFile : dataFiles) {
+ int id = dataFile.getDataFileId();
+ missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0)));
+ Sequence seq = dataFile.getCorruptedBlocks().getHead();
+ while( seq!=null ) {
+ missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
+ seq = seq.getNext();
+ }
+ }
+ }
- final ArrayList<Long> matches = new ArrayList<Long>();
- for (Integer missing : missingJournalFiles) {
- sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)) {
- @Override
- protected void matched(Location key, Long value) {
- matches.add(value);
- }
- });
+ if( !missingPredicates.isEmpty() ) {
+ for (StoredDestination sd : storedDestinations.values()) {
+
+ final ArrayList<Long> matches = new ArrayList<Long>();
+ sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) {
+ protected void matched(Location key, Long value) {
+ matches.add(value);
}
+ });
+
+ // If somes message references are affected by the missing data files...
+ if( !matches.isEmpty() ) {
+ // We either 'gracefully' recover dropping the missing messages or
+ // we error out.
+ if( ignoreMissingJournalfiles ) {
+ // Update the index to remove the references to the missing data
+ for (Long sequenceId : matches) {
+ MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+ sd.locationIndex.remove(tx, keys.location);
+ sd.messageIdIndex.remove(tx, keys.messageId);
+ undoCounter++;
+ // TODO: do we need to modify the ack positions for the pub sub case?
+ }
- for (Long sequenceId : matches) {
- MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
- sd.locationIndex.remove(tx, keys.location);
- sd.messageIdIndex.remove(tx, keys.messageId);
- undoCounter++;
- // TODO: do we need to modify the ack positions for the pub sub case?
+ } else {
+ throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected.");
}
}
-
- } else {
- throw new IOException("Detected missing journal files: "+missingJournalFiles);
}
}
-
- long end = System.currentTimeMillis();
+
+ end = System.currentTimeMillis();
if( undoCounter > 0 ) {
// The rolledback operations are basically in flight journal writes. To avoid getting these the end user
// should do sync writes to the journal.
- LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds.");
+ LOG.info("Detected missing/corrupt journal files. Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds.");
}
}
@@ -1339,6 +1366,8 @@
Journal manager = new Journal();
manager.setDirectory(directory);
manager.setMaxFileLength(getJournalMaxFileLength());
+ manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
+ manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
return manager;
}
@@ -1452,4 +1481,20 @@
public void setIndexCacheSize(int indexCacheSize) {
this.indexCacheSize = indexCacheSize;
}
+
+ public boolean isCheckForCorruptJournalFiles() {
+ return checkForCorruptJournalFiles;
+ }
+
+ public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
+ this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
+ }
+
+ public boolean isChecksumJournalFiles() {
+ return checksumJournalFiles;
+ }
+
+ public void setChecksumJournalFiles(boolean checksumJournalFiles) {
+ this.checksumJournalFiles = checksumJournalFiles;
+ }
}
|