activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r810728 - in /activemq/sandbox/activemq-flow: activemq-util/src/main/java/org/apache/activemq/util/ activemq-util/src/main/java/org/apache/activemq/util/buffer/ activemq-util/src/main/java/org/apache/activemq/util/list/ kahadb/ kahadb/src/m...
Date Wed, 02 Sep 2009 23:02:10 GMT
Author: chirino
Date: Wed Sep  2 23:02:09 2009
New Revision: 810728

URL: http://svn.apache.org/viewvc?rev=810728&view=rev
Log:
Merged in kahadb enhancements from trunk.


Modified:
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
    activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java
    activemq/sandbox/activemq-flow/kahadb/   (props changed)
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
    activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/LockFile.java
Wed Sep  2 23:02:09 2009
@@ -50,22 +50,30 @@
             return;
         }
 
-        lockCounter++;
-        if( lockCounter!=1 ) {
+        if( lockCounter>0 ) {
             return;
         }
         
         IOHelper.mkdirs(file.getParentFile());
-        readFile = new RandomAccessFile(file, "rw");        
         if (lock == null) {
+            readFile = new RandomAccessFile(file, "rw");
+            IOException reason = null;
             try {
                 lock = readFile.getChannel().tryLock();
             } catch (OverlappingFileLockException e) {
-                throw IOExceptionSupport.create("File '" + file + "' could not be locked.",e);
+                reason = IOExceptionSupport.create("File '" + file + "' could not be locked.",e);
             }
-            if (lock == null) {
+            if (lock != null) {
+                lockCounter++;
+            } else {
+                // new read file for next attempt
+                closeReadFile();
+                if (reason != null) {
+                    throw reason;
+                }
                 throw new IOException("File '" + file + "' could not be locked.");
             }
+              
         }
     }
 
@@ -89,6 +97,14 @@
             }
             lock = null;
         }
+        closeReadFile();
+        
+        if( deleteOnUnlock ) {
+            file.delete();
+        }
+    }
+
+    private void closeReadFile() {
         // close the file.
         if (readFile != null) {
             try {
@@ -98,9 +114,6 @@
             readFile = null;
         }
         
-        if( deleteOnUnlock ) {
-            file.delete();
-        }
     }
 
 }

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/buffer/Buffer.java
Wed Sep  2 23:02:09 2009
@@ -17,6 +17,8 @@
 
 package org.apache.activemq.util.buffer;
 
+import com.sun.org.apache.bcel.internal.util.ByteSequence;
+
 import java.util.List;
 
 
@@ -156,6 +158,25 @@
         return -1;
     }
 
