activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r818158 - in /activemq/branches/activemq-5.3: ./ activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Date Wed, 23 Sep 2009 16:21:22 GMT
Author: chirino
Date: Wed Sep 23 16:21:22 2009
New Revision: 818158

URL: http://svn.apache.org/viewvc?rev=818158&view=rev
Log:
Merging http://svn.apache.org/viewvc?view=rev&revision=818155

Modified:
    activemq/branches/activemq-5.3/   (props changed)
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
    activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Propchange: activemq/branches/activemq-5.3/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 23 16:21:22 2009
@@ -1 +1 @@
-/activemq/trunk:816278-816279,816298,818138
+/activemq/trunk:816278-816279,816298,818138,818155

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=818158&r1=818157&r2=818158&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java
Wed Sep 23 16:21:22 2009
@@ -349,4 +349,19 @@
         this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles);
     }
 
+    public boolean isChecksumJournalFiles() {
+        return letter.isChecksumJournalFiles();
+    }
+
+    public boolean isCheckForCorruptJournalFiles() {
+        return letter.isCheckForCorruptJournalFiles();
+    }
+
+    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
+        letter.setChecksumJournalFiles(checksumJournalFiles);
+    }
+
+    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
+        letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles);
+    }
 }

Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=818158&r1=818157&r2=818158&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
(original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
Wed Sep 23 16:21:22 2009
@@ -22,16 +22,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeMap;
-import java.util.TreeSet;
+import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -60,6 +51,7 @@
 import org.apache.kahadb.index.BTreeVisitor;
 import org.apache.kahadb.journal.Journal;
 import org.apache.kahadb.journal.Location;
+import org.apache.kahadb.journal.DataFile;
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
@@ -151,6 +143,8 @@
     private LockFile lockFile;
     private boolean ignoreMissingJournalfiles = false;
     private int indexCacheSize = 100;
+    private boolean checkForCorruptJournalFiles = false;
+    private boolean checksumJournalFiles = false;
 
     public MessageDatabase() {
     }
@@ -406,6 +400,15 @@
 			}
         }
 
+        long end = System.currentTimeMillis();
+        if( undoCounter > 0 ) {
+        	// The rolledback operations are basically in flight journal writes.  To avoid getting
these the end user
+        	// should do sync writes to the journal.
+	        LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end -
start) / 1000.0f) + " seconds.");
+        }
+
+        undoCounter = 0;
+        start = System.currentTimeMillis();
 
         // Lets be extra paranoid here and verify that all the datafiles being referenced
         // by the indexes still exists.
@@ -445,40 +448,64 @@
         missingJournalFiles.removeAll( journal.getFileMap().keySet() );
 
         if( !missingJournalFiles.isEmpty() ) {
-            if( ignoreMissingJournalfiles ) {
+            LOG.info("Some journal files are missing: "+missingJournalFiles);
+        }
 
-                for (StoredDestination sd : storedDestinations.values()) {
+        ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>();
+        for (Integer missing : missingJournalFiles) {
+            missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new
Location(missing,0), new Location(missing+1,0)));
+        }
+
+        if ( checkForCorruptJournalFiles ) {
+            Collection<DataFile> dataFiles = journal.getFileMap().values();
+            for (DataFile dataFile : dataFiles) {
+                int id = dataFile.getDataFileId();
+                missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new
Location(id,dataFile.getLength()), new Location(id+1,0)));
+                Sequence seq = dataFile.getCorruptedBlocks().getHead();
+                while( seq!=null ) {
+                    missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new
Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1)));
+                    seq = seq.getNext();
+                }
+            }
+        }
 
-                    final ArrayList<Long> matches = new ArrayList<Long>();
-                    for (Integer missing : missingJournalFiles) {
-                        sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor<Location,
Long>(new Location(missing,0), new Location(missing+1,0)) {
-                            @Override
-                            protected void matched(Location key, Long value) {
-                                matches.add(value);
-                            }
-                        });
+        if( !missingPredicates.isEmpty() ) {
+            for (StoredDestination sd : storedDestinations.values()) {
+
+                final ArrayList<Long> matches = new ArrayList<Long>();
+                sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates)
{
+                    protected void matched(Location key, Long value) {
+                        matches.add(value);
                     }
+                });
+
+                // If somes message references are affected by the missing data files...
+                if( !matches.isEmpty() ) {
 
+                    // We either 'gracefully' recover dropping the missing messages or
+                    // we error out.
+                    if( ignoreMissingJournalfiles ) {
+                        // Update the index to remove the references to the missing data
+                        for (Long sequenceId : matches) {
+                            MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
+                            sd.locationIndex.remove(tx, keys.location);
+                            sd.messageIdIndex.remove(tx, keys.messageId);
+                            undoCounter++;
+                            // TODO: do we need to modify the ack positions for the pub sub
case?
+                        }
 
-                    for (Long sequenceId : matches) {
-                        MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
-                        sd.locationIndex.remove(tx, keys.location);
-                        sd.messageIdIndex.remove(tx, keys.messageId);
-                        undoCounter++;
-                        // TODO: do we need to modify the ack positions for the pub sub case?
+                    } else {
+                        throw new IOException("Detected missing/corrupt journal files. "+matches.size()+"
messages affected.");
                     }
                 }
-                
-            } else {
-                throw new IOException("Detected missing journal files: "+missingJournalFiles);
             }
         }
-
-        long end = System.currentTimeMillis();
+        
+        end = System.currentTimeMillis();
         if( undoCounter > 0 ) {
         	// The rolledback operations are basically in flight journal writes.  To avoid getting
these the end user
         	// should do sync writes to the journal.
-	        LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end
- start) / 1000.0f) + " seconds.");
+	        LOG.info("Detected missing/corrupt journal files.  Dropped " + undoCounter + " messages
from the index in " + ((end - start) / 1000.0f) + " seconds.");
         }
 	}
 
@@ -1339,6 +1366,8 @@
         Journal manager = new Journal();
         manager.setDirectory(directory);
         manager.setMaxFileLength(getJournalMaxFileLength());
+        manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles);
+        manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
         manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
         return manager;
     }
@@ -1452,4 +1481,20 @@
     public void setIndexCacheSize(int indexCacheSize) {
         this.indexCacheSize = indexCacheSize;
     }
+
+    public boolean isCheckForCorruptJournalFiles() {
+        return checkForCorruptJournalFiles;
+    }
+
+    public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) {
+        this.checkForCorruptJournalFiles = checkForCorruptJournalFiles;
+    }
+
+    public boolean isChecksumJournalFiles() {
+        return checksumJournalFiles;
+    }
+
+    public void setChecksumJournalFiles(boolean checksumJournalFiles) {
+        this.checksumJournalFiles = checksumJournalFiles;
+    }
 }



Mime
View raw message