activemq-commits mailing list archives

From: chir...@apache.org
Subject: svn commit: r694320 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/store/ main/java/org/apache/kahadb/util/ test/java/org/apache/kahadb/page/
Date: Thu, 11 Sep 2008 15:56:09 GMT
Author: chirino
Date: Thu Sep 11 08:56:07 2008
New Revision: 694320

URL: http://svn.apache.org/viewvc?rev=694320&view=rev
Log:
Updated the PageFile implementation so that Transaction Commit/Rollback occurs as a Unit of Work (UoW).
A UoW is only guaranteed to be preserved on restart if enableRecoveryFile==true

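A usage illustration (not part of this commit): a minimal sketch of driving the reworked API. setEnableRecoveryFile(), setEnableSyncedWrites(), tx(), allocate(), flush() and unload() appear in this diff; the PageFile constructor form, load() and Transaction.commit() are assumed from context.

    // Hypothetical usage sketch -- calls not shown in this diff are assumptions.
    PageFile pageFile = new PageFile(new File("kahadb-data"), "example"); // constructor form assumed
    pageFile.setEnableRecoveryFile(true);  // without this, a UoW is not guaranteed to survive a restart
    pageFile.setEnableSyncedWrites(true);
    pageFile.load();                       // load() assumed; isLoaded()/unload() appear in this diff

    Transaction tx = pageFile.tx();
    Page<String> page = tx.allocate(1);    // allocations are tracked in allocateList (see Transaction.java below)
    // ... write data to the page through the Transaction ...
    tx.commit();                           // commit()/rollback() apply the whole unit of work, per the log message

    pageFile.flush();                      // checkpoint() was renamed to flush() in this change
    pageFile.unload();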

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Page.java Thu Sep 11 08:56:07 2008
@@ -35,7 +35,7 @@
  */
 public class Page<T> {
 
-    public static final int PAGE_HEADER_SIZE = 17;
+    public static final int PAGE_HEADER_SIZE = 21;
 
     public static final byte PAGE_FREE_TYPE = 0;
     public static final byte PAGE_PART_TYPE = 1;
@@ -46,6 +46,9 @@
     // The following fields are persisted
     byte type = PAGE_FREE_TYPE;
     long txId;
+    // A field reserved to hold checksums..  Not in use (yet)
+    int checksum;
+    
     // Points to the next page in the chunk stream
     long next;
     T data;
@@ -94,12 +97,14 @@
         os.writeByte(type);
         os.writeLong(txId);
         os.writeLong(next);
+        os.writeInt(checksum);
     }
 
     void read(DataInput is) throws IOException {
         type = is.readByte();
         txId = is.readLong();
         next = is.readLong();
+        checksum = is.readInt();
     }
 
     public long getPageId() {
@@ -130,5 +135,13 @@
         return "[Page:" + getPageId()+", type: "+type+"]";
     }
 
+    public int getChecksum() {
+        return checksum;
+    }
+
+    public void setChecksum(int checksum) {
+        this.checksum = checksum;
+    }
+
 
 }

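The header size bump from 17 to 21 bytes follows directly from the field order serialized by write()/read() above:

    // Page header layout after this change (java.io.DataOutput sizes):
    //   writeByte(type)     -> 1 byte
    //   writeLong(txId)     -> 8 bytes
    //   writeLong(next)     -> 8 bytes
    //   writeInt(checksum)  -> 4 bytes   (new, reserved field)
    // 1 + 8 + 8 + 4 = 21, hence PAGE_HEADER_SIZE moves from 17 to 21.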
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Sep 11 08:56:07 2008
@@ -28,21 +28,28 @@
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.TreeMap;
+import java.util.Map.Entry;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
+import org.apache.kahadb.util.Sequence;
 import org.apache.kahadb.util.SequenceSet;
 
 /**
@@ -60,44 +67,113 @@
     
     // 4k Default page size.
     public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
-    private static final int MAX_PAGES_IN_RECOVERY_BUFFER=1000;
-    private static final int CONFIG_SPACE_SIZE=1024*4;
+    private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
+    private static final int PAGE_FILE_HEADER_SIZE=1024*4;
 
     // Recovery header is (long offset)
-    private static final int RECOVERY_HEADER_SIZE=8;  
     private static final Log LOG = LogFactory.getLog(PageFile.class);
-    
-    private final String name;
+
+    // A PageFile will use a couple of files in this directory
     private File directory;
-    RandomAccessFile readFile;
+    // And the file names in that directory will be based on this name.
+    private final String name;
+    
+    // File handle used for reading pages..
+    private RandomAccessFile readFile;
+    // File handle used for writing pages..
     private RandomAccessFile writeFile;
+    // File handle used for writing to the recovery file..
+    private RandomAccessFile recoveryFile;
 
-    int pageSize = DEFAULT_PAGE_SIZE;
-    private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER;
-    private int initialPageOffset;
-
-    long nextFreePageId;
+    // The size of pages
+    private int pageSize = DEFAULT_PAGE_SIZE;
     
-    SequenceSet freeList = new SequenceSet();
-    AtomicBoolean loaded = new AtomicBoolean();
+    // The minimum amount of space allocated to the recovery file, in pages.
+    private int recoveryFileMinPageCount = 1000;
+    // The max size that we let the recovery file grow to.  It may exceed the max, but the file will be resized
+    // back down to this max size as soon as possible.
+    private int recoveryFileMaxPageCount = 10000;
+    // The number of pages in the current recovery buffer
+    private int recoveryPageCount;
+
+    private AtomicBoolean loaded = new AtomicBoolean();
+
+    // A cache of recently used pages.
     private LRUCache<Long, Page> pageCache;
-    
-    private boolean enableRecoveryBuffer=false;
-    private boolean enableSyncedWrites=false;
+    // Is the read page cache enabled?
     private boolean enablePageCaching=true;
-    boolean enableAsyncWrites=false;
-    
+    // How many pages will we keep in the cache?
     private int pageCacheSize = 100;
     
-    TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
+    // Should we first log the page write to the recovery file? Avoids partial
+    // page write failures.
+    private boolean enableRecoveryFile=false;
+    // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
+    private boolean enableSyncedWrites=false;
+    // Will writes be done in an async thread?
+    private boolean enableAsyncWrites=false;
+
+    // These are used if enableAsyncWrites==true 
+    private AtomicBoolean stopWriter = new AtomicBoolean();
     private Thread writerThread;
-    AtomicBoolean stopWriter = new AtomicBoolean();
     private CountDownLatch checkpointLatch;
 
-    AtomicLong nextTxid = new AtomicLong();
+    // Keeps track of writes that are being written to disk.
+    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
+
+    // Keeps track of free pages.
+    private long nextFreePageId;
+    private SequenceSet freeList = new SequenceSet();
+    
+    private AtomicLong nextTxid = new AtomicLong();
+    
+    // Persistent settings stored in the page file. 
     private MetaData metaData;
     
     /**
+     * Use to keep track of updated pages which have not yet been committed.
+     */
+    static class PageWrite {
+        Page page;
+        byte[] current;
+        byte[] diskBound;
+
+        public PageWrite(Page page, byte[] data) {
+            this.page=page;
+            current=data;
+        }
+                
+        public void setCurrent(Page page, byte[] data) {
+            this.page=page;
+            current=data;
+        }
+
+        @Override
+        public String toString() {
+            return "[PageWrite:"+page.getPageId()+"]";
+        }
+
+        @SuppressWarnings("unchecked")
+        public Page getPage() {
+            return page;
+        }
+        
+        void begin() {
+            diskBound = current;
+            current = null;
+        }
+        
+        /**
+         * @return true if there are no pending writes to do.
+         */
+        boolean done() {
+            diskBound=null;
+            return current == null;
+        }
+
+    }
+    
+    /**
      * The MetaData object hold the persistent data associated with a PageFile object. 
      */
     public static class MetaData {
@@ -155,43 +231,6 @@
         }
     }
 
-    /**
-     * Internally used by the double write buffer implementation used in this class. 
-     */
-    class PageWrite<T> {
-        Page<T> page;
-        byte[] current;
-        byte[] diskBound;
-
-        public PageWrite(Page<T> page, byte[] data) {
-            setCurrent(page, data);
-        }
-        
-        public void setCurrent(Page<T> page, byte[] data) {
-            this.page=page;
-            current=data;
-        }
-
-        void begin() {
-            diskBound = current;
-            current = null;
-        }
-        
-        /**
-         * @return true if there is no pending writes to do.
-         */
-        boolean done() {
-            diskBound=null;
-            return current == null;
-        }
-        
-        @Override
-        public String toString() {
-            return "PageWrite{pageId="+page.getPageId()+"}";
-        }
-
-    }
-
     public Transaction tx() {
         assertLoaded();
         return new Transaction(this);
@@ -220,16 +259,19 @@
         if( loaded.get() ) {
             throw new IllegalStateException("Cannot delete page file data when the page file is loaded");
         }
-        File mainPageFile = getMainPageFile();
-        if( mainPageFile.exists() ) {
-            if( !mainPageFile.delete() ) {
-                throw new IOException("Could not delete: "+mainPageFile.getPath());
-            }
-        }
-        File freeFile = getFreeFile();
-        if( freeFile.exists() ) {
-            if( !freeFile.delete() ) {
-                throw new IOException("Could not delete: "+freeFile.getPath());
+        delete(getMainPageFile());
+        delete(getFreeFile());
+        delete(getRecoveryFile());
+    }
+
+    /**
+     * @param file
+     * @throws IOException
+     */
+    private void delete(File file) throws IOException {
+        if( file.exists() ) {
+            if( !file.delete() ) {
+                throw new IOException("Could not delete: "+file.getPath());
             }
         }
     }
@@ -272,18 +314,18 @@
                 storeMetaData();
             }
 
-            recoveryBufferSize = (pageSize+RECOVERY_HEADER_SIZE) * MAX_PAGES_IN_RECOVERY_BUFFER;
-            initialPageOffset = CONFIG_SPACE_SIZE+recoveryBufferSize;
+            if( enableRecoveryFile ) {
+                recoveryFile = new RandomAccessFile(getRecoveryFile(), "rw");
+            }
             
-            long lastTxId=0;
             if(  metaData.isCleanShutdown() ) {
-                lastTxId = metaData.getLastTxId();
+                nextTxid.set(metaData.getLastTxId()+1);
                 if( metaData.getFreePages()>0 ) {
                     loadFreeList();
                 } 
             } else {
                 LOG.debug("Recovering page file...");
-                lastTxId = redoRecoveryUpdates();
+                nextTxid.set(redoRecoveryUpdates());
                 
                 // Scan all to find the free pages.
                 freeList = new SequenceSet();
@@ -295,17 +337,15 @@
                 }
                 
             }
-            nextTxid.set( lastTxId+1 );
-            LOG.debug("Last transaction id: "+lastTxId);
             
             metaData.setCleanShutdown(false);
             storeMetaData();
             getFreeFile().delete();
             
-            if( writeFile.length() < initialPageOffset) {
-                writeFile.setLength(initialPageOffset);
+            if( writeFile.length() < PAGE_FILE_HEADER_SIZE) {
+                writeFile.setLength(PAGE_FILE_HEADER_SIZE);
             }
-            nextFreePageId=(writeFile.length()-initialPageOffset)/pageSize;
+            nextFreePageId=(writeFile.length()-PAGE_FILE_HEADER_SIZE)/pageSize;
             startWriter();
                 
         } else {
@@ -325,7 +365,7 @@
      */
     public void unload() throws IOException {
         if (loaded.compareAndSet(true, false)) {
-            checkpoint();
+            flush();
             try {
                 stopWriter();
             } catch (InterruptedException e) {
@@ -348,6 +388,10 @@
                 readFile = null;
                 writeFile.close();
                 writeFile=null;
+                if( enableRecoveryFile ) {
+                    recoveryFile.close();
+                    recoveryFile=null;
+                }
                 freeList.clear();
                 if( pageCache!=null ) {
                     pageCache=null;
@@ -361,15 +405,17 @@
         }
     }
         
+    public boolean isLoaded() {
+        return loaded.get();
+    }
+
     /**
-     * Flushes all write buffers to disk and returns the transaction id of the last write done to disk.  The 
-     * transaction id can be used for recovery purposes since it always incrementing.
+     * Flush and sync all write buffers to disk.
      * 
-     * @return the last transaction id that was fully written to disk.
      * @throws IOException
      *         If an disk error occurred.
      */
-    public long checkpoint() throws IOException {
+    public void flush() throws IOException {
 
         if( enableAsyncWrites && stopWriter.get() ) {
             throw new IOException("Page file already stopped: checkpointing is not allowed");
@@ -379,7 +425,7 @@
         CountDownLatch checkpointLatch;
         synchronized( writes ) {
             if( writes.isEmpty()) {                
-                return nextTxid.get()-1;
+                return;
             }
             if( this.checkpointLatch == null ) {
                 this.checkpointLatch = new CountDownLatch(1);
@@ -395,7 +441,6 @@
         }        
         try {
             checkpointLatch.await();        
-            return nextTxid.get()-1;
         } catch (InterruptedException e) {
             throw new InterruptedIOException();
         }
@@ -405,402 +450,133 @@
     public String toString() {
         return "Page File: "+getMainPageFile();
     }
-      
+    
     ///////////////////////////////////////////////////////////////////
-    // Internal Double write implementation follows...
+    // Private Implementation Methods
     ///////////////////////////////////////////////////////////////////
-        
-    boolean canStartWriteBatch() {
-        int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER);
-        
-        if( enableAsyncWrites ) {
-            // The constant 10 here controls how soon write batches start going to disk..
-            // would be nice to figure out how to auto tune that value.  Make to small and
-            // we reduce through put because we are locking the write mutex too offen doing writes
-            return capacityUsed >= 10 || checkpointLatch!=null;
-        } else {
-            return capacityUsed >= 80 || checkpointLatch!=null;
-        }
+    private File getMainPageFile() {
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
     }
-
     
-    /**
-     * 
-     * @param timeout
-     * @param unit
-     * @return true if a write was done.
-     * @throws InterruptedException 
-     * @throws IOException 
-     */
-    boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
-                
-        int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
-        ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
-        
-        synchronized( writes ) {            
-            // If there is not enough to write, wait for a notification...
-            if( !canStartWriteBatch() && timeout>=0 ) {
-                releaseCheckpointWaiter();
-                try {
-                    writes.wait(unit.toMillis(timeout));
-                } catch (InterruptedException e) {
-                    throw new InterruptedIOException();
-                }
-            }
-            if( writes.isEmpty() ) {
-                releaseCheckpointWaiter();
-                return false;
-            }
-            
-            // build a write batch from the current write cache. 
-            for (PageWrite write : writes.values()) {
-                
-                int l = write.current.length+RECOVERY_HEADER_SIZE;
-                
-                // Will it fit in the batch???
-                if( batchLength + l > recoveryBufferSize ) {
-                    break; // nope.. stop adding to the batch.
-                }
-                
-                batch.add(write);
-                batchLength +=l;
-                
-                // Move the current write to the diskBound write, this lets folks update the 
-                // page again without blocking for this write.
-                write.begin();
-            }
+    private File getFreeFile() {
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
+    } 
+
+    private File getRecoveryFile() {
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+".rec");
+    } 
+
+    private long toOffset(long pageId) {
+        return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
+    }
+
+    private void loadMetaData() throws IOException {
+
+        ByteArrayInputStream is;
+        MetaData v1 = new MetaData();
+        MetaData v2 = new MetaData();
+        try {
+            Properties p = new Properties();
+            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
+            readFile.seek(0);
+            readFile.readFully(d);
+            is = new ByteArrayInputStream(d);
+            p.load(is);
+            IntrospectionSupport.setProperties(v1, p);
+        } catch (IOException e) {
+            v1 = null;
         }
-       long txId = nextTxid.get();
         
- 
-       if (enableRecoveryBuffer) {
-            // Now the batch array has all the writes, write the batch to the
-            // recovery buffer.
-            writeFile.seek(CONFIG_SPACE_SIZE);
-            // write txid of the batch
-            writeFile.writeLong(txId); 
-            // write the recovery record counter.
-            writeFile.writeInt(batch.size()); 
-            for (PageWrite w : batch) {
-                writeFile.writeLong(w.page.getPageId());
-                writeFile.write(w.diskBound, 0, pageSize);
-            }
-            if( enableSyncedWrites ) {
-                // Sync to make sure recovery buffer writes land on disk..
-                writeFile.getFD().sync();
-            }
+        try {
+            Properties p = new Properties();
+            byte[] d = new byte[PAGE_FILE_HEADER_SIZE/2];
+            readFile.seek(PAGE_FILE_HEADER_SIZE/2);
+            readFile.readFully(d);
+            is = new ByteArrayInputStream(d);
+            p.load(is);
+            IntrospectionSupport.setProperties(v2, p);
+        } catch (IOException e) {
+            v2 = null;
         }
-       
         
-        StringBuilder pageOffsets = new StringBuilder();
-        for (PageWrite w : batch) {
-            if( pageOffsets.length()!=0 ) {
-                pageOffsets.append(", ");
-            }
-            pageOffsets.append(w.page.getPageId());
-            writeFile.seek(toOffset(w.page.getPageId()));
-            writeFile.write(w.diskBound, 0, pageSize);
-        }
+        if( v1==null && v2==null ) {
+            throw new IOException("Could not load page file meta data");
+        } 
         
-        // Sync again
-        if( enableSyncedWrites ) {
-            writeFile.getFD().sync();
+        if( v1 == null || v1.metaDataTxId<0 ) {
+            metaData = v2;
+        } else if( v2==null || v1.metaDataTxId<0 ) {
+            metaData = v1;
+        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
+            metaData = v1; // use the first since the 2nd could be a partial..
+        } else {
+            metaData = v2; // use the second cause the first is probably a partial.
         }
+    }
+    
+    private void storeMetaData() throws IOException {
+        // Convert the metadata into a property format
+        metaData.metaDataTxId++;
+        Properties p = new Properties();
+        IntrospectionSupport.getProperties(metaData, p, null);
         
-//        LOG.debug("write done: "+txId+", pages: "+pageOffsets);
-        nextTxid.incrementAndGet();
-
-        synchronized( writes ) {
-            for (PageWrite w : batch) {
-                // If there are no more pending writes, then remove it from the write cache.
-                if( w.done() ) {
-                    writes.remove(w.page.getPageId());
-                }
-            }
-            if( writes.isEmpty() ) {
-                releaseCheckpointWaiter();
-            }
+        ByteArrayOutputStream os = new ByteArrayOutputStream(PAGE_FILE_HEADER_SIZE);
+        p.store(os, "");
+        if( os.size() > PAGE_FILE_HEADER_SIZE/2) { 
+            throw new IOException("Configuation is to larger than: "+PAGE_FILE_HEADER_SIZE/2);
         }
+        // Fill the rest with space...
+        byte[] filler = new byte[(PAGE_FILE_HEADER_SIZE/2)-os.size()];
+        Arrays.fill(filler, (byte)' ');
+        os.write(filler);
+        os.flush();
         
-        return true;
+        byte[] d = os.toByteArray();
+
+        // So we don't loose it.. write it 2 times...
+        writeFile.seek(0);
+        writeFile.write(d);
+        writeFile.getFD().sync();
+        writeFile.seek(PAGE_FILE_HEADER_SIZE/2);
+        writeFile.write(d);
+        writeFile.getFD().sync();
     }
 
-    private void releaseCheckpointWaiter() {
-        if( checkpointLatch!=null ) {
-            checkpointLatch.countDown();
-            checkpointLatch=null;
-        }
-    }       
+    private void storeFreeList() throws IOException {
+        FileOutputStream os = new FileOutputStream(getFreeFile());
+        DataOutputStream dos = new DataOutputStream(os);
+        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
+        dos.close();
+    }
+
+    private void loadFreeList() throws IOException {
+        freeList.clear();
+        FileInputStream is = new FileInputStream(getFreeFile());
+        DataInputStream dis = new DataInputStream(is);
+        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
+        dis.close();
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors 
+    ///////////////////////////////////////////////////////////////////
     
     /**
-     * @return the last transaction id successfully written to disk.
-     * @throws IOException
+     * Is the recovery buffer used to double buffer page writes.  Enabled by default.
+     * 
+     * @return is the recovery buffer enabled.
      */
-    private long redoRecoveryUpdates() throws IOException {
-        
-        if( readFile.length() < CONFIG_SPACE_SIZE+12 ) {
-            return 0;
-        }
-
-        // How many recovery records do we have in the recovery buffer?
-        readFile.seek(CONFIG_SPACE_SIZE);
-        long rc = readFile.readLong();
-        int recordCounter = readFile.readInt();
-        
-        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
-        try {
-            for (int i = 0; i < recordCounter; i++) {
-                long offset = readFile.readLong();
-                byte []data = new byte[pageSize];
-                if( readFile.read(data, 0, pageSize) != pageSize ) {
-                    // Invalid recovery record, Could not fully read the data". Probably due to a partial write to the recovery buffer
-                }
-                batch.put(offset, data);
-            }
-        } catch (IllegalStateException e) {
-            // If an error occurred it was cause the redo buffer was not full written out correctly.. so don't redo it. 
-            // as the pages should still be consistent.
-            return rc-1;
-        }
-        
-        // We need to detect if the recovery buffer write was fully synced to disk.  If it was we should see some of it's partial writes in the page file. 
-        // If we don't see any (even partial writes) to the page file, then we consider that the recovery buffer was the one that was partially written to.
-        // FYI: This depends on the header changing on every write.  It occurs because the header contains a txid which changes on every write.
-        boolean redoNeeded = false;
-        byte header[] = new byte[Page.PAGE_HEADER_SIZE]; 
-        byte header2[] = new byte[Page.PAGE_HEADER_SIZE]; 
-        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
-            try {
-                readFile.seek(e.getKey());
-                readFile.readFully(header);
-                System.arraycopy(e.getValue(), 0, header2, 0, Page.PAGE_HEADER_SIZE);
-                if( Arrays.equals(header, header2) ) {
-                    redoNeeded = true;
-                    break;
-                }
-            } catch (IOException ignore) {
-                // not all records may have been written..
-            }
-        }
-
-        // Stop here if we don't need to redo...
-        if( !redoNeeded ) {
-            return rc-1;
-        }
-        
-        
-        // Re-apply all the writes in the recovery buffer.
-        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
-            writeFile.seek(e.getKey());
-            e.getValue();
-            writeFile.write(e.getValue());
-        }
-        
-        // And sync it to disk
-        writeFile.getFD().sync(); 
-        return rc;
-    }
-
-    private void startWriter() {
-        synchronized( writes ) {
-            if( enableAsyncWrites ) {
-                stopWriter.set(false);
-                writerThread = new Thread("Page Writer") {
-                    @Override
-                    public void run() {
-                        try {
-                            while( !stopWriter.get() ) {
-                                writeBatch(1000, TimeUnit.MILLISECONDS);
-                            }
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        } finally {
-                            releaseCheckpointWaiter();
-                        }
-                    }
-                };
-                writerThread.start();
-            }
-        }
-    }
- 
-    private void stopWriter() throws InterruptedException {
-        synchronized( writes ) {
-            if( enableAsyncWrites ) {
-                stopWriter.set(true);
-                writerThread.join();
-            }
-        }
-    }
-
-    ///////////////////////////////////////////////////////////////////
-    // Misc Internal Operations
-    ///////////////////////////////////////////////////////////////////
-
-    private File getMainPageFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
-    }
-    
-    private File getFreeFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
-    } 
-    
-    long toOffset(long pageId) {
-        return initialPageOffset+(pageId*pageSize);
-    }
-
-    /**
-     * @throws IllegalStateException if the page file is not loaded.
-     */
-    void assertLoaded() throws IllegalStateException {
-        if( !loaded.get() ) {
-            throw new IllegalStateException("PageFile is not loaded");
-        }
-    }
-
-    ///////////////////////////////////////////////////////////////////
-    // Internal Cache Related operations
-    ///////////////////////////////////////////////////////////////////
-    
-    @SuppressWarnings("unchecked") <T> Page<T> getFromCache(long pageId) {
-        synchronized(writes) {
-            PageWrite<T> pageWrite = writes.get(pageId);
-            if( pageWrite != null ) {
-                return pageWrite.page;
-            }
-        }
-
-        Page<T> result = null;
-        if (enablePageCaching) {
-            result = pageCache.get(pageId);
-        }
-        return result;
-    }
-
-    void addToCache(Page page) {
-        if (enablePageCaching) {
-            pageCache.put(page.getPageId(), page);
-        }
-    }
-
-    void removeFromCache(Page page) {
-        if (enablePageCaching) {
-            pageCache.remove(page.getPageId());
-        }
-    }
-
-    ///////////////////////////////////////////////////////////////////
-    // Internal MetaData Related Operations
-    ///////////////////////////////////////////////////////////////////
-    
-    private void loadMetaData() throws IOException {
-
-        ByteArrayInputStream is;
-        MetaData v1 = new MetaData();
-        MetaData v2 = new MetaData();
-        try {
-            Properties p = new Properties();
-            byte[] d = new byte[CONFIG_SPACE_SIZE/2];
-            readFile.seek(0);
-            readFile.readFully(d);
-            is = new ByteArrayInputStream(d);
-            p.load(is);
-            IntrospectionSupport.setProperties(v1, p);
-        } catch (IOException e) {
-            v1 = null;
-        }
-        
-        try {
-            Properties p = new Properties();
-            byte[] d = new byte[CONFIG_SPACE_SIZE/2];
-            readFile.seek(CONFIG_SPACE_SIZE/2);
-            readFile.readFully(d);
-            is = new ByteArrayInputStream(d);
-            p.load(is);
-            IntrospectionSupport.setProperties(v2, p);
-        } catch (IOException e) {
-            v2 = null;
-        }
-        
-        if( v1==null && v2==null ) {
-            throw new IOException("Could not load page file meta data");
-        } 
-        
-        if( v1 == null || v1.metaDataTxId<0 ) {
-            metaData = v2;
-        } else if( v2==null || v1.metaDataTxId<0 ) {
-            metaData = v1;
-        } else if( v1.metaDataTxId==v2.metaDataTxId ) {
-            metaData = v1; // use the first since the 2nd could be a partial..
-        } else {
-            metaData = v2; // use the second cause the first is probably a partial.
-        }
-    }
-    
-    private void storeMetaData() throws IOException {
-
-        // Convert the metadata into a property format
-        metaData.metaDataTxId++;
-        Properties p = new Properties();
-        IntrospectionSupport.getProperties(metaData, p, null);
-        
-        ByteArrayOutputStream os = new ByteArrayOutputStream(CONFIG_SPACE_SIZE);
-        p.store(os, "");
-        if( os.size() > CONFIG_SPACE_SIZE/2) { 
-            throw new IOException("Configuation is to larger than: "+CONFIG_SPACE_SIZE/2);
-        }
-        // Fill the rest with space...
-        byte[] filler = new byte[(CONFIG_SPACE_SIZE/2)-os.size()];
-        Arrays.fill(filler, (byte)' ');
-        os.write(filler);
-        os.flush();
-        
-        byte[] d = os.toByteArray();
-
-        // So we don't loose it.. write it 2 times...
-        writeFile.seek(0);
-        writeFile.write(d);
-        writeFile.getFD().sync();
-        writeFile.seek(CONFIG_SPACE_SIZE/2);
-        writeFile.write(d);
-        writeFile.getFD().sync();
-    }
-
-    private void storeFreeList() throws IOException {
-        FileOutputStream os = new FileOutputStream(getFreeFile());
-        DataOutputStream dos = new DataOutputStream(os);
-        SequenceSet.Marshaller.INSTANCE.writePayload(freeList, dos);
-        dos.close();
-    }
-
-    private void loadFreeList() throws IOException {
-        freeList.clear();
-        FileInputStream is = new FileInputStream(getFreeFile());
-        DataInputStream dis = new DataInputStream(is);
-        freeList = SequenceSet.Marshaller.INSTANCE.readPayload(dis);
-        dis.close();
-    }
-    
-
-    ///////////////////////////////////////////////////////////////////
-    // Property Accessors 
-    ///////////////////////////////////////////////////////////////////
-    
-    /**
-     * Is the recovery buffer used to double buffer page writes.  Enabled by default.
-     * 
-     * @return is the recovery buffer enabled.
-     */
-    public boolean isEnableRecoveryBuffer() {
-        return enableRecoveryBuffer;
+    public boolean isEnableRecoveryFile() {
+        return enableRecoveryFile;
     }
 
     /**
      * Sets if the recovery buffer uses to double buffer page writes.  Enabled by default.  Disabling this
      * may potentially cause partial page writes which can lead to page file corruption.
      */
-    public void setEnableRecoveryBuffer(boolean doubleBuffer) {
-        this.enableRecoveryBuffer = doubleBuffer;
+    public void setEnableRecoveryFile(boolean doubleBuffer) {
+        assertNotLoaded();
+        this.enableRecoveryFile = doubleBuffer;
     }
 
     /**
@@ -815,6 +591,7 @@
      * @param syncWrites
      */
     public void setEnableSyncedWrites(boolean syncWrites) {
+        assertNotLoaded();
         this.enableSyncedWrites = syncWrites;
     }
     
@@ -842,9 +619,7 @@
      *         once the page file is loaded.
      */
     public void setPageSize(int pageSize) throws IllegalStateException {
-        if (loaded.get() && pageSize != this.pageSize) {
-            throw new IllegalStateException("Pages already loaded - can't reset page size");
-        }
+        assertNotLoaded();
         this.pageSize = pageSize;
     }
     
@@ -859,6 +634,7 @@
      * @param allows you to enable read page caching
      */
     public void setEnablePageCaching(boolean enablePageCaching) {
+        assertNotLoaded();
         this.enablePageCaching = enablePageCaching;
     }
 
@@ -873,6 +649,7 @@
      * @param Sets the maximum number of pages that will get stored in the read page cache.
      */
     public void setPageCacheSize(int pageCacheSize) {
+        assertNotLoaded();
         this.pageCacheSize = pageCacheSize;
     }
 
@@ -881,6 +658,7 @@
     }
 
     public void setEnableAsyncWrites(boolean enableAsyncWrites) {
+        assertNotLoaded();
         this.enableAsyncWrites = enableAsyncWrites;
     }
 
@@ -888,4 +666,420 @@
         return readFile.length();
     }
     
+    /**
+     * @return the number of pages allocated in the PageFile
+     */
+    public long getPageCount() {
+        return nextFreePageId;
+    }
+
+    public int getRecoveryFileMinPageCount() {
+        return recoveryFileMinPageCount;
+    }
+
+    public void setRecoveryFileMinPageCount(int recoveryFileMinPageCount) {
+        assertNotLoaded();
+        this.recoveryFileMinPageCount = recoveryFileMinPageCount;
+    }
+
+    public int getRecoveryFileMaxPageCount() {
+        return recoveryFileMaxPageCount;
+    }
+
+    public void setRecoveryFileMaxPageCount(int recoveryFileMaxPageCount) {
+        assertNotLoaded();
+        this.recoveryFileMaxPageCount = recoveryFileMaxPageCount;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Package Protected Methods exposed to Transaction
+    ///////////////////////////////////////////////////////////////////
+
+    /**
+     * @throws IllegalStateException if the page file is not loaded.
+     */
+    void assertLoaded() throws IllegalStateException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("PageFile is not loaded");
+        }
+    }
+    void assertNotLoaded() throws IllegalStateException {
+        if( loaded.get() ) {
+            throw new IllegalStateException("PageFile is loaded");
+        }
+    }
+        
+    /** 
+     * Allocates a block of free pages that you can write data to.
+     * 
+     * @param count the number of sequential pages to allocate
+     * @return the first page of the sequential set. 
+     * @throws IOException
+     *         If a disk error occurred.
+     * @throws IllegalStateException
+     *         if the PageFile is not loaded
+     */
+    <T> Page<T> allocate(int count) throws IOException {
+        assertLoaded();
+        if (count <= 0) {
+            throw new IllegalArgumentException("The allocation count must be larger than zero");
+        }
+
+        Sequence seq = freeList.removeFirstSequence(count);
+
+        // We may need to create new free pages...
+        if (seq == null) {
+
+            Page<T> first = null;
+            int c = count;
+            while (c > 0) {
+                Page<T> page = new Page<T>(nextFreePageId++);
+                page.makeFree(getNextWriteTransactionId());
+
+                if (first == null) {
+                    first = page;
+                }
+
+                addToCache(page);
+                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
+                page.write(out);
+                write(page, out.getData());
+
+                // LOG.debug("allocate writing: "+page.getPageId());
+                c--;
+            }
+
+            return first;
+        }
+
+        Page<T> page = new Page<T>(seq.getFirst());
+        page.makeFree(0);
+        // LOG.debug("allocated: "+page.getPageId());
+        return page;
+    }
+
+    long getNextWriteTransactionId() {
+        return nextTxid.incrementAndGet();
+    }
+
+    void readPage(long pageId, byte[] data) throws IOException {
+        readFile.seek(toOffset(pageId));
+        readFile.readFully(data);
+    }
+
+    public void freePage(long pageId) {
+        freeList.add(pageId);
+        if( enablePageCaching ) {
+            pageCache.remove(pageId);
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private <T> void write(Page<T> page, byte[] data) throws IOException {
+        final PageWrite write = new PageWrite(page, data);
+        Entry<Long, PageWrite> entry = new Entry<Long, PageWrite>(){
+            public Long getKey() {
+                return write.getPage().getPageId();
+            }
+            public PageWrite getValue() {
+                return write;
+            }
+            public PageWrite setValue(PageWrite value) {
+                return null;
+            }
+        };
+        Entry<Long, PageWrite>[] entries = new Map.Entry[]{entry};
+        write(Arrays.asList(entries));
+    }
+
+    void write(Collection<Map.Entry<Long, PageWrite>> updates) throws IOException {
+        synchronized( writes ) {
+            
+            for (Map.Entry<Long, PageWrite> entry : updates) {
+                Long key = entry.getKey();
+                PageWrite value = entry.getValue();
+                PageWrite write = writes.get(key);
+                if( write==null ) {
+                    writes.put(key, value);
+                } else {
+                    write.setCurrent(value.page, value.current);
+                }
+            }
+            
+            // Once we start approaching capacity, notify the writer to start writing
+            if( canStartWriteBatch() ) {
+                if( enableAsyncWrites  ) {
+                    writes.notify();
+                } else {
+                    while( canStartWriteBatch() ) {
+                        writeBatch(-1, TimeUnit.MILLISECONDS);
+                    }
+                }
+            }
+        }            
+    }
+    
+    private boolean canStartWriteBatch() {
+        int capacityUsed = ((writes.size() * 100)/1000);
+        if( enableAsyncWrites ) {
+            // The constant 10 here controls how soon write batches start going to disk..
+            // would be nice to figure out how to auto tune that value.  Make it too small and
+            // we reduce throughput because we are locking the write mutex too often doing writes.
+            return capacityUsed >= 10 || checkpointLatch!=null;
+        } else {
+            return capacityUsed >= 80 || checkpointLatch!=null;
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Cache Related operations
+    ///////////////////////////////////////////////////////////////////
+    @SuppressWarnings("unchecked")
+    <T> Page<T> getFromCache(long pageId) {
+        synchronized(writes) {
+            PageWrite pageWrite = writes.get(pageId);
+            if( pageWrite != null ) {
+                return pageWrite.page;
+            }
+        }
+
+        Page<T> result = null;
+        if (enablePageCaching) {
+            result = pageCache.get(pageId);
+        }
+        return result;
+    }
+
+    void addToCache(Page page) {
+        if (enablePageCaching) {
+            pageCache.put(page.getPageId(), page);
+        }
+    }
+
+    void removeFromCache(Page page) {
+        if (enablePageCaching) {
+            pageCache.remove(page.getPageId());
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal Double write implementation follows...
+    ///////////////////////////////////////////////////////////////////
+    
+    /**
+     * 
+     * @param timeout
+     * @param unit
+     * @return true if a write was done.
+     * @throws InterruptedException 
+     * @throws IOException 
+     */
+    private boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
+                
+        ArrayList<PageWrite> batch;
+        synchronized( writes ) {   
+            
+            // If there is not enough to write, wait for a notification...
+            if( !canStartWriteBatch() && timeout>=0 ) {
+                releaseCheckpointWaiter();
+                try {
+                    writes.wait(unit.toMillis(timeout));
+                } catch (InterruptedException e) {
+                    throw new InterruptedIOException();
+                }
+            }
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+                return false;
+            }
+
+            batch = new ArrayList<PageWrite>(writes.size());
+            // build a write batch from the current write cache. 
+            for (PageWrite write : writes.values()) {
+                batch.add(write);
+                // Move the current write to the diskBound write, this lets folks update the 
+                // page again without blocking for this write.
+                write.begin();
+            }
+        }
+        
+ 
+       if (enableRecoveryFile) {
+           
+           // Using Adler-32 instead of CRC-32 because it's much faster and its
+           // weakness for short messages of a few hundred bytes is not a factor in this case since we know
+           // our write batches are going to be much larger.
+           Checksum checksum = new Adler32();
+           for (PageWrite w : batch) {
+               checksum.update(w.diskBound, 0, pageSize);
+           }
+           
+           // Can we shrink the recovery buffer??
+           if( recoveryPageCount > recoveryFileMaxPageCount ) {
+               int t = Math.max(recoveryFileMinPageCount, batch.size());
+               recoveryFile.setLength(recoveryFileSizeForPages(t));
+           }
+           
+            // Record the page writes in the recovery buffer.
+            recoveryFile.seek(0);
+            // Store the next tx id...
+            recoveryFile.writeLong(nextTxid.get());
+            // Store the checksum for the write batch so that on recovery we know if we have a consistent
+            // write batch on disk.
+            recoveryFile.writeLong(checksum.getValue());
+            // Write the # of pages that will follow
+            recoveryFile.writeInt(batch.size());
+            
+            
+            // Write the pages.
+            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+            for (PageWrite w : batch) {
+                recoveryFile.writeLong(w.page.getPageId());
+                recoveryFile.write(w.diskBound, 0, pageSize);
+            }
+            
+            if (enableSyncedWrites) {
+                // Sync to make sure recovery buffer writes land on disk..
+                recoveryFile.getFD().sync();
+            }
+            
+            recoveryPageCount = batch.size();
+        }
+       
+        
+        for (PageWrite w : batch) {
+            writeFile.seek(toOffset(w.page.getPageId()));
+            writeFile.write(w.diskBound, 0, pageSize);
+        }
+        
+        // Sync again
+        if( enableSyncedWrites ) {
+            writeFile.getFD().sync();
+        }
+        
+        synchronized( writes ) {
+            for (PageWrite w : batch) {
+                // If there are no more pending writes, then remove it from the write cache.
+                if( w.done() ) {
+                    writes.remove(w.page.getPageId());
+                }
+            }
+            if( writes.isEmpty() ) {
+                releaseCheckpointWaiter();
+            }
+        }
+        
+        return true;
+    }
+
+    private long recoveryFileSizeForPages(int pageCount) {
+        return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
+    }
+
+    private void releaseCheckpointWaiter() {
+        if( checkpointLatch!=null ) {
+            checkpointLatch.countDown();
+            checkpointLatch=null;
+        }
+    }       
+    
+    /**
+     * Inspects the recovery buffer and re-applies any 
+     * partially applied page writes.
+     * 
+     * @return the next transaction id that can be used.
+     * @throws IOException
+     */
+    private long redoRecoveryUpdates() throws IOException {
+        if( !enableRecoveryFile ) {
+            return 0;
+        }
+        recoveryPageCount=0;
+        
+        // Are we initializing the recovery file?
+        if( recoveryFile.length() == 0 ) {
+            // Write an empty header..
+            recoveryFile.write(new byte[RECOVERY_FILE_HEADER_SIZE]);
+            // Preallocate the minimum size for better performance.
+            recoveryFile.setLength(recoveryFileSizeForPages(recoveryFileMinPageCount));
+            return 0;
+        }
+        
+        // How many recovery pages do we have in the recovery buffer?
+        recoveryFile.seek(0);
+        long nextTxId = readFile.readLong();
+        long expectedChecksum = readFile.readLong();
+        int pageCounter = readFile.readInt();
+        
+        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+        Checksum checksum = new Adler32();
+        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
+        try {
+            for (int i = 0; i < pageCounter; i++) {
+                long offset = recoveryFile.readLong();
+                byte []data = new byte[pageSize];
+                if( recoveryFile.read(data, 0, pageSize) != pageSize ) {
+                    // Invalid recovery record: could not fully read the data.  Probably due to a partial write to the recovery buffer.
+                    return nextTxId;
+                }
+                checksum.update(data, 0, pageSize);
+                batch.put(offset, data);
+            }
+        } catch (IllegalStateException e) {
+            // If an error occurred it was because the redo buffer was not fully written out correctly, so don't redo it,
+            // as the pages should still be consistent.
+            return nextTxId;
+        }
+        
+        recoveryPageCount = pageCounter;
+        
+        // If the checksum is not valid then the recovery buffer was partially written to disk.
+        if( checksum.getValue() != expectedChecksum ) {
+            return nextTxId;
+        }
+        
+        // Re-apply all the writes in the recovery buffer.
+        for (Map.Entry<Long, byte[]> e : batch.entrySet()) {
+            writeFile.seek(e.getKey());
+            e.getValue();
+            writeFile.write(e.getValue());
+        }
+        
+        // And sync it to disk
+        writeFile.getFD().sync();
+        return nextTxId;
+    }
+
+    private void startWriter() {
+        synchronized( writes ) {
+            if( enableAsyncWrites ) {
+                stopWriter.set(false);
+                writerThread = new Thread("Page Writer") {
+                    @Override
+                    public void run() {
+                        try {
+                            while( !stopWriter.get() ) {
+                                writeBatch(1000, TimeUnit.MILLISECONDS);
+                            }
+                        } catch (Throwable e) {
+                            e.printStackTrace();
+                        } finally {
+                            releaseCheckpointWaiter();
+                        }
+                    }
+                };
+                writerThread.start();
+            }
+        }
+    }
+ 
+    private void stopWriter() throws InterruptedException {
+        synchronized( writes ) {
+            if( enableAsyncWrites ) {
+                stopWriter.set(true);
+                writerThread.join();
+            }
+        }
+    }
+
 }

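For reference, the recovery file layout implied by writeBatch() and redoRecoveryUpdates() above can be read back independently. A sketch, assuming the 4k header constant and Adler-32 checksum from this diff; the standalone method name and its java.io/java.util/java.util.zip imports are illustrative, not part of the commit:

    // Recovery file layout (per writeBatch()/redoRecoveryUpdates()):
    //   header, padded to RECOVERY_FILE_HEADER_SIZE (4k):
    //     long nextTxId, long checksum (Adler-32 over all page data), int pageCount
    //   body, repeated pageCount times:
    //     long pageId, byte[pageSize] pageData
    static final int RECOVERY_FILE_HEADER_SIZE = 1024 * 4;

    static Map<Long, byte[]> readRecoveryBatch(RandomAccessFile recoveryFile, int pageSize) throws IOException {
        recoveryFile.seek(0);
        recoveryFile.readLong();                          // nextTxId, not needed for the replay itself
        long expectedChecksum = recoveryFile.readLong();  // checksum recorded by writeBatch()
        int pageCount = recoveryFile.readInt();

        recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
        Checksum checksum = new Adler32();
        LinkedHashMap<Long, byte[]> batch = new LinkedHashMap<Long, byte[]>();
        for (int i = 0; i < pageCount; i++) {
            long pageId = recoveryFile.readLong();
            byte[] data = new byte[pageSize];
            recoveryFile.readFully(data);
            checksum.update(data, 0, pageSize);
            batch.put(pageId, data);
        }

        // A mismatch means the recovery batch itself was only partially written,
        // so it is safe to skip the redo: the page file is still consistent.
        if (checksum.getValue() != expectedChecksum) {
            return Collections.<Long, byte[]>emptyMap();
        }
        return batch;
    }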
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Thu Sep 11 08:56:07 2008
@@ -16,14 +16,16 @@
  */
 package org.apache.kahadb.page;
 
+import com.sun.org.apache.bcel.internal.generic.AllocationInstruction;
+
 import java.io.DataInputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.kahadb.Marshaller;
 import org.apache.kahadb.page.PageFile.PageWrite;
@@ -31,6 +33,7 @@
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.Sequence;
+import org.apache.kahadb.util.SequenceSet;
 
 /**
  * The class used to read/update a PageFile object.  Using a transaction allows you to
@@ -39,7 +42,8 @@
 public class Transaction implements Iterable<Page> {
     
     /**
-     * 
+     * The PageOverflowIOException occurs when a page write is requested
+     * and its data is larger than what would fit into a single page.
      */
     public class PageOverflowIOException extends IOException {
         public PageOverflowIOException(String message) {
@@ -47,6 +51,10 @@
         }
     }
     
+    /**
+     * The InvalidPageIOException is thrown if you try to load/store a page
+     * with an invalid page id.
+     */
     public class InvalidPageIOException extends IOException {
         private final long page;
 
@@ -78,19 +86,25 @@
     public interface CallableClosure<R, T extends Throwable> {
         public R execute(Transaction tx) throws T;
     }
+    
 
-
+    // The page file that this Transaction operates against.
     private final PageFile pageFile;
+    // If this transaction is updating stuff.. this is the transaction id used for the writes.
+    private long writeTransactionId=-1;
+    // List of pages that this transaction has modified.
+    private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
+    // List of pages allocated in this transaction
+    private final SequenceSet allocateList = new SequenceSet();
+    // List of pages freed in this transaction
+    private final SequenceSet freeList = new SequenceSet();
 
-    /**
-     * @param pageFile
-     */
     Transaction(PageFile pageFile) {
         this.pageFile = pageFile;
     }
 
     /**
-     * @see org.apache.kahadb.page.Transaction#getPageFile()
+     * @return the page file that created this Transaction
      */
     public PageFile getPageFile() {
         return this.pageFile;
@@ -120,42 +134,11 @@
      *         if the PageFile is not loaded
      */
     public <T> Page<T> allocate(int count) throws IOException {
-        this.pageFile.assertLoaded();
-        if (count <= 0) {
-            throw new IllegalArgumentException("The allocation count must be larger than zero");
-        }
-
-        Sequence seq = this.pageFile.freeList.removeFirstSequence(count);
-
-        // We may need to create new free pages...
-        if (seq == null) {
-
-            Page<T> first = null;
-            int c = count;
-            while (c > 0) {
-                Page<T> page = new Page<T>(this.pageFile.nextFreePageId++);
-                page.makeFree(this.pageFile.nextTxid.get());
-
-                if (first == null) {
-                    first = page;
-                }
-
-                this.pageFile.addToCache(page);
-                DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize);
-                page.write(out);
-                write(page, out.getData());
-
-                // LOG.debug("allocate writing: "+page.getPageId());
-                c--;
-            }
-
-            return first;
-        }
-
-        Page<T> page = new Page<T>(seq.getFirst());
-        page.makeFree(0);
-        // LOG.debug("allocated: "+page.getPageId());
-        return page;
+        // TODO: we need to track allocated pages so that they can be returned if the 
+        // transaction gets rolled back.
+        Page<T> rc = pageFile.allocate(count);
+        allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
+        return rc;
     }
 
     /**
@@ -198,7 +181,7 @@
      *         if the PageFile is not loaded
      */
     public <T> void free(Page<T> page, int count) throws IOException {
-        this.pageFile.assertLoaded();
+        pageFile.assertLoaded();
         long offsetPage = page.getPageId();
         for (int i = 0; i < count; i++) {
             if (page == null) {
@@ -208,7 +191,7 @@
             page = null;
         }
     }
-
+    
     /**
      * Frees up a previously allocated page so that it can be re-allocated again.
      * 
@@ -219,7 +202,7 @@
      *         if the PageFile is not loaded
      */
     public <T> void free(Page<T> page) throws IOException {
-        this.pageFile.assertLoaded();
+        pageFile.assertLoaded();
 
         // We may need loop to free up a page chain.
         while (page != null) {
@@ -234,14 +217,13 @@
                 next = load(page.getNext(), null);
             }
 
-            page.makeFree(this.pageFile.nextTxid.get());
+            page.makeFree(getWriteTransactionId());
 
-            DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize);
+            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
             page.write(out);
             write(page, out.getData());
 
-            this.pageFile.removeFromCache(page);
-            this.pageFile.freeList.add(page.getPageId());
+            freeList.add(page.getPageId());
             page = next;
         }
     }
@@ -275,19 +257,19 @@
      * @throws IOException
      */
     public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
-        this.pageFile.assertLoaded();
+        pageFile.assertLoaded();
 
         // Copy to protect against the end user changing
         // the page instance while we are doing a write.
         final Page copy = page.copy();
-        this.pageFile.addToCache(copy);
+        pageFile.addToCache(copy);
 
         //
         // To support writing VERY large data, we override the output stream so
         // that we
         // we do the page writes incrementally while the data is being
         // marshalled.
-        DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize * 2) {
+        DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
             Page current = copy;
 
             @SuppressWarnings("unchecked")
@@ -295,7 +277,8 @@
             protected void onWrite() throws IOException {
 
                 // Are we at an overflow condition?
-                if (pos >= Transaction.this.pageFile.pageSize) {
+                final int pageSize = pageFile.getPageSize();
+                if (pos >= pageSize) {
                     // If overflow is allowed
                     if (overflow) {
 
@@ -312,12 +295,12 @@
                         int oldPos = pos;
                         pos = 0;
 
-                        current.makePagePart(next.getPageId(), Transaction.this.pageFile.nextTxid.get());
+                        current.makePagePart(next.getPageId(), getWriteTransactionId());
                         current.write(this);
 
                         // Do the page write..
-                        byte[] data = new byte[Transaction.this.pageFile.pageSize];
-                        System.arraycopy(buf, 0, data, 0, Transaction.this.pageFile.pageSize);
+                        byte[] data = new byte[pageSize];
+                        System.arraycopy(buf, 0, data, 0, pageSize);
                         Transaction.this.write(current, data);
 
                         // Reset for the next page chunk
@@ -325,8 +308,8 @@
                         // The page header marshalled after the data is written.
                         skip(Page.PAGE_HEADER_SIZE);
                         // Move the overflow data after the header.
-                        System.arraycopy(buf, Transaction.this.pageFile.pageSize, buf, pos, oldPos - Transaction.this.pageFile.pageSize);
-                        pos += oldPos - Transaction.this.pageFile.pageSize;
+                        System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
+                        pos += oldPos - pageSize;
                         current = next;
 
                     } else {
@@ -346,7 +329,7 @@
                     free(current.getNext());
                 }
 
-                current.makePageEnd(pos, Transaction.this.pageFile.nextTxid.get());
+                current.makePageEnd(pos, getWriteTransactionId());
 
                 // Write the header..
                 pos = 0;
@@ -362,38 +345,6 @@
     }
 
     /**
-     * @param page
-     * @param data
-     * @throws IOException
-     */
-    @SuppressWarnings("unchecked")
-    private <T> void write(final Page<T> page, byte[] data) throws IOException {
-        Long key = page.getPageId();
-        synchronized (this.pageFile.writes) {
-            // If it's not in the write cache...
-            PageWrite<T> write = this.pageFile.writes.get(key);
-            if (write == null) {
-                write = this.pageFile.new PageWrite<T>(page, data);
-                this.pageFile.writes.put(key, write);
-            } else {
-                write.setCurrent(page, data);
-            }
-
-            // Once we start approaching capacity, notify the writer to start
-            // writing
-            if (this.pageFile.canStartWriteBatch()) {
-                if (this.pageFile.enableAsyncWrites) {
-                    this.pageFile.writes.notify();
-                } else {
-                    while (this.pageFile.canStartWriteBatch()) {
-                        this.pageFile.writeBatch(-1, TimeUnit.MILLISECONDS);
-                    }
-                }
-            }
-        }
-    }
-
-    /**
      * Loads a page from disk.
      * 
      * @param pageId 
@@ -407,34 +358,44 @@
      *         if the PageFile is not loaded
      */
     public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
-        this.pageFile.assertLoaded();
+        pageFile.assertLoaded();
         Page<T> page = new Page<T>(pageId);
         load(page, marshaller);
         return page;
     }
 
     /**
-     * Loads a page from disk.  If the page.pageId is not valid then then this method will set the page.type to
-     * Page.INVALID_TYPE.
+     * Loads a page from disk.
      * 
      * @param page - The pageId field must be properly set 
      * @param marshaller
      *        the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
      * @throws IOException
      *         If a disk error occurred.
+     * @throws InvalidPageIOException
+     *         If the page is not valid.
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
+    @SuppressWarnings("unchecked")
     public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
-        this.pageFile.assertLoaded();
+        pageFile.assertLoaded();
 
         // Can't load invalid offsets...
-        if (page.getPageId() < 0) {
-            throw new InvalidPageIOException("Page id is not valid", page.getPageId());
+        long pageId = page.getPageId();
+        if (pageId < 0) {
+            throw new InvalidPageIOException("Page id is not valid", pageId);
+        }
+
+        // It might be a page this transaction has modified...
+        PageWrite update = writes.get(pageId);
+        if (update != null) {
+            page.copy(update.getPage());
+            return;
         }
 
-        // Try to load it from the cache first...
-        Page<T> t = this.pageFile.getFromCache(page.getPageId());
+        // We may be able to get it from the cache...
+        Page<T> t = pageFile.getFromCache(pageId);
         if (t != null) {
             page.copy(t);
             return;
@@ -449,15 +410,14 @@
         } else {
             // Page header read.
             DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
-            this.pageFile.readFile.seek(this.pageFile.toOffset(page.getPageId()));
-            this.pageFile.readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE);
+            pageFile.readPage(pageId, in.getRawData());
             page.read(in);
             page.set(null);
         }
 
         // Cache it.
         if (marshaller != null) {
-            this.pageFile.addToCache(page);
+            pageFile.addToCache(page);
         }
     }
 
@@ -469,7 +429,7 @@
 
         return new InputStream() {
 
-            private ByteSequence chunk = new ByteSequence(new byte[Transaction.this.pageFile.pageSize]);
+            private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
             private Page page = readPage(p);
             private int pageCount = 1;
 
@@ -478,10 +438,11 @@
 
             private Page readPage(Page page) throws IOException {
                 // Read the page data
-                Transaction.this.pageFile.readFile.seek(Transaction.this.pageFile.toOffset(page.getPageId()));
-                Transaction.this.pageFile.readFile.readFully(chunk.getData(), 0, Transaction.this.pageFile.pageSize);
+                
+                pageFile.readPage(page.getPageId(), chunk.getData());
+                
                 chunk.setOffset(0);
-                chunk.setLength(Transaction.this.pageFile.pageSize);
+                chunk.setLength(pageFile.getPageSize());
 
                 DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
                 page.read(in);
@@ -569,8 +530,8 @@
 
             public void mark(int markpos) {
                 markPage = page;
-                byte data[] = new byte[Transaction.this.pageFile.pageSize];
-                System.arraycopy(chunk.getData(), 0, data, 0, Transaction.this.pageFile.pageSize);
+                byte data[] = new byte[pageFile.getPageSize()];
+                System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
                 markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
             }
 
@@ -607,7 +568,7 @@
      */
     public Iterator<Page> iterator(final boolean includeFreePages) {
 
-        this.pageFile.assertLoaded();
+        pageFile.assertLoaded();
 
         return new Iterator<Page>() {
             long nextId;
@@ -615,7 +576,7 @@
             Page lastPage;
 
             private void findNextPage() {
-                if (!Transaction.this.pageFile.loaded.get()) {
+                if (!pageFile.isLoaded()) {
                     throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
                 }
 
@@ -624,7 +585,7 @@
                 }
 
                 try {
-                    while (nextId < Transaction.this.pageFile.nextFreePageId) {
+                    while (nextId < pageFile.getPageCount()) {
 
                         Page page = load(nextId, null);
 
@@ -656,6 +617,7 @@
                 }
             }
 
+            @SuppressWarnings("unchecked")
             public void remove() {
                 if (lastPage == null) {
                     throw new IllegalStateException();
@@ -670,19 +632,89 @@
         };
     }
 
+    ///////////////////////////////////////////////////////////////////
+    // Commit / Rollback related methods..
+    ///////////////////////////////////////////////////////////////////
+    
     /**
      * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
      * with the transaction are written to disk, or none are.
      */
     public void commit() throws IOException {
+        if( writeTransactionId!=-1 ) {
+            // Actually do the page writes...
+            pageFile.write(writes.entrySet());
+            // Release the pages that were freed up in the transaction..
+            freePages(freeList);
+            
+            freeList.clear();
+            allocateList.clear();
+            writes.clear();
+            writeTransactionId = -1;
+        }
     }
 
     /**
      * Rolls back the transaction.
      */
-    private void rollback() throws IOException {
+    public void rollback() throws IOException {
+        if( writeTransactionId!=-1 ) {
+            // Release the pages that were allocated in the transaction...
+            freePages(allocateList);
+
+            freeList.clear();
+            allocateList.clear();
+            writes.clear();
+            writeTransactionId = -1;
+        }
     }
 
+    private long getWriteTransactionId() {
+        if( writeTransactionId==-1 ) {
+            writeTransactionId = pageFile.getNextWriteTransactionId();
+        }
+        return writeTransactionId;
+    }
+
+    /**
+     * Queues up a page write that should get done when commit() gets called.
+     */
+    @SuppressWarnings("unchecked")
+    private void write(final Page page, byte[] data) throws IOException {
+        Long key = page.getPageId();
+        // TODO: if a large update transaction is in progress, we may want to move
+        // all the current updates to a temp file so that we don't keep using 
+        // up memory.
+        writes.put(key, new PageWrite(page, data));        
+    }   
+
+    /**
+     * @param list
+     * @throws RuntimeException
+     */
+    private void freePages(SequenceSet list) throws RuntimeException {
+        Sequence seq = list.getHead();
+        while( seq!=null ) {
+            seq.each(new Sequence.Closure<RuntimeException>(){
+                public void execute(long value) {
+                    pageFile.freePage(value);
+                }
+            });
+            seq = seq.getNext();
+        }
+    }
+    
+    /**
+     * @return true if there are no uncommitted page file updates associated with this transaction.
+     */
+    public boolean isReadOnly() {
+        return writeTransactionId==-1;
+    }
+    
+    ///////////////////////////////////////////////////////////////////
+    // Transaction closure helpers...
+    ///////////////////////////////////////////////////////////////////
+    
     /**
      * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
      * If the closure throws an Exception, then the transaction is rolled back.
@@ -730,18 +762,4 @@
         }
     }
 
-    /**
-     * @return true if there are no uncommitted page file updates associated with this transaction.
-     */
-    public boolean isReadOnly() {
-        return false;
-    }
-
-    /**
-     * @return the number of pages allocated in the PageFile
-     */
-    public long getPageCount() {
-        return this.pageFile.nextFreePageId;
-    }
-
 }
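
For reference, a rough usage sketch of the new commit()/rollback() unit-of-work behaviour in Transaction. This is illustration only, not part of this revision: the enclosing method and imports are omitted, the directory name, file name and page payload are made up, and the rest follows the API touched in this diff (allocate/store queue work in the per-transaction allocateList and writes map, and only commit() pushes it to the PageFile):

    PageFile pf = new PageFile(new File("target/test-data"), "uow-sketch");
    pf.load();

    Transaction tx = pf.tx();
    try {
        Page<String> page = tx.allocate();                    // page ids are tracked in the tx allocateList
        page.set("hello");
        tx.store(page, StringMarshaller.INSTANCE, false);     // queued in the tx writes map, nothing on disk yet
        tx.commit();                                          // every queued write reaches the PageFile, or none do
    } catch (IOException e) {
        tx.rollback();                                        // hands the pages this tx allocated back to the PageFile
        throw e;
    } finally {
        pf.unload();
    }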

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Thu Sep 11 08:56:07 2008
@@ -207,7 +207,7 @@
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
-                    if (tx.getPageCount() == 0) {
+                    if (pageFile.getPageCount() == 0) {
                         // First time this is created.. Initialize the metadata
                         Page<Metadata> page = tx.allocate();
                         assert page.getPageId() == 0;
@@ -367,7 +367,7 @@
                         checkpointUpdate(tx);
                     }
                 });
-                pageFile.checkpoint();
+                pageFile.flush();
             }
             store(new KahaTraceCommand().setMessage("CHECKPOINT " + new Date()), true);
         } catch (IOException e) {
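
The MessageDatabase hunks above go through the Transaction.Closure helper, which commits on a normal return and rolls back if the closure throws. A minimal sketch of that idiom (the payload and the use of StringMarshaller are illustrative only):

    pageFile.tx().execute(new Transaction.Closure<IOException>() {
        public void execute(Transaction tx) throws IOException {
            Page<String> page = tx.allocate();
            page.set("checkpoint data");
            tx.store(page, StringMarshaller.INSTANCE, false);
            // execute() commits when this closure returns normally,
            // and rolls the transaction back if it throws.
        }
    });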

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LinkedNodeList.java Thu Sep 11 08:56:07 2008
@@ -48,6 +48,12 @@
     public T getTail() {
         return head.prev;
     }
+    
+    public void clear() {
+        while (head != null) {
+            head.unlink();
+        }
+    }
 
     public void addLast(LinkedNodeList<T> list) {
         if (list.isEmpty()) {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/Sequence.java Thu Sep 11 08:56:07 2008
@@ -70,5 +70,15 @@
     public void setLast(long last) {
         this.last = last;
     }
+    
+    public interface Closure<T extends Throwable> {
+        public void execute(long value) throws T;
+    }
+
+    public <T extends Throwable> void each(Closure<T> closure) throws T {
+        for( long i=first; i<=last; i++ ) {
+            closure.execute(i);
+        }
+    }
 
 }
\ No newline at end of file
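
The new Sequence.each(Closure) is what Transaction.freePages() uses to visit every page id in a range. A small sketch, with a made-up range and a println standing in for the real work:

    Sequence seq = new Sequence(100, 104);
    seq.each(new Sequence.Closure<RuntimeException>() {
        public void execute(long pageId) {
            // called once for each of 100, 101, 102, 103 and 104
            System.out.println("freeing page " + pageId);
        }
    });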

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java Thu Sep 11 08:56:07 2008
@@ -30,7 +30,7 @@
  * 
  * @author chirino
  */
-public class SequenceSet {
+public class SequenceSet extends LinkedNodeList<Sequence> {
     
     public static class Marshaller implements org.apache.kahadb.Marshaller<SequenceSet> {
 
@@ -46,18 +46,18 @@
             for (int i = 0; i < count; i++) {
                 if( in.readBoolean() ) {
                     Sequence sequence = new Sequence(in.readLong(), in.readLong());
-                    value.sequences.addLast(sequence);
+                    value.addLast(sequence);
                 } else {
                     Sequence sequence = new Sequence(in.readLong());
-                    value.sequences.addLast(sequence);
+                    value.addLast(sequence);
                 }
             }
             return value;
         }
 
         public void writePayload(SequenceSet value, DataOutput out) throws IOException {
-            out.writeInt(value.sequences.size());
-            Sequence sequence = value.sequences.getHead();
+            out.writeInt(value.size());
+            Sequence sequence = value.getHead();
             while (sequence != null ) {
                 if( sequence.range() > 1 ) {
                     out.writeBoolean(true);
@@ -72,8 +72,13 @@
         }
     }
     
-    LinkedNodeList<Sequence> sequences = new LinkedNodeList<Sequence>();
-    int size;
+    public void add(Sequence value) {
+        // TODO we can probably optimize this a bit
+        for(long i=value.first; i<value.last+1; i++) {
+            add(i);
+        }
+    }
+    
     
     /**
      * 
@@ -83,13 +88,12 @@
      */
     public boolean add(long value) {
 
-        if (sequences.isEmpty()) {
-            sequences.addFirst(new Sequence(value));
-            size++;
+        if (isEmpty()) {
+            addFirst(new Sequence(value));
             return true;
         }
 
-        Sequence sequence = sequences.getHead();
+        Sequence sequence = getHead();
         while (sequence != null) {
 
             if (sequence.isAdjacentToLast(value)) {
@@ -104,7 +108,6 @@
                         next.unlink();
                     }
                 }
-                size++;
                 return true;
             }
 
@@ -121,7 +124,6 @@
                         prev.unlink();
                     }
                 }
-                size++;
                 return true;
             }
 
@@ -129,7 +131,6 @@
             if (value < sequence.first) {
                 // Then insert a new entry before this sequence item.
                 sequence.linkBefore(new Sequence(value));
-                size++;
                 return true;
             }
 
@@ -142,8 +143,7 @@
         }
 
         // Then the value is getting appended to the tail of the sequence.
-        sequences.addLast(new Sequence(value));
-        size++;
+        addLast(new Sequence(value));
         return true;
     }
     
@@ -154,7 +154,7 @@
      * @throws NoSuchElementException if this list is empty.
      */
     public long removeFirst() {
-        if (sequences.isEmpty()) {
+        if (isEmpty()) {
             throw new NoSuchElementException();
         }
         
@@ -168,21 +168,19 @@
     * @return a sequence covering count values, or null if no sequence that large is in the list.
      */
     public Sequence removeFirstSequence(long count) {
-        if (sequences.isEmpty()) {
+        if (isEmpty()) {
             return null;
         }
         
-        Sequence sequence = sequences.getHead();
+        Sequence sequence = getHead();
         while (sequence != null ) {
             if (sequence.range() == count ) {
                 sequence.unlink();
-                size--;
                 return sequence;
             }
             if (sequence.range() > count ) {
                 Sequence rc = new Sequence(sequence.first, sequence.first+count);
                 sequence.first+=count;
-                size--;
                 return rc;
             }
             sequence = sequence.getNext();
@@ -200,13 +198,13 @@
         if (first > last) {
             throw new IllegalArgumentException("First cannot be more than last");
         }
-        if (sequences.isEmpty()) {
+        if (isEmpty()) {
             // We are missing all the messages.
             rc.add(new Sequence(first, last));
             return rc;
         }
 
-        Sequence sequence = sequences.getHead();
+        Sequence sequence = getHead();
         while (sequence != null && first <= last) {
             if (sequence.contains(first)) {
                 first = sequence.last + 1;
@@ -234,29 +232,13 @@
      * @return all the Sequence that are in this list
      */
     public List<Sequence> getReceived() {
-        ArrayList<Sequence> rc = new ArrayList<Sequence>(sequences.size());
-        Sequence sequence = sequences.getHead();
+        ArrayList<Sequence> rc = new ArrayList<Sequence>(size());
+        Sequence sequence = getHead();
         while (sequence != null) {
             rc.add(new Sequence(sequence.first, sequence.last));
             sequence = sequence.getNext();
         }
         return rc;
     }
-
-    public boolean isEmpty() {
-        return sequences.isEmpty();
-    }
-
-    public long size() {
-        return size;
-    }
-
-    public void clear() {
-        sequences = new LinkedNodeList<Sequence>();
-    }
-    
-    @Override
-    public String toString() {
-        return sequences.toString();
-    }
+   
 }
\ No newline at end of file
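
With this change a SequenceSet is itself the linked list of Sequence nodes rather than wrapping one. A quick sketch of how the Transaction code above drives it (the page ids are made up, the freePage call is only referenced in a comment):

    SequenceSet free = new SequenceSet();          // now extends LinkedNodeList<Sequence>
    free.add(7);                                   // a single page id
    free.add(new Sequence(20, 29));                // a range, currently expanded value by value

    Sequence seq = free.getHead();                 // getHead()/isEmpty()/size() come from LinkedNodeList
    while (seq != null) {
        seq.each(new Sequence.Closure<RuntimeException>() {
            public void execute(long pageId) {
                // e.g. pageFile.freePage(pageId), as Transaction.freePages() does
            }
        });
        seq = seq.getNext();
    }

    free.clear();                                  // the new LinkedNodeList.clear() unlinks every node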

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java?rev=694320&r1=694319&r2=694320&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java Thu Sep 11 08:56:07 2008
@@ -29,33 +29,33 @@
 import junit.framework.TestCase;
 
 public class PageFileTest extends TestCase {
-    
+
     public void testCRUD() throws IOException {
-        
+
         PageFile pf = new PageFile(new File("target/test-data"), getName());
         pf.delete();
         pf.load();
-        
+
         HashSet<String> expected = new HashSet<String>();
-        
+
         // Insert some data into the page file.
         Transaction tx = pf.tx();
-        for( int i=0 ; i < 100; i++) {
+        for (int i = 0; i < 100; i++) {
             Page<String> page = tx.allocate();
             assertEquals(Page.PAGE_FREE_TYPE, page.getType());
-            
-            String t = "page:"+i;
+
+            String t = "page:" + i;
             expected.add(t);
             page.set(t);
             tx.store(page, StringMarshaller.INSTANCE, false);
             tx.commit();
         }
-        
+
         // Reload it...
         pf.unload();
         pf.load();
         tx = pf.tx();
-        
+
         // Iterate it to make sure they are still there..
         HashSet<String> actual = new HashSet<String>();
         for (Page<String> page : tx) {
@@ -63,28 +63,28 @@
             actual.add(page.get());
         }
         assertEquals(expected, actual);
-        
+
         // Remove the odd records..
-        for( int i=0 ; i < 100; i++) {
-            if( i%2 == 0 ) {
+        for (int i = 0; i < 100; i++) {
+            if (i % 2 == 0) {
                 break;
             }
-            String t = "page:"+i;
+            String t = "page:" + i;
             expected.remove(t);
         }
         for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            if( !expected.contains(page.get()) ) {
+            if (!expected.contains(page.get())) {
                 tx.free(page);
             }
         }
         tx.commit();
-        
+
         // Reload it...
         pf.unload();
         pf.load();
         tx = pf.tx();
-        
+
         // Iterate it to make sure the even records are still there..
         actual.clear();
         for (Page<String> page : tx) {
@@ -97,15 +97,15 @@
         HashSet<String> t = expected;
         expected = new HashSet<String>();
         for (String s : t) {
-            expected.add(s+":updated");
+            expected.add(s + ":updated");
         }
         for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            page.set(page.get()+":updated");
+            page.set(page.get() + ":updated");
             tx.store(page, StringMarshaller.INSTANCE, false);
         }
         tx.commit();
-        
+
         // Reload it...
         pf.unload();
         pf.load();
@@ -118,38 +118,38 @@
             actual.add(page.get());
         }
         assertEquals(expected, actual);
-        
+
         pf.unload();
     }
-    
+
     public void testStreams() throws IOException {
-        
+
         PageFile pf = new PageFile(new File("target/test-data"), getName());
         pf.delete();
         pf.load();
-        
+
         Transaction tx = pf.tx();
         Page page = tx.allocate();
         tx.commit();
-        
+
         OutputStream pos = tx.openOutputStream(page, true);
         DataOutputStream os = new DataOutputStream(pos);
-        for( int i=0; i < 10000; i++) {
-            os.writeUTF("Test string:"+i);
+        for (int i = 0; i < 10000; i++) {
+            os.writeUTF("Test string:" + i);
         }
-        
+
         os.close();
         tx.commit();
-        
+
         // Reload the page file.
         pf.unload();
         pf.load();
         tx = pf.tx();
-        
+
         InputStream pis = tx.openInputStream(page);
         DataInputStream is = new DataInputStream(pis);
-        for( int i=0; i < 10000; i++) {
-            assertEquals("Test string:"+i, is.readUTF());
+        for (int i = 0; i < 10000; i++) {
+            assertEquals("Test string:" + i, is.readUTF());
         }
         assertEquals(-1, is.read());
         is.close();
@@ -157,4 +157,45 @@
         pf.unload();
     }
 
+    public void testAddRollback() throws IOException {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+
+        HashSet<String> expected = new HashSet<String>();
+
+        // Insert some data into the page file.
+        Transaction tx = pf.tx();
+        for (int i = 0; i < 100; i++) {
+            Page<String> page = tx.allocate();
+            assertEquals(Page.PAGE_FREE_TYPE, page.getType());
+
+            String t = "page:" + i;
+            page.set(t);
+            tx.store(page, StringMarshaller.INSTANCE, false);
+
+            // Rollback every other insert.
+            if (i % 2 == 0) {
+                expected.add(t);
+                tx.commit();
+            } else {
+                tx.rollback();
+            }
+
+        }
+
+        // Reload it...
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+
+        // Iterate it to make sure they are still there..
+        HashSet<String> actual = new HashSet<String>();
+        for (Page<String> page : tx) {
+            tx.load(page, StringMarshaller.INSTANCE);
+            actual.add(page.get());
+        }
+        assertEquals(expected, actual);
+    }
 }


