activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r693109 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page: PageFile.java Transaction.java
Date Mon, 08 Sep 2008 14:25:28 GMT
Author: chirino
Date: Mon Sep  8 07:25:27 2008
New Revision: 693109

URL: http://svn.apache.org/viewvc?rev=693109&view=rev
Log:
Refactored the PageFile/Transaction stuff a bit to make it easier to maintian.  At this time an interface is not needed for Transaction.

Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=693109&r1=693108&r2=693109&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Sep  8 07:25:27 2008
@@ -20,21 +20,17 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.InterruptedIOException;
-import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
@@ -44,14 +40,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.kahadb.Marshaller;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
-import org.apache.kahadb.util.Sequence;
 import org.apache.kahadb.util.SequenceSet;
 
 /**
@@ -78,32 +69,32 @@
     
     private final String name;
     private File directory;
-    private RandomAccessFile readFile;
+    RandomAccessFile readFile;
     private RandomAccessFile writeFile;
 
-    private int pageSize = DEFAULT_PAGE_SIZE;
+    int pageSize = DEFAULT_PAGE_SIZE;
     private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER;
     private int initialPageOffset;
 
-    private long nextFreePageId;
+    long nextFreePageId;
     
-    private SequenceSet freeList = new SequenceSet();
-    private AtomicBoolean loaded = new AtomicBoolean();
+    SequenceSet freeList = new SequenceSet();
+    AtomicBoolean loaded = new AtomicBoolean();
     private LRUCache<Long, Page> pageCache;
     
     private boolean enableRecoveryBuffer=false;
     private boolean enableSyncedWrites=false;
     private boolean enablePageCaching=true;
-    private boolean enableAsyncWrites=false;
+    boolean enableAsyncWrites=false;
     
     private int pageCacheSize = 100;
     
-    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
+    TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
     private Thread writerThread;
     AtomicBoolean stopWriter = new AtomicBoolean();
     private CountDownLatch checkpointLatch;
 
-    private AtomicLong nextTxid = new AtomicLong();
+    AtomicLong nextTxid = new AtomicLong();
     private MetaData metaData;
     
     /**
@@ -167,7 +158,7 @@
     /**
      * Internally used by the double write buffer implementation used in this class. 
      */
