From commits-return-11818-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Sep 23 16:21:54 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 61975 invoked from network); 23 Sep 2009 16:21:54 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 23 Sep 2009 16:21:54 -0000 Received: (qmail 70515 invoked by uid 500); 23 Sep 2009 16:21:54 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 70461 invoked by uid 500); 23 Sep 2009 16:21:54 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 70452 invoked by uid 99); 23 Sep 2009 16:21:54 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Sep 2009 16:21:54 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Sep 2009 16:21:44 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id D990E23888DB; Wed, 23 Sep 2009 16:21:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r818158 - in /activemq/branches/activemq-5.3: ./ activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Date: Wed, 23 Sep 2009 16:21:22 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090923162122.D990E23888DB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Sep 23 
16:21:22 2009 New Revision: 818158 URL: http://svn.apache.org/viewvc?rev=818158&view=rev Log: Merging http://svn.apache.org/viewvc?view=rev&revision=818155 Modified: activemq/branches/activemq-5.3/ (props changed) activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Propchange: activemq/branches/activemq-5.3/ ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Wed Sep 23 16:21:22 2009 @@ -1 +1 @@ -/activemq/trunk:816278-816279,816298,818138 +/activemq/trunk:816278-816279,816298,818138,818155 Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java?rev=818158&r1=818157&r2=818158&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java Wed Sep 23 16:21:22 2009 @@ -349,4 +349,19 @@ this.letter.setIgnoreMissingJournalfiles(ignoreMissingJournalfiles); } + public boolean isChecksumJournalFiles() { + return letter.isChecksumJournalFiles(); + } + + public boolean isCheckForCorruptJournalFiles() { + return letter.isCheckForCorruptJournalFiles(); + } + + public void setChecksumJournalFiles(boolean checksumJournalFiles) { + letter.setChecksumJournalFiles(checksumJournalFiles); + } + + public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { + 
letter.setCheckForCorruptJournalFiles(checkForCorruptJournalFiles); + } } Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=818158&r1=818157&r2=818158&view=diff ============================================================================== --- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original) +++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Wed Sep 23 16:21:22 2009 @@ -22,16 +22,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; +import java.util.*; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; @@ -60,6 +51,7 @@ import org.apache.kahadb.index.BTreeVisitor; import org.apache.kahadb.journal.Journal; import org.apache.kahadb.journal.Location; +import org.apache.kahadb.journal.DataFile; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; @@ -151,6 +143,8 @@ private LockFile lockFile; private boolean ignoreMissingJournalfiles = false; private int indexCacheSize = 100; + private boolean checkForCorruptJournalFiles = false; + private boolean checksumJournalFiles = false; public MessageDatabase() { } @@ -406,6 +400,15 @@ } } + long end = System.currentTimeMillis(); + if( undoCounter > 0 ) { + // The rolledback operations are basically in flight journal writes. 
To avoid getting these the end user + // should do sync writes to the journal. + LOG.info("Rolled back " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); + } + + undoCounter = 0; + start = System.currentTimeMillis(); // Lets be extra paranoid here and verify that all the datafiles being referenced // by the indexes still exists. @@ -445,40 +448,64 @@ missingJournalFiles.removeAll( journal.getFileMap().keySet() ); if( !missingJournalFiles.isEmpty() ) { - if( ignoreMissingJournalfiles ) { + LOG.info("Some journal files are missing: "+missingJournalFiles); + } - for (StoredDestination sd : storedDestinations.values()) { + ArrayList<BTreeVisitor.Predicate<Location>> missingPredicates = new ArrayList<BTreeVisitor.Predicate<Location>>(); + for (Integer missing : missingJournalFiles) { + missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0))); + } + + if ( checkForCorruptJournalFiles ) { + Collection<DataFile> dataFiles = journal.getFileMap().values(); + for (DataFile dataFile : dataFiles) { + int id = dataFile.getDataFileId(); + missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id,dataFile.getLength()), new Location(id+1,0))); + Sequence seq = dataFile.getCorruptedBlocks().getHead(); + while( seq!=null ) { + missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast()+1))); + seq = seq.getNext(); + } + } + } - final ArrayList<Long> matches = new ArrayList<Long>(); - for (Integer missing : missingJournalFiles) { - sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(missing,0), new Location(missing+1,0)) { - @Override - protected void matched(Location key, Long value) { - matches.add(value); - } - }); + if( !missingPredicates.isEmpty() ) { + for (StoredDestination sd : storedDestinations.values()) { + + final ArrayList<Long> matches = new ArrayList<Long>(); + sd.locationIndex.visit(tx, new BTreeVisitor.OrVisitor<Location, Long>(missingPredicates) { + protected void matched(Location key, 
Long value) { + matches.add(value); } + }); + + // If somes message references are affected by the missing data files... + if( !matches.isEmpty() ) { + // We either 'gracefully' recover dropping the missing messages or + // we error out. + if( ignoreMissingJournalfiles ) { + // Update the index to remove the references to the missing data + for (Long sequenceId : matches) { + MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); + sd.locationIndex.remove(tx, keys.location); + sd.messageIdIndex.remove(tx, keys.messageId); + undoCounter++; + // TODO: do we need to modify the ack positions for the pub sub case? + } - for (Long sequenceId : matches) { - MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); - sd.locationIndex.remove(tx, keys.location); - sd.messageIdIndex.remove(tx, keys.messageId); - undoCounter++; - // TODO: do we need to modify the ack positions for the pub sub case? + } else { + throw new IOException("Detected missing/corrupt journal files. "+matches.size()+" messages affected."); } } - - } else { - throw new IOException("Detected missing journal files: "+missingJournalFiles); } } - - long end = System.currentTimeMillis(); + + end = System.currentTimeMillis(); if( undoCounter > 0 ) { // The rolledback operations are basically in flight journal writes. To avoid getting these the end user // should do sync writes to the journal. - LOG.info("Rolled back " + undoCounter + " operations from the index in " + ((end - start) / 1000.0f) + " seconds."); + LOG.info("Detected missing/corrupt journal files. 
Dropped " + undoCounter + " messages from the index in " + ((end - start) / 1000.0f) + " seconds."); } } @@ -1339,6 +1366,8 @@ Journal manager = new Journal(); manager.setDirectory(directory); manager.setMaxFileLength(getJournalMaxFileLength()); + manager.setCheckForCorruptionOnStartup(checkForCorruptJournalFiles); + manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); return manager; } @@ -1452,4 +1481,20 @@ public void setIndexCacheSize(int indexCacheSize) { this.indexCacheSize = indexCacheSize; } + + public boolean isCheckForCorruptJournalFiles() { + return checkForCorruptJournalFiles; + } + + public void setCheckForCorruptJournalFiles(boolean checkForCorruptJournalFiles) { + this.checkForCorruptJournalFiles = checkForCorruptJournalFiles; + } + + public boolean isChecksumJournalFiles() { + return checksumJournalFiles; + } + + public void setChecksumJournalFiles(boolean checksumJournalFiles) { + this.checksumJournalFiles = checksumJournalFiles; + } }