activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r741741 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ kahadb/src/main/java/org/apache/kahadb/index/ kahadb/src/main/java/org/apache/kahadb/journal/ kahadb/src/test/java/org/apache/kahadb/journal/
Date Fri, 06 Feb 2009 21:35:40 GMT
Author: chirino
Date: Fri Feb  6 21:35:39 2009
New Revision: 741741

URL: http://svn.apache.org/viewvc?rev=741741&view=rev
Log:
Change the way journal records are validated on recovery. Writes are now grouped into batches, and each batch carries an Adler-32 checksum in its control record, giving more reliable detection of partial or corrupted writes.
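
For reference, the checksum is a plain Adler-32 (java.util.zip.Adler32 from the JDK) computed over each write batch and stored in the batch control record. A minimal stand-alone sketch of computing and verifying that value (class and method names here are illustrative, not part of the commit):

    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    class ChecksumSketch {
        // Computes the Adler-32 value that the batch control record stores.
        static long adler32(byte[] batch, int offset, int length) {
            Checksum checksum = new Adler32();
            checksum.update(batch, offset, length);
            return checksum.getValue();
        }

        // On recovery a batch is accepted only if the stored and
        // recomputed values match; a mismatch marks the end of the
        // valid portion of the file.
        static boolean verify(byte[] batch, long stored) {
            return adler32(batch, 0, batch.length) == stored;
        }
    }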

Removed:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Feb  6 21:35:39 2009
@@ -1277,7 +1277,6 @@
         Journal manager = new Journal();
         manager.setDirectory(directory);
         manager.setMaxFileLength(getJournalMaxFileLength());
-        manager.setUseNio(false);
         return manager;
     }
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java Fri Feb  6 21:35:39 2009
@@ -29,7 +29,6 @@
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.page.Transaction.Closure;
 import org.apache.kahadb.util.Marshaller;
 
 /**

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Fri Feb  6 21:35:39 2009
@@ -20,7 +20,6 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
-import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.LinkedNode;
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java Fri Feb  6 21:35:39 2009
@@ -81,12 +81,12 @@
             if (location.getSize() == Location.NOT_SET) {
                 file.seek(location.getOffset());
                 location.setSize(file.readInt());
-                file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+                location.setType(file.readByte());
             } else {
-                file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+                file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
             }
 
-            byte[] data = new byte[location.getSize() - Journal.ITEM_HEAD_FOOT_SPACE];
+            byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE];
             file.readFully(data);
             return new ByteSequence(data, 0, data.length);
 
@@ -94,6 +94,11 @@
             throw new IOException("Invalid location: " + location + ", : " + e);
         }
     }
+    
+    public void read(long offset, byte data[]) throws IOException {
+       file.seek(offset);
+       file.readFully(data);
+    }
 
     public void readLocationDetails(Location location) throws IOException {
         WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
@@ -107,42 +112,42 @@
         }
     }
 
-    public boolean readLocationDetailsAndValidate(Location location) {
-        try {
-            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
-            if (asyncWrite != null) {
-                location.setSize(asyncWrite.location.getSize());
-                location.setType(asyncWrite.location.getType());
-            } else {
-                file.seek(location.getOffset());
-                location.setSize(file.readInt());
-                location.setType(file.readByte());
-
-                byte data[] = new byte[3];
-                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
-                file.readFully(data);
-                if (data[0] != Journal.ITEM_HEAD_SOR[0]
-                    || data[1] != Journal.ITEM_HEAD_SOR[1]
-                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
-                    return false;
-                }
-                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
-                file.readFully(data);
-                if (data[0] != Journal.ITEM_HEAD_EOR[0]
-                    || data[1] != Journal.ITEM_HEAD_EOR[1]
-                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
-                    return false;
-                }
-            }
-        } catch (IOException e) {
-            return false;
-        }
-        return true;
-    }
+//    public boolean readLocationDetailsAndValidate(Location location) {
+//        try {
+//            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+//            if (asyncWrite != null) {
+//                location.setSize(asyncWrite.location.getSize());
+//                location.setType(asyncWrite.location.getType());
+//            } else {
+//                file.seek(location.getOffset());
+//                location.setSize(file.readInt());
+//                location.setType(file.readByte());
+//
+//                byte data[] = new byte[3];
+//                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
+//                file.readFully(data);
+//                if (data[0] != Journal.ITEM_HEAD_SOR[0]
+//                    || data[1] != Journal.ITEM_HEAD_SOR[1]
+//                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
+//                    return false;
+//                }
+//                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
+//                file.readFully(data);
+//                if (data[0] != Journal.ITEM_HEAD_EOR[0]
+//                    || data[1] != Journal.ITEM_HEAD_EOR[1]
+//                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
+//                    return false;
+//                }
+//            }
+//        } catch (IOException e) {
+//            return false;
+//        }
+//        return true;
+//    }
 
     public void updateRecord(Location location, ByteSequence data, boolean sync) throws IOException {
 
-        file.seek(location.getOffset() + Journal.ITEM_HEAD_SPACE);
+        file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
         int size = Math.min(data.getLength(), location.getSize());
         file.write(data.getData(), data.getOffset(), size);
         if (sync) {

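With the SOR/EOR markers gone, a record is now just a 5-byte head (a 4-byte size plus a 1-byte type) followed by the payload, which is what the simplified read path above relies on. A stand-alone sketch of reading one record under that layout, assuming a RandomAccessFile positioned at a known record offset (the helper name is hypothetical):

    import java.io.IOException;
    import java.io.RandomAccessFile;

    class RecordReadSketch {
        static final int RECORD_HEAD_SPACE = 4 + 1; // size + type

        // Reads the payload of the record that starts at 'offset'.
        static byte[] readRecord(RandomAccessFile file, long offset) throws IOException {
            file.seek(offset);
            int size = file.readInt();   // total record size, head included
            byte type = file.readByte(); // user record or batch control record
            byte[] payload = new byte[size - RECORD_HEAD_SPACE];
            file.readFully(payload);
            return payload;
        }
    }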
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java Fri Feb  6 21:35:39 2009
@@ -30,7 +30,7 @@
  */
 public class DataFileAccessorPool {
 
-    private final Journal dataManager;
+    private final Journal journal;
     private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
     private boolean closed;
     private int maxOpenReadersPerFile = 5;
@@ -50,9 +50,9 @@
         public DataFileAccessor openDataFileReader() throws IOException {
             DataFileAccessor rc = null;
             if (pool.isEmpty()) {
-                rc = new DataFileAccessor(dataManager, file);
+                rc = new DataFileAccessor(journal, file);
             } else {
-                rc = (DataFileAccessor)pool.remove(pool.size() - 1);
+                rc = pool.remove(pool.size() - 1);
             }
             used = true;
             openCounter++;
@@ -91,12 +91,12 @@
     }
 
     public DataFileAccessorPool(Journal dataManager) {
-        this.dataManager = dataManager;
+        this.journal = dataManager;
     }
 
     synchronized void clearUsedMark() {
-        for (Iterator iter = pools.values().iterator(); iter.hasNext();) {
-            Pool pool = (Pool)iter.next();
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
             pool.clearUsedMark();
         }
     }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Feb  6 21:35:39 2009
@@ -21,6 +21,8 @@
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
@@ -36,10 +38,9 @@
  */
 class DataFileAppender {
 
-    protected static final byte[] RESERVED_SPACE = new byte[Journal.ITEM_HEAD_RESERVED_SPACE];
     protected static final int DEFAULT_MAX_BATCH_SIZE = 1024 * 1024 * 4;
 
-    protected final Journal dataManager;
+    protected final Journal journal;
     protected final Map<WriteKey, WriteCommand> inflightWrites;
     protected final Object enqueueMutex = new Object() {
     };
@@ -84,19 +85,21 @@
 
         public final LinkedNodeList<WriteCommand> writes = new LinkedNodeList<WriteCommand>();
         public final CountDownLatch latch = new CountDownLatch(1);
-        public int size;
+		private final int offset;
+        public int size = Journal.BATCH_CONTROL_RECORD_SIZE;
 
-        public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
+        public WriteBatch(DataFile dataFile, int offset, WriteCommand write) throws IOException {
             this.dataFile = dataFile;
-            this.writes.addLast(write);
-            size += write.location.getSize();
+			this.offset = offset;
+            this.dataFile.incrementLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            this.size=Journal.BATCH_CONTROL_RECORD_SIZE;
+            journal.addToTotalLength(Journal.BATCH_CONTROL_RECORD_SIZE);
+            append(write);
         }
 
-        public boolean canAppend(DataFile dataFile, WriteCommand write) {
-            if (dataFile != this.dataFile) {
-                return false;
-            }
-            if (size + write.location.getSize() >= maxWriteBatchSize) {
+        public boolean canAppend(WriteCommand write) {
+            int newSize = size + write.location.getSize();
+			if (newSize >= maxWriteBatchSize || offset+newSize > journal.getMaxFileLength() ) {
                 return false;
             }
             return true;
@@ -104,7 +107,12 @@
 
         public void append(WriteCommand write) throws IOException {
             this.writes.addLast(write);
-            size += write.location.getSize();
+            write.location.setDataFileId(dataFile.getDataFileId());
+            write.location.setOffset(offset+size);
+            int s = write.location.getSize();
+			size += s;
+            dataFile.incrementLength(s);
+            journal.addToTotalLength(s);
         }
     }
 
@@ -135,8 +143,8 @@
      * @param fileId
      */
     public DataFileAppender(Journal dataManager) {
-        this.dataManager = dataManager;
-        this.inflightWrites = this.dataManager.getInflightWrites();
+        this.journal = dataManager;
+        this.inflightWrites = this.journal.getInflightWrites();
     }
 
     /**
@@ -153,7 +161,7 @@
     public Location storeItem(ByteSequence data, byte type, boolean sync) throws IOException {
 
         // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
         final Location location = new Location();
         location.setSize(size);
@@ -168,12 +176,7 @@
         // by the data manager (which is basically just appending)
 
         synchronized (this) {
-            // Find the position where this item will land at.
-            DataFile dataFile = dataManager.allocateLocation(location);
-            if (!sync) {
-                inflightWrites.put(new WriteKey(location), write);
-            }
-            batch = enqueue(dataFile, write);
+            batch = enqueue(write);
         }
         location.setLatch(batch.latch);
         if (sync) {
@@ -182,6 +185,8 @@
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
+        } else {
+        	inflightWrites.put(new WriteKey(location), write);
         }
 
         return location;
@@ -189,7 +194,7 @@
 
     public Location storeItem(ByteSequence data, byte type, Runnable onComplete) throws IOException {
         // Write the packet our internal buffer.
-        int size = data.getLength() + Journal.ITEM_HEAD_FOOT_SPACE;
+        int size = data.getLength() + Journal.RECORD_HEAD_SPACE;
 
         final Location location = new Location();
         location.setSize(size);
@@ -198,23 +203,15 @@
         WriteBatch batch;
         WriteCommand write = new WriteCommand(location, data, onComplete);
 
-        // Locate datafile and enqueue into the executor in sychronized block so
-        // that writes get equeued onto the executor in order that they were
-        // assigned
-        // by the data manager (which is basically just appending)
-
         synchronized (this) {
-            // Find the position where this item will land at.
-            DataFile dataFile = dataManager.allocateLocation(location);
-            inflightWrites.put(new WriteKey(location), write);
-            batch = enqueue(dataFile, write);
+            batch = enqueue(write);
         }
+        inflightWrites.put(new WriteKey(location), write);
         location.setLatch(batch.latch);
-
         return location;
     }
 
-    private WriteBatch enqueue(DataFile dataFile, WriteCommand write) throws IOException {
+    private WriteBatch enqueue(WriteCommand write) throws IOException {
         synchronized (enqueueMutex) {
             WriteBatch rc = null;
             if (shutdown) {
@@ -237,35 +234,37 @@
                 thread.start();
             }
 
-            if (nextWriteBatch == null) {
-                nextWriteBatch = new WriteBatch(dataFile, write);
-                rc = nextWriteBatch;
-                enqueueMutex.notify();
-            } else {
-                // Append to current batch if possible..
-                if (nextWriteBatch.canAppend(dataFile, write)) {
-                    nextWriteBatch.append(write);
-                    rc = nextWriteBatch;
-                } else {
-                    // Otherwise wait for the queuedCommand to be null
-                    try {
-                        while (nextWriteBatch != null) {
-                            enqueueMutex.wait();
-                        }
-                    } catch (InterruptedException e) {
-                        throw new InterruptedIOException();
-                    }
-                    if (shutdown) {
-                        throw new IOException("Async Writter Thread Shutdown");
-                    }
-
-                    // Start a new batch.
-                    nextWriteBatch = new WriteBatch(dataFile, write);
-                    rc = nextWriteBatch;
-                    enqueueMutex.notify();
-                }
+            while ( true ) {
+	            if (nextWriteBatch == null) {
+	            	DataFile file = journal.getCurrentWriteFile();
+	            	if( file.getLength() > journal.getMaxFileLength() ) {
+	            		file = journal.rotateWriteFile();
+	            	}
+	            	
+	                nextWriteBatch = new WriteBatch(file, file.getLength(), write);
+	                rc = nextWriteBatch;
+	                enqueueMutex.notify();
+	                return rc;
+	            } else {
+	                // Append to current batch if possible..
+	                if (nextWriteBatch.canAppend(write)) {
+	                    nextWriteBatch.append(write);
+	                    return nextWriteBatch;
+	                } else {
+	                    // Otherwise wait for the queuedCommand to be null
+	                    try {
+	                        while (nextWriteBatch != null) {
+	                            enqueueMutex.wait();
+	                        }
+	                    } catch (InterruptedException e) {
+	                        throw new InterruptedIOException();
+	                    }
+	                    if (shutdown) {
+	                        throw new IOException("Async Writer Thread Shutdown");
+	                    }
+	                }
+	            }
             }
-            return rc;
         }
     }
 
@@ -331,67 +330,57 @@
                     }
                     dataFile = wb.dataFile;
                     file = dataFile.openRandomAccessFile();
-                    if( file.length() < dataManager.preferedFileLength ) {
-                        file.setLength(dataManager.preferedFileLength);
+                    if( file.length() < journal.preferedFileLength ) {
+                        file.setLength(journal.preferedFileLength);
                     }
                 }
 
                 WriteCommand write = wb.writes.getHead();
 
-                // Write all the data.
-                // Only need to seek to first location.. all others
-                // are in sequence.
-                file.seek(write.location.getOffset());
-
+                // Write an empty batch control record.
+                buff.reset();
+                buff.writeInt(Journal.BATCH_CONTROL_RECORD_SIZE);
+                buff.writeByte(Journal.BATCH_CONTROL_RECORD_TYPE);
+                buff.write(Journal.BATCH_CONTROL_RECORD_MAGIC);
+                buff.writeInt(0);
+                buff.writeLong(0);
+                
                 boolean forceToDisk = false;
+                while (write != null) {
+                    forceToDisk |= write.sync | write.onComplete != null;
+                    buff.writeInt(write.location.getSize());
+                    buff.writeByte(write.location.getType());
+                    buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
+                    write = write.getNext();
+                }
 
-                // 
-                // is it just 1 big write?
-                ReplicationTarget replicationTarget = dataManager.getReplicationTarget();
-                if (wb.size == write.location.getSize() && replicationTarget==null) {
-                    forceToDisk = write.sync | write.onComplete != null;
-
-                    // Just write it directly..
-                    file.writeInt(write.location.getSize());
-                    file.writeByte(write.location.getType());
-                    file.write(RESERVED_SPACE);
-                    file.write(Journal.ITEM_HEAD_SOR);
-                    file.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
-                    file.write(Journal.ITEM_HEAD_EOR);
-
-                } else {
-
-                    // We are going to do 1 big write.
-                    while (write != null) {
-                        forceToDisk |= write.sync | write.onComplete != null;
-
-                        buff.writeInt(write.location.getSize());
-                        buff.writeByte(write.location.getType());
-                        buff.write(RESERVED_SPACE);
-                        buff.write(Journal.ITEM_HEAD_SOR);
-                        buff.write(write.data.getData(), write.data.getOffset(), write.data.getLength());
-                        buff.write(Journal.ITEM_HEAD_EOR);
-
-                        write = write.getNext();
-                    }
-
-                    // Now do the 1 big write.
-                    ByteSequence sequence = buff.toByteSequence();
-                    file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
-                    
-                    if( replicationTarget!=null ) {
-                    	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
-                    }
-                    
-                    buff.reset();
+                ByteSequence sequence = buff.toByteSequence();
+                
+                // Now we can fill in the batch control record properly. 
+                buff.reset();
+                buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
+                buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+                if( journal.isChecksum() ) {
+	                Checksum checksum = new Adler32();
+	                checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+	                buff.writeLong(checksum.getValue());
                 }
 
+                // Now do the 1 big write.
+                file.seek(wb.offset);
+                file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
+                
+                ReplicationTarget replicationTarget = journal.getReplicationTarget();
+                if( replicationTarget!=null ) {
+                	replicationTarget.replicate(wb.writes.getHead().location, sequence, forceToDisk);
+                }
+                
                 if (forceToDisk) {
                     file.getFD().sync();
                 }
 
                 WriteCommand lastWrite = wb.writes.getTail();
-                dataManager.setLastAppendLocation(lastWrite.location);
+                journal.setLastAppendLocation(lastWrite.location);
 
                 // Now that the data is on disk, remove the writes from the in
                 // flight

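Note the write path above: the appender first buffers an empty batch control record, appends all queued records behind it, and only then backfills the batch length and checksum once the batch contents are final. A condensed sketch of that skip-then-backfill pattern using java.nio.ByteBuffer instead of KahaDB's DataByteArrayOutputStream (everything here is illustrative, not the commit's code):

    import java.nio.ByteBuffer;
    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    class BackfillSketch {
        static ByteBuffer writeBatch(byte[][] records, byte[] magic) {
            int controlSize = 4 + 1 + magic.length + 4 + 8; // head + magic + length + checksum
            int total = controlSize;
            for (byte[] r : records) total += r.length;

            ByteBuffer buf = ByteBuffer.allocate(total);
            buf.putInt(controlSize);  // record head: size
            buf.put((byte) 2);        // record head: type = batch control
            buf.put(magic);           // magic marker
            buf.putInt(0);            // placeholder: batch length
            buf.putLong(0L);          // placeholder: checksum
            for (byte[] r : records) buf.put(r);

            // Backfill the placeholders now that the batch is complete.
            int batchLength = total - controlSize;
            Checksum checksum = new Adler32();
            checksum.update(buf.array(), controlSize, batchLength);
            buf.putInt(5 + magic.length, batchLength);
            buf.putLong(5 + magic.length + 4, checksum.getValue());
            return buf;
        }
    }

Writing the control record and the batch as one contiguous buffer means a torn write can only leave an invalid control record or a checksum mismatch at the tail of the file, never a silently half-applied batch.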
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Fri Feb  6 21:35:39 2009
@@ -19,6 +19,7 @@
 import java.io.File;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,12 +32,15 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.journal.DataFileAppender.WriteCommand;
 import org.apache.kahadb.journal.DataFileAppender.WriteKey;
 import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.LinkedNodeList;
 import org.apache.kahadb.util.Scheduler;
 
@@ -47,24 +51,17 @@
  */
 public class Journal {
 
-    public static final int CONTROL_RECORD_MAX_LENGTH = 1024;
-    public static final int ITEM_HEAD_RESERVED_SPACE = 21;
-    // ITEM_HEAD_SPACE = length + type+ reserved space + SOR
-    public static final int ITEM_HEAD_SPACE = 4 + 1 + ITEM_HEAD_RESERVED_SPACE + 3;
-    public static final int ITEM_HEAD_OFFSET_TO_SOR = ITEM_HEAD_SPACE - 3;
-    public static final int ITEM_FOOT_SPACE = 3; // EOR
-
-    public static final int ITEM_HEAD_FOOT_SPACE = ITEM_HEAD_SPACE + ITEM_FOOT_SPACE;
-
-    public static final byte[] ITEM_HEAD_SOR = new byte[] {
-        'S', 'O', 'R'
-    }; // 
-    public static final byte[] ITEM_HEAD_EOR = new byte[] {
-        'E', 'O', 'R'
-    }; // 
+    private static final int MAX_BATCH_SIZE = 32*1024*1024;
 
-    public static final byte DATA_ITEM_TYPE = 1;
-    public static final byte REDO_ITEM_TYPE = 2;
+    // RECORD_HEAD_SPACE = length + type
+    public static final int RECORD_HEAD_SPACE = 4 + 1;
+    
+    public static final byte USER_RECORD_TYPE = 1;
+    public static final byte BATCH_CONTROL_RECORD_TYPE = 2;
+    // Batch Control Item holds a 4 byte size of the batch and an 8 byte checksum of the batch.
+    public static final byte[] BATCH_CONTROL_RECORD_MAGIC = bytes("WRITE BATCH");
+    public static final int BATCH_CONTROL_RECORD_SIZE = RECORD_HEAD_SPACE+BATCH_CONTROL_RECORD_MAGIC.length+4+8;
+    
     public static final String DEFAULT_DIRECTORY = ".";
     public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
     public static final String DEFAULT_FILE_PREFIX = "db-";
@@ -82,8 +79,7 @@
     protected String filePrefix = DEFAULT_FILE_PREFIX;
     protected String fileSuffix = DEFAULT_FILE_SUFFIX;
     protected boolean started;
-    protected boolean useNio = true;
-
+    
     protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
     protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
 
@@ -99,8 +95,8 @@
     protected final AtomicLong totalLength = new AtomicLong();
     protected boolean archiveDataLogs;
 	private ReplicationTarget replicationTarget;
+    protected boolean checksum;
 
-    @SuppressWarnings("unchecked")
     public synchronized void start() throws IOException {
         if (started) {
             return;
@@ -111,11 +107,7 @@
         started = true;
         preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
 
-        if (useNio) {
-            appender = new NIODataFileAppender(this);
-        } else {
-            appender = new DataFileAppender(this);
-        }
+        appender = new DataFileAppender(this);
 
         File[] files = directory.listFiles(new FilenameFilter() {
             public boolean accept(File dir, String n) {
@@ -148,26 +140,14 @@
             }
         }
 
-        // Need to check the current Write File to see if there was a partial
-        // write to it.
-        if (!dataFiles.isEmpty()) {
-
-            // See if the lastSyncedLocation is valid..
-            Location l = lastAppendLocation.get();
-            if (l != null && l.getDataFileId() != dataFiles.getTail().getDataFileId()) {
-                l = null;
-            }
-
-            // If we know the last location that was ok.. then we can skip lots
-            // of checking
-            try {
-                l = recoveryCheck(dataFiles.getTail(), l);
-                lastAppendLocation.set(l);
-            } catch (IOException e) {
-                LOG.warn("recovery check failed", e);
-            }
+    	getCurrentWriteFile();
+        try {
+        	Location l = recoveryCheck(dataFiles.getTail());
+            lastAppendLocation.set(l);
+        } catch (IOException e) {
+            LOG.warn("recovery check failed", e);
         }
-
+        
         cleanupTask = new Runnable() {
             public void run() {
                 cleanup();
@@ -178,43 +158,97 @@
         LOG.trace("Startup took: "+(end-start)+" ms");
     }
 
-    protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
-        if (location == null) {
-            location = new Location();
-            location.setDataFileId(dataFile.getDataFileId());
-            location.setOffset(0);
-        }
-        DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
+    private static byte[] bytes(String string) {
+    	try {
+			return string.getBytes("UTF-8");
+		} catch (UnsupportedEncodingException e) {
+			throw new RuntimeException(e);
+		}
+	}
+
+	protected Location recoveryCheck(DataFile dataFile) throws IOException {
+    	byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
+    	DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
+    	
+        Location location = new Location();
+        location.setDataFileId(dataFile.getDataFileId());
+        location.setOffset(0);
+
+    	DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
         try {
-            while (reader.readLocationDetailsAndValidate(location)) {
-                location.setOffset(location.getOffset() + location.getSize());
-            }
-        } finally {
+            while( true ) {
+	        	reader.read(location.getOffset(), controlRecord);
+	        	controlIs.restart();
+	        	
+	        	// Assert that it's  a batch record.
+	        	if( controlIs.readInt() != BATCH_CONTROL_RECORD_SIZE ) {
+	        		break;
+	        	}
+	        	if( controlIs.readByte() != BATCH_CONTROL_RECORD_TYPE ) {
+	        		break;
+	        	}
+	        	for( int i=0; i < BATCH_CONTROL_RECORD_MAGIC.length; i++ ) {
+	        		if( controlIs.readByte() != BATCH_CONTROL_RECORD_MAGIC[i] ) {
+	        			break;
+	        		}
+	        	}
+	        	
+	        	int size = controlIs.readInt();
+	        	if( size > MAX_BATCH_SIZE ) {
+	        		break;
+	        	}
+	        	
+	        	if( isChecksum() ) {
+		        	
+	        		long expectedChecksum = controlIs.readLong();	        	
+		        	
+	        		byte data[] = new byte[size];
+		        	reader.read(location.getOffset()+BATCH_CONTROL_RECORD_SIZE, data);
+		        	
+		        	Checksum checksum = new Adler32();
+	                checksum.update(data, 0, data.length);
+	                
+	                if( expectedChecksum!=checksum.getValue() ) {
+	                	break;
+	                }
+	                
+	        	}
+                
+	        	
+                location.setOffset(location.getOffset()+BATCH_CONTROL_RECORD_SIZE+size);
+            }
+            
+        } catch (IOException e) {
+		} finally {
             accessorPool.closeDataFileAccessor(reader);
         }
+        
         dataFile.setLength(location.getOffset());
         return location;
     }
 
-    synchronized DataFile allocateLocation(Location location) throws IOException {
-        if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() + location.getSize()) > maxFileLength)) {
-            int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
-
-            File file = getFile(nextNum);
-            DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
-            // actually allocate the disk space
-            fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
-            fileByFileMap.put(file, nextWriteFile);
-            dataFiles.addLast(nextWriteFile);
-        }
-        DataFile currentWriteFile = dataFiles.getTail();
-        location.setOffset(currentWriteFile.getLength());
-        location.setDataFileId(currentWriteFile.getDataFileId().intValue());
-        int size = location.getSize();
-        currentWriteFile.incrementLength(size);
-        totalLength.addAndGet(size);
-        return currentWriteFile;
-    }
+	void addToTotalLength(int size) {
+		totalLength.addAndGet(size);
+	}
+    
+    
+    synchronized DataFile getCurrentWriteFile() throws IOException {
+        if (dataFiles.isEmpty()) {
+            rotateWriteFile();
+        }
+        return dataFiles.getTail();
+    }
+
+    synchronized DataFile rotateWriteFile() {
+		int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
+		File file = getFile(nextNum);
+		DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
+		// actually allocate the disk space
+		fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
+		fileByFileMap.put(file, nextWriteFile);
+		dataFiles.addLast(nextWriteFile);
+		return nextWriteFile;
+	}
 
 	public File getFile(int nextNum) {
 		String fileName = filePrefix + nextNum + fileSuffix;
@@ -285,11 +319,7 @@
 
         // reopen open file handles...
         accessorPool = new DataFileAccessorPool(this);
-        if (useNio) {
-            appender = new NIODataFileAppender(this);
-        } else {
-            appender = new DataFileAppender(this);
-        }
+        appender = new DataFileAppender(this);
         return result;
     }
 
@@ -411,7 +441,7 @@
 
             if (cur.getType() == 0) {
                 return null;
-            } else if (cur.getType() > 0) {
+            } else if (cur.getType() == USER_RECORD_TYPE) {
                 // Only return user records.
                 return cur;
             }
@@ -496,10 +526,6 @@
         return loc;
     }
 
-    public synchronized Location write(ByteSequence data, byte type, boolean sync) throws IOException, IllegalStateException {
-        return appender.storeItem(data, type, sync);
-    }
-
     public void update(Location location, ByteSequence data, boolean sync) throws IOException {
         DataFile dataFile = getDataFile(location);
         DataFileAccessor updater = accessorPool.openDataFileAccessor(dataFile);
@@ -538,14 +564,6 @@
         this.lastAppendLocation.set(lastSyncedLocation);
     }
 
-    public boolean isUseNio() {
-        return useNio;
-    }
-
-    public void setUseNio(boolean useNio) {
-        this.useNio = useNio;
-    }
-
     public File getDirectoryArchive() {
         return directoryArchive;
     }
@@ -614,5 +632,13 @@
         this.fileSuffix = fileSuffix;
     }
 
+	public boolean isChecksum() {
+		return checksum;
+	}
+
+	public void setChecksum(boolean checksumWrites) {
+		this.checksum = checksumWrites;
+	}
+
 
 }

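The recovery loop in recoveryCheck() above walks a data file batch by batch: read the fixed-size control record, validate its head and magic, then, when checksums are enabled, recompute the Adler-32 over the batch and compare it against the stored value. A condensed stand-alone version of that scan (a hypothetical helper built directly on RandomAccessFile rather than the DataFileAccessor used above):

    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Arrays;
    import java.util.zip.Adler32;
    import java.util.zip.Checksum;

    class RecoveryScanSketch {
        // Returns the offset just past the last valid batch; everything
        // beyond it is treated as a partial write and truncated.
        static long scan(RandomAccessFile file, byte[] magic, int controlSize) {
            long offset = 0;
            try {
                while (true) {
                    file.seek(offset);
                    if (file.readInt() != controlSize) break;   // not a control record
                    if (file.readByte() != 2) break;            // wrong record type
                    byte[] m = new byte[magic.length];
                    file.readFully(m);
                    if (!Arrays.equals(m, magic)) break;        // bad magic marker
                    int batchSize = file.readInt();
                    if (batchSize < 0 || batchSize > 32 * 1024 * 1024) break; // MAX_BATCH_SIZE guard
                    long stored = file.readLong();
                    byte[] batch = new byte[batchSize];
                    file.readFully(batch);
                    Checksum checksum = new Adler32();
                    checksum.update(batch, 0, batch.length);
                    if (checksum.getValue() != stored) break;   // corrupt tail
                    offset += controlSize + batchSize;
                }
            } catch (IOException e) {
                // A short read at the end of the file simply ends the scan.
            }
            return offset;
        }
    }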
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/Location.java Fri Feb  6 21:35:39 2009
@@ -71,13 +71,6 @@
         this.size = size;
     }
 
-    /**
-     * @return the size of the payload of the record.
-     */
-    public int getPaylodSize() {
-        return size - Journal.ITEM_HEAD_FOOT_SPACE;
-    }
-
     public int getOffset() {
         return offset;
     }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java Fri Feb  6 21:35:39 2009
@@ -20,8 +20,6 @@
 import java.io.IOException;
 import java.io.RandomAccessFile;
 
-import org.apache.kahadb.util.IOHelper;
-
 /**
  * Allows you to open a data file in read only mode.  Useful when working with 
  * archived data files.

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java Fri Feb  6 21:35:39 2009
@@ -23,23 +23,18 @@
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * An AsyncDataManager that works in read only mode against multiple data directories.
  * Useful for reading back archived data files.
  */
 public class ReadOnlyJournal extends Journal {
     
-    private static final Log LOG = LogFactory.getLog(ReadOnlyJournal.class);
     private final ArrayList<File> dirs;
 
     public ReadOnlyJournal(final ArrayList<File> dirs) {
         this.dirs = dirs;
     }
 
-    @SuppressWarnings("unchecked")
     public synchronized void start() throws IOException {
         if (started) {
             return;

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java?rev=741741&r1=741740&r2=741741&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java Fri Feb  6 21:35:39 2009
@@ -41,7 +41,6 @@
     }
     
     protected void configure(Journal dataManager) {
-        dataManager.setUseNio(false);
     }
 
     @Override


