From commits-return-11635-apmail-activemq-commits-archive=activemq.apache.org@activemq.apache.org Wed Sep 02 23:02:42 2009 Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 53744 invoked from network); 2 Sep 2009 23:02:42 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 2 Sep 2009 23:02:42 -0000 Received: (qmail 91582 invoked by uid 500); 2 Sep 2009 23:02:42 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 91523 invoked by uid 500); 2 Sep 2009 23:02:42 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 91514 invoked by uid 99); 2 Sep 2009 23:02:42 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Sep 2009 23:02:42 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 02 Sep 2009 23:02:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id C76C923888D2; Wed, 2 Sep 2009 23:02:10 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r810728 - in /activemq/sandbox/activemq-flow: activemq-util/src/main/java/org/apache/activemq/util/ activemq-util/src/main/java/org/apache/activemq/util/buffer/ activemq-util/src/main/java/org/apache/activemq/util/list/ kahadb/ kahadb/src/m... Date: Wed, 02 Sep 2009 23:02:10 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090902230210.C76C923888D2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Wed Sep 2 23:02:09 2009 New Revision: 810728 URL: http://svn.apache.org/viewvc?rev=810728&view=rev Log: Merged in kahadb enhancements from trunk. Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java activemq/sandbox/activemq-flow/kahadb/ (props changed) activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java (original) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java Wed Sep 2 23:02:09 2009 @@ -50,22 +50,30 @@ return; } - lockCounter++; - if( lockCounter!=1 ) { + if( lockCounter>0 ) { return; } IOHelper.mkdirs(file.getParentFile()); - readFile = new RandomAccessFile(file, "rw"); if (lock == null) { + readFile = new RandomAccessFile(file, "rw"); + IOException reason = null; try { lock = readFile.getChannel().tryLock(); } catch (OverlappingFileLockException e) { - throw IOExceptionSupport.create("File '" + file + "' could not be locked.",e); + reason = IOExceptionSupport.create("File '" + file + "' could not be locked.",e); } - if (lock == null) { + if (lock != null) { + lockCounter++; + } else { + // new read file for next attempt + closeReadFile(); + if (reason != null) { + throw reason; + } throw new IOException("File '" + file + "' could not be locked."); } + } } @@ -89,6 +97,14 @@ } lock = null; } + closeReadFile(); + + if( deleteOnUnlock ) { + file.delete(); + } + } + + private void closeReadFile() { // close the file. if (readFile != null) { try { @@ -98,9 +114,6 @@ readFile = null; } - if( deleteOnUnlock ) { - file.delete(); - } } } Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java (original) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java Wed Sep 2 23:02:09 2009 @@ -17,6 +17,8 @@ package org.apache.activemq.util.buffer; +import com.sun.org.apache.bcel.internal.util.ByteSequence; + import java.util.List; @@ -156,6 +158,25 @@ return -1; } + public int indexOf(Buffer needle, int pos) { + int max = length - needle.length; + for (int i = pos; i < max; i++) { + if (matches(needle, i)) { + return i; + } + } + return -1; + } + + private boolean matches(Buffer needle, int pos) { + for (int i = 0; i < needle.length; i++) { + if( data[offset + pos+ i] != needle.data[needle.offset + i] ) { + return false; + } + } + return true; + } + final public static Buffer join(List items, Buffer seperator) { if (items.isEmpty()) return new Buffer(seperator.data, 0, 0); Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java (original) +++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java Wed Sep 2 23:02:09 2009 @@ -177,7 +177,18 @@ Sequence rc = removeFirstSequence(1); return rc.first; } - + + + public Sequence removeLastSequence() { + if (isEmpty()) { + return null; + } + + Sequence rc = getTail(); + rc.unlink(); + return rc; + } + /** * Removes and returns the first sequence that is count range large. * @@ -256,5 +267,19 @@ } return rc; } - + + public boolean contains(int first, int last) { + if (isEmpty()) { + return false; + } + Sequence sequence = getHead(); + while (sequence != null) { + if (sequence.first <= first ) { + return last <= sequence.last ; + } + sequence = sequence.getNext(); + } + return false; + } + } \ No newline at end of file Propchange: activemq/sandbox/activemq-flow/kahadb/ ------------------------------------------------------------------------------ svn:mergeinfo = /activemq/trunk/kahadb:780475-810696 Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original) +++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Wed Sep 2 23:02:09 2009 @@ -20,181 +20,287 @@ /** * Interface used to selectively visit the entries in a BTree. - * + * * @param * @param */ -public interface BTreeVisitor { +public interface BTreeVisitor { /** - * Do you want to visit the range of BTree entries between the first and and - * second key? - * - * @param first - * if null indicates the range of values before the second key. - * @param second - * if null indicates the range of values after the first key. - * @return true if you want to visit the values between the first and second - * key. + * Do you want to visit the range of BTree entries between the first and and second key? + * + * @param first if null indicates the range of values before the second key. + * @param second if null indicates the range of values after the first key. + * @return true if you want to visit the values between the first and second key. */ boolean isInterestedInKeysBetween(Key first, Key second); /** * The keys and values of a BTree leaf node. - * + * * @param keys * @param values */ void visit(List keys, List values); /** - * If the visitor wishes to - * - * @return + * @return true if the visitor has quenched it's thirst for more results */ boolean isSatiated(); - abstract class GTVisitor, Value> implements BTreeVisitor { - final private Key value; - int matches = Integer.MAX_VALUE; - boolean limited; + public interface Predicate { + boolean isInterestedInKeysBetween(Key first, Key second); + boolean isInterestedInKey(Key key); + } - public GTVisitor(Key value) { - this.value = value; + abstract class PredicateVisitor implements BTreeVisitor, Predicate { + public static final int UNLIMITED=-1; + private int limit; + + public PredicateVisitor(int limit) { + this.limit = limit; + } + + final public void visit(List keys, List values) { + for( int i=0; i < keys.size() && !isSatiated(); i++) { + Key key = keys.get(i); + if( isInterestedInKey(key) ) { + if(limit > 0 ) + limit--; + matched(key, values.get(i)); + } + } + } + + protected void matched(Key key, Value value) { } - public GTVisitor(Key value, int limit) { - this.value = value; - limited = true; - matches = limit; + public boolean isSatiated() { + return limit==0; } + } - public boolean isInterestedInKeysBetween(Key first, Key second) { - return second == null || second.compareTo(value) > 0; + class OrVisitor extends PredicateVisitor { + private final List> conditions; + + public OrVisitor(List> conditions) { + this(conditions, UNLIMITED); } - public void visit(List keys, List values) { - for (int i = 0; i < keys.size() && !isSatiated(); i++) { - Key key = keys.get(i); - if (key.compareTo(value) > 0) { - matched(key, values.get(i)); - if (limited) matches--; + public OrVisitor(List> conditions, int limit) { + super(limit); + this.conditions = conditions; + } + + final public boolean isInterestedInKeysBetween(Key first, Key second) { + for (Predicate condition : conditions) { + if( condition.isInterestedInKeysBetween(first, second) ) { + return true; } } - } + return false; + } - public boolean isSatiated() { - return limited && matches <= 0; + final public boolean isInterestedInKey(Key key) { + for (Predicate condition : conditions) { + if( condition.isInterestedInKey(key) ) { + return true; + } + } + return false; } - abstract protected void matched(Key key, Value value); + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + boolean first=true; + for (Predicate condition : conditions) { + if( !first ) { + sb.append(" OR "); + } + first=false; + sb.append("("); + sb.append(condition); + sb.append(")"); + } + return sb.toString(); + } } - abstract class GTEVisitor, Value> implements BTreeVisitor { - final private Key value; - int matches = Integer.MAX_VALUE; - boolean limited; + class AndVisitor extends PredicateVisitor { + private final List> conditions; - public GTEVisitor(Key value) { - this.value = value; + public AndVisitor(List> conditions) { + this(conditions, UNLIMITED); } - - public GTEVisitor(Key value, int limit) { - this.value = value; - limited = true; - matches = limit; + public AndVisitor(List> conditions, int limit) { + super(limit); + this.conditions = conditions; } - public boolean isInterestedInKeysBetween(Key first, Key second) { - return second == null || second.compareTo(value) >= 0; + final public boolean isInterestedInKeysBetween(Key first, Key second) { + for (Predicate condition : conditions) { + if( !condition.isInterestedInKeysBetween(first, second) ) { + return false; + } + } + return true; + } + + final public boolean isInterestedInKey(Key key) { + for (Predicate condition : conditions) { + if( !condition.isInterestedInKey(key) ) { + return false; + } + } + return true; } - public void visit(List keys, List values) { - for (int i = 0; i < keys.size() && !isSatiated(); i++) { - Key key = keys.get(i); - if (key.compareTo(value) >= 0) { - matched(key, values.get(i)); - if (limited) matches--; + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + boolean first=true; + for (Predicate condition : conditions) { + if( !first ) { + sb.append(" AND "); } + first=false; + sb.append("("); + sb.append(condition); + sb.append(")"); } + return sb.toString(); } + } - public boolean isSatiated() { - return limited && matches <= 0; + class BetweenVisitor, Value> extends PredicateVisitor { + private final Key first; + private final Key last; + + public BetweenVisitor(Key first, Key last) { + this(first, last, UNLIMITED); } - abstract protected void matched(Key key, Value value); - } + public BetweenVisitor(Key first, Key last, int limit) { + super(limit); + this.first = first; + this.last = last; + } - abstract class LTVisitor, Value> implements BTreeVisitor { - final private Key value; - int matches = Integer.MAX_VALUE; - boolean limited; + final public boolean isInterestedInKeysBetween(Key left, Key right) { + return (right==null || right.compareTo(first)>=0) + && (left==null || left.compareTo(last)<0); + } - public LTVisitor(Key value) { - this.value = value; + final public boolean isInterestedInKey(Key key) { + return key.compareTo(first) >=0 && key.compareTo(last) <0; } - - public LTVisitor(Key value, int limit) { - this.value = value; - limited = true; - matches = limit; + + @Override + public String toString() { + return first+" <= key < "+last; } + } - public boolean isInterestedInKeysBetween(Key first, Key second) { - return first == null || first.compareTo(value) < 0; + class GTVisitor, Value> extends PredicateVisitor { + final private Key value; + + public GTVisitor(Key value) { + this(value, UNLIMITED); + } + public GTVisitor(Key value, int limit) { + super(limit); + this.value = value; + } + + final public boolean isInterestedInKeysBetween(Key first, Key second) { + return second==null || isInterestedInKey(second); + } + + final public boolean isInterestedInKey(Key key) { + return key.compareTo(value)>0; + } + + @Override + public String toString() { + return "key > "+ value; } + } - public void visit(List keys, List values) { - for (int i = 0; i < keys.size() && !isSatiated(); i++) { - Key key = keys.get(i); - if (key.compareTo(value) < 0) { - matched(key, values.get(i)); - if (limited) matches--; - } - } + class GTEVisitor, Value> extends PredicateVisitor { + final private Key value; + + public GTEVisitor(Key value) { + this(value, UNLIMITED); } - public boolean isSatiated() { - return limited && matches <= 0; + public GTEVisitor(Key value, int limit) { + super(limit); + this.value = value; + } + + final public boolean isInterestedInKeysBetween(Key first, Key second) { + return second==null || isInterestedInKey(second); + } + + final public boolean isInterestedInKey(Key key) { + return key.compareTo(value)>=0; } - abstract protected void matched(Key key, Value value); + @Override + public String toString() { + return "key >= "+ value; + } } - abstract class LTEVisitor, Value> implements BTreeVisitor { - final private Key value; - int matches = Integer.MAX_VALUE; - boolean limited; + class LTVisitor, Value> extends PredicateVisitor { + final private Key value; - public LTEVisitor(Key value) { - this.value = value; + public LTVisitor(Key value) { + this(value, UNLIMITED); } + + public LTVisitor(Key value, int limit) { + super(limit); + this.value = value; + } + + final public boolean isInterestedInKeysBetween(Key first, Key second) { + return first==null || isInterestedInKey(first); + } + + final public boolean isInterestedInKey(Key key) { + return key.compareTo(value)<0; + } + + @Override + public String toString() { + return "key < "+ value; + } + } + + class LTEVisitor, Value> extends PredicateVisitor { + final private Key value; + public LTEVisitor(Key value) { + this(value, UNLIMITED); + } public LTEVisitor(Key value, int limit) { + super(limit); this.value = value; - limited = true; - matches = limit; - } - - public boolean isInterestedInKeysBetween(Key first, Key second) { - return first == null || first.compareTo(value) <= 0; } - public void visit(List keys, List values) { - for (int i = 0; i < keys.size() && !isSatiated(); i++) { - Key key = keys.get(i); - if (key.compareTo(value) <= 0) { - matched(key, values.get(i)); - if (limited) matches--; - } - } - } + final public boolean isInterestedInKeysBetween(Key first, Key second) { + return first==null || isInterestedInKey(first); + } - public boolean isSatiated() { - return limited && matches <= 0; + final public boolean isInterestedInKey(Key key) { + return key.compareTo(value)<=0; } - abstract protected void matched(Key key, Value value); + @Override + public String toString() { + return "key <= "+ value; + } } } \ No newline at end of file Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original) +++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Wed Sep 2 23:02:09 2009 @@ -22,6 +22,7 @@ import org.apache.activemq.util.IOHelper; import org.apache.activemq.util.list.LinkedNode; +import org.apache.activemq.util.list.SequenceSet; /** * DataFile @@ -33,6 +34,7 @@ protected final File file; protected final Integer dataFileId; protected int length; + protected final SequenceSet corruptedBlocks = new SequenceSet(); DataFile(File file, int number, int preferedSize) { this.file = file; @@ -80,6 +82,10 @@ IOHelper.moveFile(file,targetDirectory); } + public SequenceSet getCorruptedBlocks() { + return corruptedBlocks; + } + public int compareTo(DataFile df) { return dataFileId - df.dataFileId; } Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java (original) +++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java Wed Sep 2 23:02:09 2009 @@ -96,11 +96,16 @@ } } - public void read(long offset, byte data[]) throws IOException { + public void readFully(long offset, byte data[]) throws IOException { file.seek(offset); file.readFully(data); } + public int read(long offset, byte data[]) throws IOException { + file.seek(offset); + return file.read(data); + } + public void readLocationDetails(Location location) throws IOException { WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location)); if (asyncWrite != null) { Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=810728&r1=810727&r2=810728&view=diff ============================================================================== --- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original) +++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Wed Sep 2 23:02:09 2009 @@ -20,15 +20,7 @@ import java.io.FilenameFilter; import java.io.IOException; import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TreeMap; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -38,7 +30,9 @@ import org.apache.activemq.util.Scheduler; import org.apache.activemq.util.buffer.Buffer; import org.apache.activemq.util.buffer.DataByteArrayInputStream; +import org.apache.activemq.util.buffer.DataByteArrayOutputStream; import org.apache.activemq.util.list.LinkedNodeList; +import org.apache.activemq.util.list.Sequence; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.kahadb.journal.DataFileAppender.WriteCommand; @@ -61,7 +55,20 @@ // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch. public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH"); public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8; - + public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader(); + + private static byte[] createBatchControlRecordHeader() { + try { + DataByteArrayOutputStream os = new DataByteArrayOutputStream(); + os.writeInt(BATCH_CONTROL_RECORD_SIZE); + os.writeByte(BATCH_CONTROL_RECORD_TYPE); + os.write(BATCH_CONTROL_RECORD_MAGIC); + return os.toBuffer().toByteArray(); + } catch (IOException e) { + throw new RuntimeException("Could not create batch control record header."); + } + } + public static final String DEFAULT_DIRECTORY = "."; public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive"; public static final String DEFAULT_FILE_PREFIX = "db-"; @@ -96,6 +103,7 @@ protected boolean archiveDataLogs; private ReplicationTarget replicationTarget; protected boolean checksum; + protected boolean checkForCorruptionOnStartup; public synchronized void start() throws IOException { if (started) { @@ -137,17 +145,20 @@ for (DataFile df : l) { dataFiles.addLast(df); fileByFileMap.put(df.getFile(), df); + + if( isCheckForCorruptionOnStartup() ) { + lastAppendLocation.set(recoveryCheck(df)); + } } } getCurrentWriteFile(); - try { - Location l = recoveryCheck(dataFiles.getTail()); - lastAppendLocation.set(l); - } catch (IOException e) { - LOG.warn("recovery check failed", e); + + if( lastAppendLocation.get()==null ) { + DataFile df = dataFiles.getTail(); + lastAppendLocation.set(recoveryCheck(df)); } - + cleanupTask = new Runnable() { public void run() { cleanup(); @@ -177,56 +188,108 @@ DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile); try { while( true ) { - reader.read(location.getOffset(), controlRecord); - controlIs.restart(); - - // Assert that it's a batch record. - if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) { - break; - } - if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) { - break; - } - for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) { - if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) { - break; - } - } - - int size = controlIs.readInt(); - if( size > MAX_BATCH_SIZE ) { - break; - } - - if( isChecksum() ) { - - long expectedChecksum = controlIs.readLong(); - - byte data[] = new byte[size]; - reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data); - - Checksum checksum = new Adler32(); - checksum.update(data, 0, data.length); - - if( expectedChecksum!=checksum.getValue() ) { - break; - } - - } - - - location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size); + int size = checkBatchRecord(reader, location.getOffset()); + if ( size>=0 ) { + location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size); + } else { + + // Perhaps it's just some corruption... scan through the file to find the next valid batch record. We + // may have subsequent valid batch records. + int nextOffset = findNextBatchRecord(reader, location.getOffset()+1); + if( nextOffset >=0 ) { + Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1); + LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence); + dataFile.corruptedBlocks.add(sequence); + location.setOffset(nextOffset); + } else { + break; + } + } } } catch (IOException e) { } finally { accessorPool.closeDataFileAccessor(reader); } - + dataFile.setLength(location.getOffset()); + + if( !dataFile.corruptedBlocks.isEmpty() ) { + // Is the end of the data file corrupted? + if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) { + dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst()); + } + } + return location; } + private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException { + Buffer header = new Buffer(BATCH_CONTROL_RECORD_HEADER); + byte data[] = new byte[1024*4]; + Buffer bs = new Buffer(data, 0, reader.read(offset, data)); + + int pos = 0; + while( true ) { + pos = bs.indexOf(header, pos); + if( pos >= 0 ) { + return offset+pos; + } else { + // need to load the next data chunck in.. + if( bs.length != data.length ) { + // If we had a short read then we were at EOF + return -1; + } + offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length; + bs = new Buffer(data, 0, reader.read(offset, data)); + pos=0; + } + } + } + + + public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException { + byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE]; + DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord); + + reader.readFully(offset, controlRecord); + + // Assert that it's a batch record. + for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) { + if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) { + return -1; + } + } + + int size = controlIs.readInt(); + if( size > MAX_BATCH_SIZE ) { + return -1; + } + + if( isChecksum() ) { + + long expectedChecksum = controlIs.readLong(); + if( expectedChecksum == 0 ) { + // Checksuming was not enabled when the record was stored. + // we can't validate the record :( + return size; + } + + byte data[] = new byte[size]; + reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data); + + Checksum checksum = new Adler32(); + checksum.update(data, 0, data.length); + + if( expectedChecksum!=checksum.getValue() ) { + return -1; + } + + } + return size; + } + + void addToTotalLength(int size) { totalLength.addAndGet(size); } @@ -640,5 +703,11 @@ this.checksum = checksumWrites; } + public boolean isCheckForCorruptionOnStartup() { + return checkForCorruptionOnStartup; + } + public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) { + this.checkForCorruptionOnStartup = checkForCorruptionOnStartup; + } }