-    private class PageWrite<T> {
+    class PageWrite<T> {
         Page<T> page;
         byte[] current;
         byte[] diskBound;
@@ -201,567 +192,9 @@
 
     }
 
-    /**
-     * Provides transaction update access to the PageFile.  All operations that modify 
-     * the PageFile are done via a Transaction.
-     */
-    class PageFileTransaction implements Transaction {
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#getPageFile()
-         */
-        public PageFile getPageFile() {
-            return PageFile.this;
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#allocate()
-         */
-        public <T> Page<T> allocate() throws IOException {
-            return allocate(1);
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#allocate(int)
-         */
-        public <T> Page<T> allocate(int count) throws IOException {
-            assertLoaded();
-            if ( count <= 0 ) {
-                throw new IllegalArgumentException("The allocation count must be larger than zero");
-            }
-            
-            Sequence seq = freeList.removeFirstSequence(count);
-
-            // We may need to create new free pages...
-            if(seq==null) {
-                
-                Page<T> first=null;                
-                int c=count;
-                while( c > 0 ) {
-                    Page<T> page = new Page<T>(nextFreePageId ++);
-                    page.makeFree(nextTxid.get());
-                    
-                    if( first == null ) {
-                        first = page;
-                    }
-                    
-                    addToCache(page);
-                    DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
-                    page.write(out);
-                    write(page, out.getData());
-
-//                    LOG.debug("allocate writing: "+page.getPageId());
-                    c--;
-                }
-                
-                return first;
-            }
-            
-            Page<T> page = new Page<T>(seq.getFirst());
-            page.makeFree(0);
-//            LOG.debug("allocated: "+page.getPageId());
-            return page;
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#free(long)
-         */
-        public void free(long pageId) throws IOException {
-            free(load(pageId, null));
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#free(long, int)
-         */
-        public void free(long pageId, int count) throws IOException {
-            free(load(pageId, null), count);
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page, int)
-         */
-        public <T> void free(Page<T> page, int count) throws IOException {
-            assertLoaded();
-            long offsetPage=page.getPageId();
-            for (int i = 0; i < count; i++) {
-                if( page == null ) {
-                    page=load(offsetPage+i, null);
-                }
-                free(page);
-                page=null;
-            }
-        }
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page)
-         */
-        public <T> void free(Page<T> page) throws IOException {
-            assertLoaded();
-            
-            // We may need loop to free up a page chain.
-            while(page!=null){
-                
-                // Is it already free??
-                if( page.getType() == Page.PAGE_FREE_TYPE ) {
-                    return;
-                }
-
-                Page<T> next = null;
-                if( page.getType()==Page.PAGE_PART_TYPE ) {
-                    next = load(page.getNext(), null);
-                }
-
-                page.makeFree(nextTxid.get());
-                
-                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
-                page.write(out);
-                write(page, out.getData());
-                
-                removeFromCache(page);
-                freeList.add(page.getPageId());
-                page = next;
-            }
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#store(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller, boolean)
-         */
-        public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
-            DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
-            if( marshaller!=null ) {
-                marshaller.writePayload(page.get(), out);
-            }
-            out.close();
-        }
-
-        /**
-         * @throws IOException 
-         */
-        public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
-            assertLoaded();
-            
-            // Copy to protect against the end user changing 
-            // the page instance while we are doing a write.
-            final Page copy = page.copy();
-            addToCache(copy);
-
-            //
-            // To support writing VERY large data, we override the output stream so that we 
-            // we do the page writes incrementally while the data is being marshalled.
-            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize*2) {
-                Page current = copy;
-
-                @SuppressWarnings("unchecked")
-                @Override
-                protected void onWrite() throws IOException {
-                    
-                    // Are we at an overflow condition?
-                    if( pos >= pageSize ) {
-                        // If overflow is allowed
-                        if( overflow ) {
-
-                            Page next;
-                            if( current.getType() == Page.PAGE_PART_TYPE ) {
-                                next = load(current.getNext(), null);
-                            } else {
-                                next = allocate();
-                            }
-
-                            next.txId = current.txId;
-
-                            // Write the page header
-                            int oldPos = pos;
-                            pos = 0;
-                            
-                            current.makePagePart(next.getPageId(), nextTxid.get());
-                            current.write(this);
-
-                            // Do the page write..
-                            byte [] data = new byte[pageSize];
-                            System.arraycopy(buf, 0, data, 0, pageSize);
-                            PageFileTransaction.this.write(current, data);
-                            
-                            // Reset for the next page chunk
-                            pos = 0;
-                            // The page header marshalled after the data is written.
-                            skip(Page.PAGE_HEADER_SIZE);
-                            // Move the overflow data after the header.
-                            System.arraycopy(buf, pageSize, buf, pos, oldPos-pageSize);
-                            pos += oldPos-pageSize;
-                            current = next;
-                            
-                        } else {
-                            throw new PageOverflowIOException("Page overflow.");
-                        }
-                    }
-                    
-                }
-                
-                @SuppressWarnings("unchecked")
-                @Override
-                public void close() throws IOException {
-                    super.close();
-
-                    // We need to free up the rest of the page chain..
-                    if( current.getType() == Page.PAGE_PART_TYPE ) {
-                        free(current.getNext());
-                    }
-                    
-                    current.makePageEnd(pos, nextTxid.get());
-
-                    // Write the header..
-                    pos = 0;
-                    current.write(this);
-                    
-                    PageFileTransaction.this.write(current, buf);
-                }
-            };
-            
-            // The page header marshaled after the data is written.
-            out.skip(Page.PAGE_HEADER_SIZE);
-            return out;
-        }
-        
-        /**
-         * @param page
-         * @param data
-         * @throws IOException
-         */
-        @SuppressWarnings("unchecked")
-        private <T> void write(final Page<T> page, byte[] data) throws IOException {
-            Long key = page.getPageId();
-            synchronized( writes ) {
-                // If it's not in the write cache...
-                PageWrite<T> write = writes.get(key);
-                if( write==null ) {
-                    write = new PageWrite<T>(page, data);
-                    writes.put(key, write);
-                } else {
-                    write.setCurrent(page, data);
-                }
-                
-                // Once we start approaching capacity, notify the writer to start writing
-                if( canStartWriteBatch() ) {
-                    if( enableAsyncWrites  ) {
-                        writes.notify();
-                    } else {
-                        while( canStartWriteBatch() ) {
-                            writeBatch(-1, TimeUnit.MILLISECONDS);
-                        }
-                    }
-                }
-            }            
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#load(long, org.apache.kahadb.Marshaller)
-         */
-        public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
-            assertLoaded();
-            Page<T> page = new Page<T>(pageId);
-            load(page, marshaller);
-            return page;
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
-         */
-        public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
-            assertLoaded();
-
-            // Can't load invalid offsets...
-            if (page.getPageId() < 0) {
-                throw new Transaction.InvalidPageIOException("Page id is not valid", page.getPageId());
-            }        
-
-            // Try to load it from the cache first...
-            Page<T> t = getFromCache(page.getPageId());
-            if (t != null) {
-                page.copy(t);
-                return;
-            }
-            
-            if( marshaller!=null ) {
-                // Full page read..
-                InputStream is = openInputStream(page);
-                DataInputStream dataIn = new DataInputStream(is);
-                page.set(marshaller.readPayload(dataIn));
-                is.close();
-            } else {
-                // Page header read.
-                DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
-                readFile.seek(toOffset(page.getPageId()));
-                readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE);
-                page.read(in);
-                page.set(null);
-            }
-            
-            // Cache it.
-            if( marshaller!=null ) {
-                addToCache(page);
-            }
-        }
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
-         */
-        public InputStream openInputStream(final Page p) throws IOException {
-            
-            return new InputStream() {
-
-                private ByteSequence chunk = new ByteSequence(new byte[pageSize]);
-                private Page page = readPage(p);
-                private int pageCount=1; 
-                
-                private Page markPage;
-                private ByteSequence markChunk;
-                
-                private Page readPage(Page page) throws IOException {
-                    // Read the page data
-                    readFile.seek(toOffset(page.getPageId()));
-                    readFile.readFully(chunk.getData(), 0, pageSize);
-                    chunk.setOffset(0);
-                    chunk.setLength(pageSize);
-                    
-                    DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
-                    page.read(in);
-                    
-                    chunk.setOffset(Page.PAGE_HEADER_SIZE);
-                    if( page.getType() == Page.PAGE_END_TYPE ) {
-                        chunk.setLength((int)(page.getNext()));
-                    }
-                    
-                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
-                        throw new EOFException("Chunk stream does not exist at page: "+page.getPageId());
-                    }
-                    
-                    return page;
-                }
-                
-                public int read() throws IOException {
-                    if (!atEOF()) {
-                        return chunk.data[chunk.offset++] & 0xff;
-                    } else {
-                        return -1;
-                    }
-                }
-
-                private boolean atEOF() throws IOException {
-                    if( chunk.offset < chunk.length ) {
-                      return false;  
-                    }
-                    if( page.getType() == Page.PAGE_END_TYPE ) {
-                        return true;
-                    }
-                    fill();
-                    return chunk.offset >= chunk.length;
-                }
-
-                private void fill() throws IOException {
-                    page = readPage(new Page(page.getNext()));
-                    pageCount++;
-                }
-                
-                public int read(byte[] b) throws IOException {
-                    return read(b, 0, b.length);
-                }
-
-                public int read(byte b[], int off, int len) throws IOException {
-                    if (!atEOF()) {
-                        int rc=0;
-                        while(!atEOF() && rc < len) {
-                            len = Math.min(len, chunk.length - chunk.offset);
-                            if (len > 0) {
-                                System.arraycopy(chunk.data, chunk.offset, b, off, len);
-                                chunk.offset += len;
-                            }
-                            rc+=len;
-                        }
-                        return rc;
-                    } else {
-                        return -1;
-                    }
-                }
-
-                public long skip(long len) throws IOException {
-                    if (atEOF()) {
-                        int rc=0;
-                        while(!atEOF() && rc < len) {
-                            len = Math.min(len, chunk.length - chunk.offset);
-                            if (len > 0) {
-                                chunk.offset += len;
-                            }
-                            rc+=len;
-                        }
-                        return rc;
-                    } else {
-                        return -1;
-                    }
-                }
-
-                public int available() {
-                    return chunk.length - chunk.offset;
-                }
-
-                public boolean markSupported() {
-                    return true;
-                }
-
-                public void mark(int markpos) {
-                    markPage = page;
-                    byte data[] = new byte[pageSize];
-                    System.arraycopy(chunk.getData(), 0, data, 0, pageSize);
-                    markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
-                }
-
-                public void reset() {
-                    page = markPage;
-                    chunk = markChunk;
-                }
-
-            };
-        }
-
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#iterator()
-         */
-        @SuppressWarnings("unchecked")
-        public Iterator<Page> iterator() {
-            return (Iterator<Page>)iterator(false);
-        }
-        
-        /**
-         * @see org.apache.kahadb.page.Transaction#iterator(boolean)
-         */
-        public Iterator<Page> iterator(final boolean includeFreePages) {
-            
-            assertLoaded();
-
-            return new Iterator<Page>() {
-                long nextId;
-                Page nextPage;
-                Page lastPage;
-                
-                private void findNextPage() {
-                    if( !loaded.get() ) {
-                        throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
-                    }
-
-                    if( nextPage!=null ) {
-                        return;
-                    }
-                    
-                    try {
-                        while( nextId < PageFile.this.nextFreePageId ) {
-                            
-                            Page page = load(nextId, null);
-                            
-                            if( includeFreePages || page.getType()!=Page.PAGE_FREE_TYPE ) {
-                                nextPage = page;
-                                return;
-                            } else {
-                                nextId++;
-                            }
-                        }
-                    } catch (IOException e) {
-                    }
-                }
-
-                public boolean hasNext() {
-                    findNextPage();
-                    return nextPage !=null;
-                }
-
-                public Page next() {
-                    findNextPage(); 
-                    if( nextPage !=null ) {
-                        lastPage = nextPage;
-                        nextPage=null;
-                        nextId++;
-                        return lastPage;
-                    } else {
-                        throw new NoSuchElementException();
-                    }
-                }
-                
-                public void remove() {
-                    if( lastPage==null ) {
-                        throw new IllegalStateException();
-                    }
-                    try {
-                        free(lastPage);
-                        lastPage=null;
-                    } catch (IOException e) {
-                        new RuntimeException(e);
-                    }
-                }
-            };
-        }
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#commit()
-         */
-        public void commit() throws IOException {
-        }
-        
-        /**
-         * Rolls back the transaction.
-         */
-        private void rollback() throws IOException {
-        }
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.Closure)
-         */
-        public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
-            boolean success=false;
-            try {
-                closure.execute(this);
-                success=true;
-            } finally {
-                if( success ) {
-                    commit();
-                } else {
-                    rollback();
-                }
-            }
-        }
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.CallableClosure)
-         */
-        public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
-            boolean success=false;
-            try {
-                R rc = closure.execute(this);
-                success=true;
-                return rc;
-            } finally {
-                if( success ) {
-                    commit();
-                } else {
-                    rollback();
-                }
-            }
-        }
-
-        /**
-         * @see org.apache.kahadb.page.Transaction#isReadOnly()
-         */
-        public boolean isReadOnly() {
-            return false;
-        }
-
-        public long getPageCount() {
-            return nextFreePageId;
-        }
-
-    }
-    
     public Transaction tx() {
         assertLoaded();
-        return new PageFileTransaction();
+        return new Transaction(this);
     }
     
     /**
@@ -977,7 +410,7 @@
     // Internal Double write implementation follows...
     ///////////////////////////////////////////////////////////////////
         
-    private boolean canStartWriteBatch() {
+    boolean canStartWriteBatch() {
         int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER);
         
         if( enableAsyncWrites ) {
@@ -999,7 +432,7 @@
      * @throws InterruptedException 
      * @throws IOException 
      */