+    public int indexOf(Buffer needle, int pos) {
+        int max = length - needle.length;
+        for (int i = pos; i < max; i++) {
+            if (matches(needle, i)) {
+                return i;
+            }
+        }
+        return -1;
+    }
+
+    private boolean matches(Buffer needle, int pos) {
+        for (int i = 0; i < needle.length; i++) {
+            if( data[offset + pos+ i] != needle.data[needle.offset + i] ) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     final public static Buffer join(List<Buffer> items, Buffer seperator) {
         if (items.isEmpty())
             return new Buffer(seperator.data, 0, 0);

Modified: activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java
(original)
+++ activemq/sandbox/activemq-flow/activemq-util/src/main/java/org/apache/activemq/util/list/SequenceSet.java
Wed Sep  2 23:02:09 2009
@@ -177,7 +177,18 @@
         Sequence rc = removeFirstSequence(1);
         return rc.first;
     }
-    
+
+
+    public Sequence removeLastSequence() {
+        if (isEmpty()) {
+            return null;
+        }
+
+        Sequence rc = getTail();
+        rc.unlink();
+        return rc;
+    }
+
     /**
      * Removes and returns the first sequence that is count range large.
      *
@@ -256,5 +267,19 @@
         }
         return rc;
     }
-   
+
+    public boolean contains(int first, int last) {
+        if (isEmpty()) {
+            return false;
+        }
+        Sequence sequence = getHead();
+        while (sequence != null) {
+            if (sequence.first <= first ) {
+                return last <= sequence.last ;
+            }
+            sequence = sequence.getNext();
+        }
+        return false;
+    }
+
 }
\ No newline at end of file

Propchange: activemq/sandbox/activemq-flow/kahadb/
------------------------------------------------------------------------------
    svn:mergeinfo = /activemq/trunk/kahadb:780475-810696

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
(original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
Wed Sep  2 23:02:09 2009
@@ -20,181 +20,287 @@
 
 /**
  * Interface used to selectively visit the entries in a BTree.
- * 
+ *
  * @param <Key>
  * @param <Value>
  */
-public interface BTreeVisitor<Key, Value> {
+public interface BTreeVisitor<Key,Value> {
 
     /**
-     * Do you want to visit the range of BTree entries between the first and and
-     * second key?
-     * 
-     * @param first
-     *            if null indicates the range of values before the second key.
-     * @param second
-     *            if null indicates the range of values after the first key.
-     * @return true if you want to visit the values between the first and second
-     *         key.
+     * Do you want to visit the range of BTree entries between the first and second key?
+     *
+     * @param first if null indicates the range of values before the second key.
+     * @param second if null indicates the range of values after the first key.
+     * @return true if you want to visit the values between the first and second key.
      */
     boolean isInterestedInKeysBetween(Key first, Key second);
 
     /**
      * The keys and values of a BTree leaf node.
-     * 
+     *
      * @param keys
      * @param values
      */
     void visit(List<Key> keys, List<Value> values);
 
     /**
-     * If the visitor wishes to
-     * 
-     * @return
+     * @return true if the visitor has quenched its thirst for more results
      */
     boolean isSatiated();
 
-    abstract class GTVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
-        final private Key value;
-        int matches = Integer.MAX_VALUE;
-        boolean limited;
+    public interface Predicate<Key> {
+        boolean isInterestedInKeysBetween(Key first, Key second);
+        boolean isInterestedInKey(Key key);
+    }
 
-        public GTVisitor(Key value) {
-            this.value = value;
+    abstract class PredicateVisitor<Key, Value> implements BTreeVisitor<Key, Value>, Predicate<Key> {
+        public static final int UNLIMITED=-1;
+		private int limit;
+
+		public PredicateVisitor(int limit) {
+		    this.limit = limit;
+		}
+
+		final public void visit(List<Key> keys, List<Value> values) {
+			for( int i=0; i < keys.size() && !isSatiated(); i++) {
+				Key key = keys.get(i);
+				if( isInterestedInKey(key) ) {
+				    if(limit > 0 )
+				        limit--;
+					matched(key, values.get(i));
+				}
+			}
+		}
+
+		protected void matched(Key key, Value value) {
         }
 
-        public GTVisitor(Key value, int limit) {
-            this.value = value;
-            limited = true;
-            matches = limit;
+        public boolean isSatiated() {
+            return limit==0;
         }
+    }
 
-        public boolean isInterestedInKeysBetween(Key first, Key second) {
-            return second == null || second.compareTo(value) > 0;
+    class OrVisitor<Key, Value> extends PredicateVisitor<Key, Value> {
+        private final List<Predicate<Key>> conditions;
+
+        public OrVisitor(List<Predicate<Key>> conditions) {
+            this(conditions, UNLIMITED);
         }
 
-        public void visit(List<Key> keys, List<Value> values) {
-            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
-                Key key = keys.get(i);
-                if (key.compareTo(value) > 0) {
-                    matched(key, values.get(i));
-                    if (limited) matches--;
+        public OrVisitor(List<Predicate<Key>> conditions, int limit) {
+            super(limit);
+            this.conditions = conditions;
+        }
+
+		final public boolean isInterestedInKeysBetween(Key first, Key second) {
+            for (Predicate<Key> condition : conditions) {
+                if( condition.isInterestedInKeysBetween(first, second) ) {
+                    return true;
                 }
             }
-        }
+            return false;
+		}
 
-        public boolean isSatiated() {
-            return limited && matches <= 0;
+        final public boolean isInterestedInKey(Key key) {
+            for (Predicate<Key> condition : conditions) {
+                if( condition.isInterestedInKey(key) ) {
+                    return true;
+                }
+            }
+            return false;
         }
 
-        abstract protected void matched(Key key, Value value);
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            boolean first=true;
+            for (Predicate<Key> condition : conditions) {
+                if( !first ) {
+                    sb.append(" OR ");
+                }
+                first=false;
+                sb.append("(");
+                sb.append(condition);
+                sb.append(")");
+            }
+            return sb.toString();
+        }
     }
 
-    abstract class GTEVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
-        final private Key value;
-        int matches = Integer.MAX_VALUE;
-        boolean limited;
+    class AndVisitor<Key, Value> extends PredicateVisitor<Key, Value> {
+        private final List<Predicate<Key>> conditions;
 
-        public GTEVisitor(Key value) {
-            this.value = value;
+        public AndVisitor(List<Predicate<Key>> conditions) {
+            this(conditions, UNLIMITED);
         }
-        
-        public GTEVisitor(Key value, int limit) {
-            this.value = value;
-            limited = true;
-            matches = limit;
+        public AndVisitor(List<Predicate<Key>> conditions, int limit) {
+            super(limit);
+            this.conditions = conditions;
         }
 
-        public boolean isInterestedInKeysBetween(Key first, Key second) {
-            return second == null || second.compareTo(value) >= 0;
+		final public boolean isInterestedInKeysBetween(Key first, Key second) {
+            for (Predicate<Key> condition : conditions) {
+                if( !condition.isInterestedInKeysBetween(first, second) ) {
+                    return false;
+                }
+            }
+            return true;
+		}
+
+        final public boolean isInterestedInKey(Key key) {
+            for (Predicate<Key> condition : conditions) {
+                if( !condition.isInterestedInKey(key) ) {
+                    return false;
+                }
+            }
+            return true;
         }
 
-        public void visit(List<Key> keys, List<Value> values) {
-            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
-                Key key = keys.get(i);
-                if (key.compareTo(value) >= 0) {
-                    matched(key, values.get(i));
-                    if (limited) matches--;
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            boolean first=true;
+            for (Predicate<Key> condition : conditions) {
+                if( !first ) {
+                    sb.append(" AND ");
                 }
+                first=false;
+                sb.append("(");
+                sb.append(condition);
+                sb.append(")");
             }
+            return sb.toString();
         }
+    }
 
-        public boolean isSatiated() {
-            return limited && matches <= 0;
+    class BetweenVisitor<Key extends Comparable<? super Key>, Value> extends PredicateVisitor<Key, Value> {
+		private final Key first;
+        private final Key last;
+
+        public BetweenVisitor(Key first, Key last) {
+            this(first, last, UNLIMITED);
         }
 
-        abstract protected void matched(Key key, Value value);
-    }
+        public BetweenVisitor(Key first, Key last, int limit) {
+            super(limit);
+			this.first = first;
+            this.last = last;
+        }
 
-    abstract class LTVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
-        final private Key value;
-        int matches = Integer.MAX_VALUE;
-        boolean limited;
+		final public boolean isInterestedInKeysBetween(Key left, Key right) {
+        	return (right==null || right.compareTo(first)>=0)
+                    && (left==null || left.compareTo(last)<0);
+		}
 
-        public LTVisitor(Key value) {
-            this.value = value;
+        final public boolean isInterestedInKey(Key key) {
+            return key.compareTo(first) >=0 && key.compareTo(last) <0;
         }
-        
-        public LTVisitor(Key value, int limit) {
-            this.value = value;
-            limited = true;
-            matches = limit;
+
+        @Override
+        public String toString() {
+            return first+" <= key < "+last;
         }
+    }
 
-        public boolean isInterestedInKeysBetween(Key first, Key second) {
-            return first == null || first.compareTo(value) < 0;
+    class GTVisitor<Key extends Comparable<? super Key>, Value> extends PredicateVisitor<Key, Value> {
+		final private Key value;
+
+		public GTVisitor(Key value) {
+			this(value, UNLIMITED);
+		}
+		public GTVisitor(Key value, int limit) {
+		    super(limit);
+			this.value = value;
+		}
+
+		final public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return second==null || isInterestedInKey(second);
+		}
+
+        final public boolean isInterestedInKey(Key key) {
+            return key.compareTo(value)>0;
+        }
+
+        @Override
+        public String toString() {
+            return "key > "+ value;
         }
+    }
 
-        public void visit(List<Key> keys, List<Value> values) {
-            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
-                Key key = keys.get(i);
-                if (key.compareTo(value) < 0) {
-                    matched(key, values.get(i));
-                    if (limited) matches--;
-                }
-            }
+    class GTEVisitor<Key extends Comparable<? super Key>, Value> extends PredicateVisitor<Key, Value> {
+		final private Key value;
+
+        public GTEVisitor(Key value) {
+            this(value, UNLIMITED);
         }
 
-        public boolean isSatiated() {
-            return limited && matches <= 0;
+		public GTEVisitor(Key value, int limit) {
+            super(limit);
+			this.value = value;
+		}
+
+		final public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return second==null || isInterestedInKey(second);
+		}
+
+        final public boolean isInterestedInKey(Key key) {
+            return key.compareTo(value)>=0;
         }
 
-        abstract protected void matched(Key key, Value value);
+        @Override
+        public String toString() {
+            return "key >= "+ value;
+        }
     }
 
-    abstract class LTEVisitor<Key extends Comparable<? super Key>, Value> implements BTreeVisitor<Key, Value> {
-        final private Key value;
-        int matches = Integer.MAX_VALUE;
-        boolean limited;
+    class LTVisitor<Key extends Comparable<? super Key>, Value> extends PredicateVisitor<Key, Value> {
+		final private Key value;
 
-        public LTEVisitor(Key value) {
-            this.value = value;
+        public LTVisitor(Key value) {
+            this(value, UNLIMITED);
         }
+        
+		public LTVisitor(Key value, int limit) {
+            super(limit);
+			this.value = value;
+		}
+
+		final public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return first==null || isInterestedInKey(first);
+		}
+
+        final public boolean isInterestedInKey(Key key) {
+            return key.compareTo(value)<0;
+        }
+
+        @Override
+        public String toString() {
+            return "key < "+ value;
+        }
+    }
+
+    class LTEVisitor<Key extends Comparable<? super Key>, Value> extends PredicateVisitor<Key, Value> {
+		final private Key value;
 
+		public LTEVisitor(Key value) {
+            this(value, UNLIMITED);
+		}
         public LTEVisitor(Key value, int limit) {
+            super(limit);
             this.value = value;
-            limited = true;
-            matches = limit;
-        }
-        
-        public boolean isInterestedInKeysBetween(Key first, Key second) {
-            return first == null || first.compareTo(value) <= 0;
         }
 
-        public void visit(List<Key> keys, List<Value> values) {
-            for (int i = 0; i < keys.size() && !isSatiated(); i++) {
-                Key key = keys.get(i);
-                if (key.compareTo(value) <= 0) {
-                    matched(key, values.get(i));
-                    if (limited) matches--;
-                }
-            }
-        }
+		final public boolean isInterestedInKeysBetween(Key first, Key second) {
+        	return first==null || isInterestedInKey(first);
+		}
 
-        public boolean isSatiated() {
-            return limited && matches <= 0;
+        final public boolean isInterestedInKey(Key key) {
+            return key.compareTo(value)<=0;
         }
 
-        abstract protected void matched(Key key, Value value);
+        @Override
+        public String toString() {
+            return "key <= "+ value;
+        }
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
(original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
Wed Sep  2 23:02:09 2009
@@ -22,6 +22,7 @@
 
 import org.apache.activemq.util.IOHelper;
 import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.SequenceSet;
 
 /**
  * DataFile
@@ -33,6 +34,7 @@
     protected final File file;
     protected final Integer dataFileId;
     protected int length;
+    protected final SequenceSet corruptedBlocks = new SequenceSet();
 
     DataFile(File file, int number, int preferedSize) {
         this.file = file;
@@ -80,6 +82,10 @@
         IOHelper.moveFile(file,targetDirectory);
     }
 
+    public SequenceSet getCorruptedBlocks() {
+        return corruptedBlocks;
+    }
+    
     public int compareTo(DataFile df) {
         return dataFileId - df.dataFileId;
     }

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
(original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
Wed Sep  2 23:02:09 2009
@@ -96,11 +96,16 @@
         }
     }
     
-    public void read(long offset, byte data[]) throws IOException {
+    public void readFully(long offset, byte data[]) throws IOException {
        file.seek(offset);
        file.readFully(data);
     }
 
+    public int read(long offset, byte data[]) throws IOException {
+       file.seek(offset);
+       return file.read(data);
+    }
+
     public void readLocationDetails(Location location) throws IOException {
         WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
         if (asyncWrite != null) {

Modified: activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=810728&r1=810727&r2=810728&view=diff
==============================================================================
--- activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
(original)
+++ activemq/sandbox/activemq-flow/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
Wed Sep  2 23:02:09 2009
@@ -20,15 +20,7 @@
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -38,7 +30,9 @@
 import org.apache.activemq.util.Scheduler;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.util.buffer.DataByteArrayInputStream;
+import org.apache.activemq.util.buffer.DataByteArrayOutputStream;
 import org.apache.activemq.util.list.LinkedNodeList;
+import org.apache.activemq.util.list.Sequence;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
@@ -61,7 +55,20 @@
     // Batch Control Item holds a 4 byte size of the batch and a 8 byte checksum of the batch.

     public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
     public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
-    
+    public static final byte[] BATCH_CONTROL_RECORD_HEADER = createBatchControlRecordHeader();
+
+    private static byte[] createBatchControlRecordHeader() {
+        try {
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+            os.writeInt(BATCH_CONTROL_RECORD_SIZE);
+            os.writeByte(BATCH_CONTROL_RECORD_TYPE);
+            os.write(BATCH_CONTROL_RECORD_MAGIC);
+            return os.toBuffer().toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Could not create batch control record header.");
+        }
+    }
+
     public static final String DEFAULT_DIRECTORY = ".";
     public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
     public static final String DEFAULT_FILE_PREFIX = "db-";
@@ -96,6 +103,7 @@
     protected boolean archiveDataLogs;
 	private ReplicationTarget replicationTarget;
     protected boolean checksum;
+    protected boolean checkForCorruptionOnStartup;
 
     public synchronized void start() throws IOException {
         if (started) {
@@ -137,17 +145,20 @@
             for (DataFile df : l) {
                 dataFiles.addLast(df);
                 fileByFileMap.put(df.getFile(), df);
+
+                if( isCheckForCorruptionOnStartup() ) {
+                    lastAppendLocation.set(recoveryCheck(df));
+                }
             }
         }
 
     	getCurrentWriteFile();
-        try {
-        	Location l = recoveryCheck(dataFiles.getTail());
-            lastAppendLocation.set(l);
-        } catch (IOException e) {
-            LOG.warn("recovery check failed", e);
+
+        if( lastAppendLocation.get()==null ) {
+            DataFile df = dataFiles.getTail();
+            lastAppendLocation.set(recoveryCheck(df));
         }
-        
+
         cleanupTask = new Runnable() {
             public void run() {
                 cleanup();
@@ -177,56 +188,108 @@
     	DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
             while( true ) {
-	        	reader.read(location.getOffset(), controlRecord);
-	        	controlIs.restart();
-	        	
-	        	// Assert that it's  a batch record.
-	        	if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
-	        		break;
-	        	}
-	        	if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) {
-	        		break;
-	        	}
-	        	for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) {
-	        		if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) {
-	        			break;
-	        		}
-	        	}
-	        	
-	        	int size = controlIs.readInt();
-	        	if( size > MAX_BATCH_SIZE ) {
-	        		break;
-	        	}
-	        	
-	        	if( isChecksum() ) {
-		        	
-	        		long expectedChecksum = controlIs.readLong();	        	
-		        	
-	        		byte data[] = new byte[size];
-		        	reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
-		        	
-		        	Checksum checksum = new Adler32();
-	                checksum.update(data, 0, data.length);
-	                
-	                if( expectedChecksum!=checksum.getValue() ) {
-	                	break;
-	                }
-	                
-	        	}
-                
-	        	
-                location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+                int size = checkBatchRecord(reader, location.getOffset());
+                if ( size>=0 ) {
+                    location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+                } else {
+
+                    // Perhaps it's just some corruption... scan through the file to find the next valid batch record.  We
+                    // may have subsequent valid batch records.
+                    int nextOffset = findNextBatchRecord(reader, location.getOffset()+1);
+                    if( nextOffset >=0 ) {
+                        Sequence sequence = new Sequence(location.getOffset(), nextOffset - 1);
+                        LOG.info("Corrupt journal records found in '"+dataFile.getFile()+"' between offsets: "+sequence);
+                        dataFile.corruptedBlocks.add(sequence);
+                        location.setOffset(nextOffset);
+                    } else {
+                        break;
+                    }
+                }
             }
             
         } catch (IOException e) {
 		} finally {
             accessorPool.closeDataFileAccessor(reader);
         }
-        
+
         dataFile.setLength(location.getOffset());
+
+        if( !dataFile.corruptedBlocks.isEmpty() ) {
+            // Is the end of the data file corrupted?
+            if( dataFile.corruptedBlocks.getTail().getLast()+1 == location.getOffset() ) {
+                dataFile.setLength((int) dataFile.corruptedBlocks.removeLastSequence().getFirst());
+            }
+        }
+
         return location;
     }
 
+    private int findNextBatchRecord(DataFileAccessor reader, int offset) throws IOException {
+        Buffer header = new Buffer(BATCH_CONTROL_RECORD_HEADER);
+        byte data[] = new byte[1024*4];
+        Buffer bs = new Buffer(data, 0, reader.read(offset, data));
+
+        int pos = 0;
+        while( true ) {
+            pos = bs.indexOf(header, pos);
+            if( pos >= 0 ) {
+                return offset+pos;
+            } else {
+                // need to load the next data chunk in..
+                if( bs.length != data.length ) {
+                    // If we had a short read then we were at EOF
+                    return -1;
+                }
+                offset += bs.length-BATCH_CONTROL_RECORD_HEADER.length;
+                bs = new Buffer(data, 0, reader.read(offset, data));
+                pos=0;
+            }
+        }
+    }
+
+
+    public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
+        byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+        DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
+
+        reader.readFully(offset, controlRecord);
+
+        // Assert that it's  a batch record.
+        for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
+            if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
+                return -1;
+            }
+        }
+
+        int size = controlIs.readInt();
+        if( size > MAX_BATCH_SIZE ) {
+            return -1;
+        }
+
+        if( isChecksum() ) {
+
+            long expectedChecksum = controlIs.readLong();
+            if( expectedChecksum == 0 ) {
+                // Checksumming was not enabled when the record was stored.
+                // we can't validate the record :(
+                return size;
+            }
+
+            byte data[] = new byte[size];
+            reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
+
+            Checksum checksum = new Adler32();
+            checksum.update(data, 0, data.length);
+
+            if( expectedChecksum!=checksum.getValue() ) {
+                return -1;
+            }
+
+        }
+        return size;
+    }
+
+
 	void addToTotalLength(int size) {
 		totalLength.addAndGet(size);
 	}
@@ -640,5 +703,11 @@
 		this.checksum = checksumWrites;
 	}
 
+    public boolean isCheckForCorruptionOnStartup() {
+        return checkForCorruptionOnStartup;
+    }
 
+    public void setCheckForCorruptionOnStartup(boolean checkForCorruptionOnStartup) {
+        this.checkForCorruptionOnStartup = checkForCorruptionOnStartup;
+    }
 }



Mime
View raw message