-    private boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
+    boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
                 
         int batchLength=8+4; // Account for the:  lastTxid + recovery record counter
         ArrayList<PageWrite> batch = new ArrayList<PageWrite>(MAX_PAGES_IN_RECOVERY_BUFFER);
@@ -1212,14 +645,14 @@
         return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
     } 
     
-    private long toOffset(long pageId) {
+    long toOffset(long pageId) {
         return initialPageOffset+(pageId*pageSize);
     }
 
     /**
      * @throws IllegalStateException if the page file is not loaded.
      */
-    private void assertLoaded() throws IllegalStateException {
+    void assertLoaded() throws IllegalStateException {
         if( !loaded.get() ) {
             throw new IllegalStateException("PageFile is not loaded");
         }
@@ -1229,8 +662,7 @@
     // Internal Cache Related operations
     ///////////////////////////////////////////////////////////////////
     
-    @SuppressWarnings("unchecked")
-    private <T> Page<T> getFromCache(long pageId) {
+    @SuppressWarnings("unchecked") <T> Page<T> getFromCache(long pageId) {
         synchronized(writes) {
             PageWrite<T> pageWrite = writes.get(pageId);
             if( pageWrite != null ) {
@@ -1245,13 +677,13 @@
         return result;
     }
 
-    private void addToCache(Page page) {
+    void addToCache(Page page) {
         if (enablePageCaching) {
             pageCache.put(page.getPageId(), page);
         }
     }
 
-    private void removeFromCache(Page page) {
+    void removeFromCache(Page page) {
         if (enablePageCaching) {
             pageCache.remove(page.getPageId());
         }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=693109&r1=693108&r2=693109&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Mon Sep  8 07:25:27 2008
@@ -16,18 +16,27 @@
  */
 package org.apache.kahadb.page;
 
+import java.io.DataInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.page.PageFile.PageWrite;
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.DataByteArrayInputStream;
+import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.Sequence;
 
 /**
- * The interface used to read/update a PageFile object.  Using a transaction allows you to
+ * The class used to read/update a PageFile object.  Using a transaction allows you to
  * do multiple update operations in a single unit of work.
  */
-public interface Transaction extends Iterable<Page> {
+public class Transaction implements Iterable<Page> {
     
     /**
      * 
@@ -70,7 +79,22 @@
         public R execute(Transaction tx) throws T;
     }
 
-    public PageFile getPageFile();
+
+    private final PageFile pageFile;
+
+    /**
+     * @param pageFile
+     */
+    Transaction(PageFile pageFile) {
+        this.pageFile = pageFile;
+    }
+
+    /**
+     * @see org.apache.kahadb.page.Transaction#getPageFile()
+     */
+    public PageFile getPageFile() {
+        return this.pageFile;
+    }
 
     /** 
      * Allocates a free page that you can write data to.
@@ -81,7 +105,9 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> Page<T> allocate() throws IOException;
+    public <T> Page<T> allocate() throws IOException {
+        return allocate(1);
+    }
 
     /** 
      * Allocates a block of free pages that you can write data to.
@@ -93,7 +119,44 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> Page<T> allocate(int count) throws IOException;
+    public <T> Page<T> allocate(int count) throws IOException {
+        this.pageFile.assertLoaded();
+        if (count <= 0) {
+            throw new IllegalArgumentException("The allocation count must be larger than zero");
+        }
+
+        Sequence seq = this.pageFile.freeList.removeFirstSequence(count);
+
+        // We may need to create new free pages...
+        if (seq == null) {
+
+            Page<T> first = null;
+            int c = count;
+            while (c > 0) {
+                Page<T> page = new Page<T>(this.pageFile.nextFreePageId++);
+                page.makeFree(this.pageFile.nextTxid.get());
+
+                if (first == null) {
+                    first = page;
+                }
+
+                this.pageFile.addToCache(page);
+                DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize);
+                page.write(out);
+                write(page, out.getData());
+
+                // LOG.debug("allocate writing: "+page.getPageId());
+                c--;
+            }
+
+            return first;
+        }
+
+        Page<T> page = new Page<T>(seq.getFirst());
+        page.makeFree(0);
+        // LOG.debug("allocated: "+page.getPageId());
+        return page;
+    }
 
     /**
      * Frees up a previously allocated page so that it can be re-allocated again.
@@ -104,18 +167,24 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> void free(Page<T> page) throws IOException;
+    public void free(long pageId) throws IOException {
+        free(load(pageId, null));
+    }
 
     /**
-     * Frees up a previously allocated page so that it can be re-allocated again.
+     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
+     * 
+     * @param page the initial page of the sequence that will be getting freed
+     * @param count the number of pages in the sequence
      * 
-     * @param page the page to free up
      * @throws IOException
      *         If an disk error occurred.
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public void free(long pageId) throws IOException;
+    public void free(long pageId, int count) throws IOException {
+        free(load(pageId, null), count);
+    }
 
     /**
      * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
@@ -128,20 +197,54 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> void free(Page<T> page, int count) throws IOException;
+    public <T> void free(Page<T> page, int count) throws IOException {
+        this.pageFile.assertLoaded();
+        long offsetPage = page.getPageId();
+        for (int i = 0; i < count; i++) {
+            if (page == null) {
+                page = load(offsetPage + i, null);
+            }
+            free(page);
+            page = null;
+        }
+    }
 
     /**
-     * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
-     * 
-     * @param page the initial page of the sequence that will be getting freed
-     * @param count the number of pages in the sequence
+     * Frees up a previously allocated page so that it can be re-allocated again.
      * 
+     * @param page the page to free up
      * @throws IOException
      *         If an disk error occurred.
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public void free(long pageId, int count) throws IOException;
+    public <T> void free(Page<T> page) throws IOException {
+        this.pageFile.assertLoaded();
+
+        // We may need loop to free up a page chain.
+        while (page != null) {
+
+            // Is it already free??
+            if (page.getType() == Page.PAGE_FREE_TYPE) {
+                return;
+            }
+
+            Page<T> next = null;
+            if (page.getType() == Page.PAGE_PART_TYPE) {
+                next = load(page.getNext(), null);
+            }
+
+            page.makeFree(this.pageFile.nextTxid.get());
+
+            DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize);
+            page.write(out);
+            write(page, out.getData());
+
+            this.pageFile.removeFromCache(page);
+            this.pageFile.freeList.add(page.getPageId());
+            page = next;
+        }
+    }
 
     /**
      * 
@@ -160,7 +263,135 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> void store(Page<T> page, Marshaller<T> marshaller, boolean overflow) throws IOException, PageOverflowIOException;
+    public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
+        DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
+        if (marshaller != null) {
+            marshaller.writePayload(page.get(), out);
+        }
+        out.close();
+    }
+
+    /**
+     * @throws IOException
+     */
+    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
+        this.pageFile.assertLoaded();
+
+        // Copy to protect against the end user changing
+        // the page instance while we are doing a write.
+        final Page copy = page.copy();
+        this.pageFile.addToCache(copy);
+
+        //
+        // To support writing VERY large data, we override the output stream so
+        // that we
+        // we do the page writes incrementally while the data is being
+        // marshalled.
+        DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize * 2) {
+            Page current = copy;
+
+            @SuppressWarnings("unchecked")
+            @Override
+            protected void onWrite() throws IOException {
+
+                // Are we at an overflow condition?
+                if (pos >= Transaction.this.pageFile.pageSize) {
+                    // If overflow is allowed
+                    if (overflow) {
+
+                        Page next;
+                        if (current.getType() == Page.PAGE_PART_TYPE) {
+                            next = load(current.getNext(), null);
+                        } else {
+                            next = allocate();
+                        }
+
+                        next.txId = current.txId;
+
+                        // Write the page header
+                        int oldPos = pos;
+                        pos = 0;
+
+                        current.makePagePart(next.getPageId(), Transaction.this.pageFile.nextTxid.get());
+                        current.write(this);
+
+                        // Do the page write..
+                        byte[] data = new byte[Transaction.this.pageFile.pageSize];
+                        System.arraycopy(buf, 0, data, 0, Transaction.this.pageFile.pageSize);
+                        Transaction.this.write(current, data);
+
+                        // Reset for the next page chunk
+                        pos = 0;
+                        // The page header marshalled after the data is written.
+                        skip(Page.PAGE_HEADER_SIZE);
+                        // Move the overflow data after the header.
+                        System.arraycopy(buf, Transaction.this.pageFile.pageSize, buf, pos, oldPos - Transaction.this.pageFile.pageSize);
+                        pos += oldPos - Transaction.this.pageFile.pageSize;
+                        current = next;
+
+                    } else {
+                        throw new PageOverflowIOException("Page overflow.");
+                    }
+                }
+
+            }
+
+            @SuppressWarnings("unchecked")
+            @Override
+            public void close() throws IOException {
+                super.close();
+
+                // We need to free up the rest of the page chain..
+                if (current.getType() == Page.PAGE_PART_TYPE) {
+                    free(current.getNext());
+                }
+
+                current.makePageEnd(pos, Transaction.this.pageFile.nextTxid.get());
+
+                // Write the header..
+                pos = 0;
+                current.write(this);
+
+                Transaction.this.write(current, buf);
+            }
+        };
+
+        // The page header marshaled after the data is written.
+        out.skip(Page.PAGE_HEADER_SIZE);
+        return out;
+    }
+
+    /**
+     * @param page
+     * @param data
+     * @throws IOException
+     */
+    @SuppressWarnings("unchecked")
+    private <T> void write(final Page<T> page, byte[] data) throws IOException {
+        Long key = page.getPageId();
+        synchronized (this.pageFile.writes) {
+            // If it's not in the write cache...
+            PageWrite<T> write = this.pageFile.writes.get(key);
+            if (write == null) {
+                write = this.pageFile.new PageWrite<T>(page, data);
+                this.pageFile.writes.put(key, write);
+            } else {
+                write.setCurrent(page, data);
+            }
+
+            // Once we start approaching capacity, notify the writer to start
+            // writing
+            if (this.pageFile.canStartWriteBatch()) {
+                if (this.pageFile.enableAsyncWrites) {
+                    this.pageFile.writes.notify();
+                } else {
+                    while (this.pageFile.canStartWriteBatch()) {
+                        this.pageFile.writeBatch(-1, TimeUnit.MILLISECONDS);
+                    }
+                }
+            }
+        }
+    }
 
     /**
      * Loads a page from disk.
@@ -175,7 +406,12 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException;
+    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
+        this.pageFile.assertLoaded();
+        Page<T> page = new Page<T>(pageId);
+        load(page, marshaller);
+        return page;
+    }
 
     /**
      * Loads a page from disk.  If the page.pageId is not valid then then this method will set the page.type to
@@ -189,31 +425,163 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException;
+    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
+        this.pageFile.assertLoaded();
+
+        // Can't load invalid offsets...
+        if (page.getPageId() < 0) {
+            throw new InvalidPageIOException("Page id is not valid", page.getPageId());
+        }
+
+        // Try to load it from the cache first...
+        Page<T> t = this.pageFile.getFromCache(page.getPageId());
+        if (t != null) {
+            page.copy(t);
+            return;
+        }
+
+        if (marshaller != null) {
+            // Full page read..
+            InputStream is = openInputStream(page);
+            DataInputStream dataIn = new DataInputStream(is);
+            page.set(marshaller.readPayload(dataIn));
+            is.close();
+        } else {
+            // Page header read.
+            DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
+            this.pageFile.readFile.seek(this.pageFile.toOffset(page.getPageId()));
+            this.pageFile.readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE);
+            page.read(in);
+            page.set(null);
+        }
+
+        // Cache it.
+        if (marshaller != null) {
+            this.pageFile.addToCache(page);
+        }
+    }
 
-    
-    /**
-     * 
-     * @param page
-     * @param overflow
-     * @return
-     * @throws IOException
-     */
-    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException;
-    
-    /**
-     * 
-     * @param p
-     * @return
-     * @throws IOException
-     */
-    public InputStream openInputStream(final Page p) throws IOException;
-    
     /**
-     * @return the number of pages allocated in the PageFile
+     * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
+     *      org.apache.kahadb.Marshaller)
      */
-    public long getPageCount();
-    
+    public InputStream openInputStream(final Page p) throws IOException {
+
+        return new InputStream() {
+
+            private ByteSequence chunk = new ByteSequence(new byte[Transaction.this.pageFile.pageSize]);
+            private Page page = readPage(p);
+            private int pageCount = 1;
+
+            private Page markPage;
+            private ByteSequence markChunk;
+
+            private Page readPage(Page page) throws IOException {
+                // Read the page data
+                Transaction.this.pageFile.readFile.seek(Transaction.this.pageFile.toOffset(page.getPageId()));
+                Transaction.this.pageFile.readFile.readFully(chunk.getData(), 0, Transaction.this.pageFile.pageSize);
+                chunk.setOffset(0);
+                chunk.setLength(Transaction.this.pageFile.pageSize);
+
+                DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
+                page.read(in);
+
+                chunk.setOffset(Page.PAGE_HEADER_SIZE);
+                if (page.getType() == Page.PAGE_END_TYPE) {
+                    chunk.setLength((int)(page.getNext()));
+                }
+
+                if (page.getType() == Page.PAGE_FREE_TYPE) {
+                    throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
+                }
+
+                return page;
+            }
+
+            public int read() throws IOException {
+                if (!atEOF()) {
+                    return chunk.data[chunk.offset++] & 0xff;
+                } else {
+                    return -1;
+                }
+            }
+
+            private boolean atEOF() throws IOException {
+                if (chunk.offset < chunk.length) {
+                    return false;
+                }
+                if (page.getType() == Page.PAGE_END_TYPE) {
+                    return true;
+                }
+                fill();
+                return chunk.offset >= chunk.length;
+            }
+
+            private void fill() throws IOException {
+                page = readPage(new Page(page.getNext()));
+                pageCount++;
+            }
+
+            public int read(byte[] b) throws IOException {
+                return read(b, 0, b.length);
+            }
+
+            public int read(byte b[], int off, int len) throws IOException {
+                if (!atEOF()) {
+                    int rc = 0;
+                    while (!atEOF() && rc < len) {
+                        len = Math.min(len, chunk.length - chunk.offset);
+                        if (len > 0) {
+                            System.arraycopy(chunk.data, chunk.offset, b, off, len);
+                            chunk.offset += len;
+                        }
+                        rc += len;
+                    }
+                    return rc;
+                } else {
+                    return -1;
+                }
+            }
+
+            public long skip(long len) throws IOException {
+                if (atEOF()) {
+                    int rc = 0;
+                    while (!atEOF() && rc < len) {
+                        len = Math.min(len, chunk.length - chunk.offset);
+                        if (len > 0) {
+                            chunk.offset += len;
+                        }
+                        rc += len;
+                    }
+                    return rc;
+                } else {
+                    return -1;
+                }
+            }
+
+            public int available() {
+                return chunk.length - chunk.offset;
+            }
+
+            public boolean markSupported() {
+                return true;
+            }
+
+            public void mark(int markpos) {
+                markPage = page;
+                byte data[] = new byte[Transaction.this.pageFile.pageSize];
+                System.arraycopy(chunk.getData(), 0, data, 0, Transaction.this.pageFile.pageSize);
+                markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
+            }
+
+            public void reset() {
+                page = markPage;
+                chunk = markChunk;
+            }
+
+        };
+    }
+
     /**
      * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are 
      * not included in this iteration. 
@@ -223,7 +591,10 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public Iterator<Page> iterator();
+    @SuppressWarnings("unchecked")
+    public Iterator<Page> iterator() {
+        return (Iterator<Page>)iterator(false);
+    }
 
     /**
      * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
@@ -234,13 +605,83 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public <T> Iterator<Page<T>> iterator(final boolean includeFreePages);
+    public Iterator<Page> iterator(final boolean includeFreePages) {
+
+        this.pageFile.assertLoaded();
+
+        return new Iterator<Page>() {
+            long nextId;
+            Page nextPage;
+            Page lastPage;
+
+            private void findNextPage() {
+                if (!Transaction.this.pageFile.loaded.get()) {
+                    throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
+                }
+
+                if (nextPage != null) {
+                    return;
+                }
+
+                try {
+                    while (nextId < Transaction.this.pageFile.nextFreePageId) {
+
+                        Page page = load(nextId, null);
+
+                        if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
+                            nextPage = page;
+                            return;
+                        } else {
+                            nextId++;
+                        }
+                    }
+                } catch (IOException e) {
+                }
+            }
+
+            public boolean hasNext() {
+                findNextPage();
+                return nextPage != null;
+            }
+
+            public Page next() {
+                findNextPage();
+                if (nextPage != null) {
+                    lastPage = nextPage;
+                    nextPage = null;
+                    nextId++;
+                    return lastPage;
+                } else {
+                    throw new NoSuchElementException();
+                }
+            }
+
+            public void remove() {
+                if (lastPage == null) {
+                    throw new IllegalStateException();
+                }
+                try {
+                    free(lastPage);
+                    lastPage = null;
+                } catch (IOException e) {
+                    new RuntimeException(e);
+                }
+            }
+        };
+    }
 
     /**
      * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
      * with the transaction are written to disk or none will.
      */
-    public void commit() throws IOException;
+    public void commit() throws IOException {
+    }
+
+    /**
+     * Rolls back the transaction.
+     */
+    private void rollback() throws IOException {
+    }
 
     /**
      * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
@@ -251,7 +692,19 @@
      * @throws T if the closure throws it
      * @throws IOException If the commit fails.
      */
-    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException;
+    public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
+        boolean success = false;
+        try {
+            closure.execute(this);
+            success = true;
+        } finally {
+            if (success) {
+                commit();
+            } else {
+                rollback();
+            }
+        }
+    }
 
     /**
      * Executes a closure and if it does not throw any exceptions, then it commits the transaction.
@@ -262,12 +715,33 @@
      * @throws T if the closure throws it
      * @throws IOException If the commit fails.
      */
-    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException;
+    public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
+        boolean success = false;
+        try {
+            R rc = closure.execute(this);
+            success = true;
+            return rc;
+        } finally {
+            if (success) {
+                commit();
+            } else {
+                rollback();
+            }
+        }
+    }
 
     /**
-     * 
      * @return true if there are no uncommitted page file updates associated with this transaction.
      */
-    public boolean isReadOnly();
+    public boolean isReadOnly() {
+        return false;
+    }
+
+    /**
+     * @return the number of pages allocated in the PageFile
+     */
+    public long getPageCount() {
+        return this.pageFile.nextFreePageId;
+    }
 
 }



Mime
View raw message