activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r688600 [2/2] - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/util/ test/java/org/apache/kahadb/page/
Date Mon, 25 Aug 2008 02:36:00 GMT
Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Sun Aug 24 19:35:58 2008
@@ -20,15 +20,17 @@
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InterruptedIOException;
+import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -43,7 +45,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.Marshaller;
-import org.apache.kahadb.page.Chunk.ChunkMarshaller;
+import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
 import org.apache.kahadb.util.IOHelper;
@@ -83,8 +85,6 @@
     private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER;
     private int initialPageOffset;
 
-    private DataByteArrayInputStream dataIn;
-    private byte[] readBuffer;
     private long nextFreePageId;
     
     private SequenceSet freeList = new SequenceSet();
@@ -167,16 +167,16 @@
     /**
      * Internally used by the double write buffer implementation used in this class. 
      */
-    private class PageWrite {
-        Page page;
+    private class PageWrite<T> {
+        Page<T> page;
         byte[] current;
         byte[] diskBound;
 
-        public PageWrite(Page page, byte[] data) {
+        public PageWrite(Page<T> page, byte[] data) {
             setCurrent(page, data);
         }
         
-        public void setCurrent(Page page, byte[] data) {
+        public void setCurrent(Page<T> page, byte[] data) {
             this.page=page;
             current=data;
         }
@@ -207,148 +207,237 @@
      */
     class PageFileTransaction implements Transaction {
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#getPageFile()
+        /**
+         * @see org.apache.kahadb.page.Transaction#getPageFile()
          */
         public PageFile getPageFile() {
             return PageFile.this;
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#allocate()
+        /**
+         * @see org.apache.kahadb.page.Transaction#allocate()
          */
-        public Page allocate() throws IOException {
-            if( !loaded.get() ) {
-                throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
-            }
-            
-            Page page = null;
-            
-            // We may need to create a new free page...
-            if(freeList.isEmpty()) {
-                page = new Page();
-                page.setPageId(nextFreePageId);
-                page.setType(Page.FREE_TYPE);
-                nextFreePageId ++;
-//                LOG.debug("allocated: "+page.getPageId());
-                write(page, null);
-            }
-
-            long pageId = freeList.removeFirst();
-            page = new Page();
-            page.setPageId(pageId);
-            page.setType(Page.FREE_TYPE);
-            return page;
+        public <T> Page<T> allocate() throws IOException {
+            return allocate(1);
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#allocate(int)
+        /**
+         * @see org.apache.kahadb.page.Transaction#allocate(int)
          */
-        public Page allocate(int count) throws IOException {
-            if( !loaded.get() ) {
-                throw new IllegalStateException("Cannot allocate a page when the page file is not loaded");
+        public <T> Page<T> allocate(int count) throws IOException {
+            assertLoaded();
+            if ( count <= 0 ) {
+                throw new IllegalArgumentException("The allocation count must be larger than zero");
             }
-
-            Page page = null;
+            
             Sequence seq = freeList.removeFirstSequence(count);
+
+            // We may need to create new free pages...
             if(seq==null) {
                 
-                // We may need to create a new free page...
-                page = new Page();
+                Page<T> first=null;                
                 int c=count;
                 while( c > 0 ) {
-                    page.setPageId(nextFreePageId);
-                    page.setType(Page.FREE_TYPE);
-                    nextFreePageId ++;
+                    Page<T> page = new Page<T>(nextFreePageId ++);
+                    page.makeFree(nextTxid.get());
+                    
+                    if( first == null ) {
+                        first = page;
+                    }
+                    
+                    addToCache(page);
+                    DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
+                    page.write(out);
+                    write(page, out.getData());
+
 //                    LOG.debug("allocate writing: "+page.getPageId());
-                    write(page, null);
                     c--;
-                    if( page == null ) {
-                        page = page;
-                    }
                 }
                 
-                seq = freeList.removeFirstSequence(count);
+                return first;
             }
             
-            page = new Page();
-            page.setPageId(seq.getFirst());
-            page.setType(Page.FREE_TYPE);
+            Page<T> page = new Page<T>(seq.getFirst());
+            page.makeFree(0);
 //            LOG.debug("allocated: "+page.getPageId());
             return page;
         }
-
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#free(org.apache.kahadb.page.Page)
-         */
-        public void free(Page page) throws IOException {
-            page.setType(Page.FREE_TYPE);
-            free(page.getPageId(), 1);
-        }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#free(long)
+        /**
+         * @see org.apache.kahadb.page.Transaction#free(long)
          */
         public void free(long pageId) throws IOException {
-            free(pageId, 1);
+            free(load(pageId, null));
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#free(org.apache.kahadb.page.Page, int)
+        /**
+         * @see org.apache.kahadb.page.Transaction#free(long, int)
          */
-        public void free(Page page, int count) throws IOException {
-            page.setType(Page.FREE_TYPE);
-            free(page.getPageId(), count);
+        public void free(long pageId, int count) throws IOException {
+            free(load(pageId, null), count);
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#free(long, int)
+        /**
+         * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page, int)
          */
-        public void free(long pageId, int count) throws IOException {
-            if( !loaded.get() ) {
-                throw new IllegalStateException("Cannot free a page when the page file is not loaded");
-            }
-            
-            Page page = new Page();
-            long initialId = pageId;
+        public <T> void free(Page<T> page, int count) throws IOException {
+            assertLoaded();
+            long offsetPage=page.getPageId();
             for (int i = 0; i < count; i++) {
-                page.setPageId(initialId+i);
-                page.setType(Page.FREE_TYPE);
-//                LOG.debug("free: "+page.getPageId());
-                write(page, null);
+                if( page == null ) {
+                    page=load(offsetPage+i, null);
+                }
+                free(page);
+                page=null;
             }
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#write(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+        /**
+         * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page)
          */
-        public void write(Page page, Marshaller marshaller) throws IOException {
+        public <T> void free(Page<T> page) throws IOException {
+            assertLoaded();
             
-            if( !loaded.get() ) {
-                throw new IllegalStateException("Cannot wriate a page when the page file is not loaded");
+            // We may need loop to free up a page chain.
+            while(page!=null){
+                
+                // Is it already free??
+                if( page.getType() == Page.PAGE_FREE_TYPE ) {
+                    return;
+                }
+
+                Page<T> next = null;
+                if( page.getType()==Page.PAGE_PART_TYPE ) {
+                    next = load(page.getNext(), null);
+                }
+
+                page.makeFree(nextTxid.get());
+                
+                DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize);
+                page.write(out);
+                write(page, out.getData());
+                
+                removeFromCache(page);
+                freeList.add(page.getPageId());
+                page = next;
             }
-                            
-            DataByteArrayOutputStream dataOut = new DataByteArrayOutputStream(pageSize);
-            page.setTxId(nextTxid.get());
-            page.write(dataOut, marshaller);
-            if (dataOut.size() > pageSize) {
-                throw new IOException("Page Size overflow: pageSize is " + pageSize + " trying to write " + dataOut.size());
+        }
+        
+        /**
+         * @see org.apache.kahadb.page.Transaction#store(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller, boolean)
+         */
+        public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
+            DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
+            if( marshaller!=null ) {
+                marshaller.writePayload(page.get(), out);
             }
+            out.close();
+        }
+
+        /**
+         * @throws IOException 
+         */
+        public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
+            assertLoaded();
             
-            page = page.copy();
-            Long key = page.getPageId();
-            addToCache(page);
+            // Copy to protect against the end user changing 
+            // the page instance while we are doing a write.
+            final Page copy = page.copy();
+            addToCache(copy);
+
+            //
+            // To support writing VERY large data, we override the output stream so that
+            // we do the page writes incrementally while the data is being marshalled.
+            DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize*2) {
+                Page current = copy;
+
+                @SuppressWarnings("unchecked")
+                @Override
+                protected void onWrite() throws IOException {
+                    
+                    // Are we at an overflow condition?
+                    if( pos >= pageSize ) {
+                        // If overflow is allowed
+                        if( overflow ) {
+
+                            Page next;
+                            if( current.getType() == Page.PAGE_PART_TYPE ) {
+                                next = load(current.getNext(), null);
+                            } else {
+                                next = allocate();
+                            }
+
+                            next.txId = current.txId;
 
-//            LOG.debug("write: "+page.getPageId());
+                            // Write the page header
+                            int oldPos = pos;
+                            pos = 0;
+                            
+                            current.makePagePart(next.getPageId(), nextTxid.get());
+                            current.write(this);
+
+                            // Do the page write..
+                            byte [] data = new byte[pageSize];
+                            System.arraycopy(buf, 0, data, 0, pageSize);
+                            PageFileTransaction.this.write(current, data);
+                            
+                            // Reset for the next page chunk
+                            pos = 0;
+                            // The page header is marshalled after the data is written.
+                            skip(Page.PAGE_HEADER_SIZE);
+                            // Move the overflow data after the header.
+                            System.arraycopy(buf, pageSize, buf, pos, oldPos-pageSize);
+                            pos += oldPos-pageSize;
+                            current = next;
+                            
+                        } else {
+                            throw new PageOverflowIOException("Page overflow.");
+                        }
+                    }
+                    
+                }
+                
+                @SuppressWarnings("unchecked")
+                @Override
+                public void close() throws IOException {
+                    super.close();
+
+                    // We need to free up the rest of the page chain..
+                    if( current.getType() == Page.PAGE_PART_TYPE ) {
+                        free(current.getNext());
+                    }
+                    
+                    current.makePageEnd(pos, nextTxid.get());
+
+                    // Write the header..
+                    pos = 0;
+                    current.write(this);
+                    
+                    PageFileTransaction.this.write(current, buf);
+                }
+            };
             
+            // The page header is marshalled after the data is written.
+            out.skip(Page.PAGE_HEADER_SIZE);
+            return out;
+        }
+        
+        /**
+         * @param page
+         * @param data
+         * @throws IOException
+         */
+        @SuppressWarnings("unchecked")
+        private <T> void write(final Page<T> page, byte[] data) throws IOException {
+            Long key = page.getPageId();
             synchronized( writes ) {
                 // If it's not in the write cache...
-                PageWrite write = writes.get(key);
+                PageWrite<T> write = writes.get(key);
                 if( write==null ) {
-                    write = new PageWrite(page, dataOut.getData());
+                    write = new PageWrite<T>(page, data);
                     writes.put(key, write);
                 } else {
-                    write.setCurrent(page, dataOut.getData());
+                    write.setCurrent(page, data);
                 }
                 
                 // Once we start approaching capacity, notify the writer to start writing
@@ -361,80 +450,198 @@
                         }
                     }
                 }
-            }
-            
-            if( page.getType() == Page.FREE_TYPE ) {
-                removeFromCache(page);
-                freeList.add(page.getPageId());
-            }
-
+            }            
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#load(long, org.apache.kahadb.Marshaller)
+        /**
+         * @see org.apache.kahadb.page.Transaction#load(long, org.apache.kahadb.Marshaller)
          */
-        public Page load(long pageId, Marshaller marshaller) throws IOException {
-            Page page = new Page();
-            page.setPageId(pageId);
+        public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
+            assertLoaded();
+            Page<T> page = new Page<T>(pageId);
             load(page, marshaller);
             return page;
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+        /**
+         * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
          */
-        public void load(Page page, Marshaller marshaller) throws IOException {
-            if( !loaded.get() ) {
-                throw new IllegalStateException("Cannot load a page when the page file is not loaded");
-            }
+        public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
+            assertLoaded();
 
             // Can't load invalid offsets...
             if (page.getPageId() < 0) {
-                page.setType(Page.INVALID_TYPE);
-                return;
+                throw new Transaction.InvalidPageIOException("Page id is not valid", page.getPageId());
             }        
 
             // Try to load it from the cache first...
-            Page t = getFromCache(page.getPageId());
+            Page<T> t = getFromCache(page.getPageId());
             if (t != null) {
                 page.copy(t);
                 return;
             }
             
-            // Read the page data
-            readFile.seek(toOffset(page.getPageId()));
-            readFile.readFully(readBuffer, 0, pageSize);
-            dataIn.restart(readBuffer);
-            
-            // Unmarshall it.
-//            LOG.debug("load: "+page.getPageId());
-            page.read(dataIn, marshaller);
+            if( marshaller!=null ) {
+                // Full page read..
+                InputStream is = openInputStream(page);
+                DataInputStream dataIn = new DataInputStream(is);
+                page.set(marshaller.readPayload(dataIn));
+                is.close();
+            } else {
+                // Page header read.
+                DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
+                readFile.seek(toOffset(page.getPageId()));
+                readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE);
+                page.read(in);
+                page.set(null);
+            }
             
             // Cache it.
-            addToCache(page);
+            if( marshaller!=null ) {
+                addToCache(page);
+            }
+        }
+
+        /**
+         * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller)
+         */
+        public InputStream openInputStream(final Page p) throws IOException {
+            
+            return new InputStream() {
+
+                private ByteSequence chunk = new ByteSequence(new byte[pageSize]);
+                private Page page = readPage(p);
+                private int pageCount=1; 
+                
+                private Page markPage;
+                private ByteSequence markChunk;
+                
+                private Page readPage(Page page) throws IOException {
+                    // Read the page data
+                    readFile.seek(toOffset(page.getPageId()));
+                    readFile.readFully(chunk.getData(), 0, pageSize);
+                    chunk.setOffset(0);
+                    chunk.setLength(pageSize);
+                    
+                    DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
+                    page.read(in);
+                    
+                    chunk.setOffset(Page.PAGE_HEADER_SIZE);
+                    if( page.getType() == Page.PAGE_END_TYPE ) {
+                        chunk.setLength((int)(page.getNext()));
+                    }
+                    
+                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
+                        throw new EOFException("Chunk stream does not exist at page: "+page.getPageId());
+                    }
+                    
+                    return page;
+                }
+                
+                public int read() throws IOException {
+                    if (!atEOF()) {
+                        return chunk.data[chunk.offset++] & 0xff;
+                    } else {
+                        return -1;
+                    }
+                }
+
+                private boolean atEOF() throws IOException {
+                    if( chunk.offset < chunk.length ) {
+                      return false;  
+                    }
+                    if( page.getType() == Page.PAGE_END_TYPE ) {
+                        return true;
+                    }
+                    fill();
+                    return chunk.offset >= chunk.length;
+                }
+
+                private void fill() throws IOException {
+                    page = readPage(new Page(page.getNext()));
+                    pageCount++;
+                }
+                
+                public int read(byte[] b) throws IOException {
+                    return read(b, 0, b.length);
+                }
+
+                public int read(byte b[], int off, int len) throws IOException {
+                    if (!atEOF()) {
+                        int rc=0;
+                        while(!atEOF() && rc < len) {
+                            len = Math.min(len, chunk.length - chunk.offset);
+                            if (len > 0) {
+                                System.arraycopy(chunk.data, chunk.offset, b, off, len);
+                                chunk.offset += len;
+                            }
+                            rc+=len;
+                        }
+                        return rc;
+                    } else {
+                        return -1;
+                    }
+                }
+
+                public long skip(long len) throws IOException {
+                    if (atEOF()) {
+                        int rc=0;
+                        while(!atEOF() && rc < len) {
+                            len = Math.min(len, chunk.length - chunk.offset);
+                            if (len > 0) {
+                                chunk.offset += len;
+                            }
+                            rc+=len;
+                        }
+                        return rc;
+                    } else {
+                        return -1;
+                    }
+                }
+
+                public int available() {
+                    return chunk.length - chunk.offset;
+                }
+
+                public boolean markSupported() {
+                    return true;
+                }
+
+                public void mark(int markpos) {
+                    markPage = page;
+                    byte data[] = new byte[pageSize];
+                    System.arraycopy(chunk.getData(), 0, data, 0, pageSize);
+                    markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
+                }
+
+                public void reset() {
+                    page = markPage;
+                    chunk = markChunk;
+                }
+
+            };
         }
 
 
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#iterator()
+        /**
+         * @see org.apache.kahadb.page.Transaction#iterator()
          */
+        @SuppressWarnings("unchecked")
         public Iterator<Page> iterator() {
-            return iterator(false);
+            return (Iterator<Page>)iterator(false);
         }
         
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#iterator(boolean)
+        /**
+         * @see org.apache.kahadb.page.Transaction#iterator(boolean)
          */
-        public Iterator<Page> iterator(final boolean includeFreePages) {
+        public <T> Iterator<Page<T>> iterator(final boolean includeFreePages) {
             
-            if( !loaded.get() ) {
-                throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
-            }
+            assertLoaded();
 
-            return new Iterator<Page>() {
+            return new Iterator<Page<T>>() {
                 long nextId;
-                Page nextPage;
-                Page lastPage;
+                Page<T> nextPage;
+                Page<T> lastPage;
                 
                 private void findNextPage() {
                     if( !loaded.get() ) {
@@ -447,15 +654,10 @@
                     
                     try {
                         while( nextId < PageFile.this.nextFreePageId ) {
-                            readFile.seek(toOffset(nextId));
-                            readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
-                            dataIn.restart(readBuffer);
                             
-                            Page page = new Page();
-                            page.setPageId(nextId);
-                            page.read(dataIn, null);
+                            Page<T> page = load(nextId, null);
                             
-                            if( includeFreePages || page.getType()!=Page.FREE_TYPE ) {
+                            if( includeFreePages || page.getType()!=Page.PAGE_FREE_TYPE ) {
                                 nextPage = page;
                                 return;
                             } else {
@@ -471,7 +673,7 @@
                     return nextPage !=null;
                 }
 
-                public Page next() {
+                public Page<T> next() {
                     findNextPage(); 
                     if( nextPage !=null ) {
                         lastPage = nextPage;
@@ -497,8 +699,8 @@
             };
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#commit()
+        /**
+         * @see org.apache.kahadb.page.Transaction#commit()
          */
         public void commit() throws IOException {
         }
@@ -509,8 +711,8 @@
         private void rollback() throws IOException {
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#execute(org.apache.kahadb.page.PageFile.Closure)
+        /**
+         * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.Closure)
          */
         public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
             boolean success=false;
@@ -526,8 +728,8 @@
             }
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#execute(org.apache.kahadb.page.PageFile.CallableClosure)
+        /**
+         * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.CallableClosure)
          */
         public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
             boolean success=false;
@@ -544,15 +746,16 @@
             }
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.kahadb.page.ITransaction#isReadOnly()
+        /**
+         * @see org.apache.kahadb.page.Transaction#isReadOnly()
          */
         public boolean isReadOnly() {
             return false;
         }
     }
     
-    public PageFileTransaction tx() {
+    public Transaction tx() {
+        assertLoaded();
         return new PageFileTransaction();
     }
     
@@ -634,9 +837,6 @@
             recoveryBufferSize = (pageSize+RECOVERY_HEADER_SIZE) * MAX_PAGES_IN_RECOVERY_BUFFER;
             initialPageOffset = CONFIG_SPACE_SIZE+recoveryBufferSize;
             
-            dataIn = new DataByteArrayInputStream();
-            readBuffer = new byte[pageSize];
-
             long lastTxId=0;
             if(  metaData.isCleanShutdown() ) {
                 lastTxId = metaData.getLastTxId();
@@ -649,20 +849,11 @@
                 
                 // Scan all to find the free pages.
                 freeList = new SequenceSet();
-                long length = readFile.length();
-                Page page = new Page();
-                long offset = initialPageOffset;
-                while( offset < length ) {
-                    // Just read the headers to re-build the free list.
-                    readFile.seek(offset);
-                    readFile.readFully(readBuffer, 0, Page.PAGE_HEADER_SIZE);
-                    dataIn.restart(readBuffer);
-                    page.read(dataIn, null);
-                    
-                    if( page.getType() == Page.FREE_TYPE ) {
-                        freeList.add(offset);
+                for (Iterator i = tx().iterator(true); i.hasNext();) {
+                    Page page = (Page)i.next();
+                    if( page.getType() == Page.PAGE_FREE_TYPE ) {
+                        freeList.add(page.getPageId());
                     }
-                    offset+=pageSize;
                 }
                 
             }
@@ -1020,20 +1211,29 @@
         return initialPageOffset+(pageId*pageSize);
     }
 
+    /**
+     * @throws IllegalStateException if the page file is not loaded.
+     */
+    private void assertLoaded() throws IllegalStateException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("PageFile is not loaded");
+        }
+    }
 
     ///////////////////////////////////////////////////////////////////
     // Internal Cache Related operations
     ///////////////////////////////////////////////////////////////////
     
-    private Page getFromCache(long pageId) {
+    @SuppressWarnings("unchecked")
+    private <T> Page<T> getFromCache(long pageId) {
         synchronized(writes) {
-            PageWrite pageWrite = writes.get(pageId);
+            PageWrite<T> pageWrite = writes.get(pageId);
             if( pageWrite != null ) {
                 return pageWrite.page;
             }
         }
 
-        Page result = null;
+        Page<T> result = null;
         if (enablePageCaching) {
             result = pageCache.get(pageId);
         }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Sun Aug 24 19:35:58 2008
@@ -17,6 +17,8 @@
 package org.apache.kahadb.page;
 
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.Iterator;
 
 import org.apache.kahadb.Marshaller;
@@ -28,6 +30,29 @@
 interface Transaction extends Iterable<Page> {
     
     /**
+     * Thrown when page data marshals to a size larger than one page and overflow was not enabled.
+     */
+    public class PageOverflowIOException extends IOException {
+        public PageOverflowIOException(String message) {
+            super(message);
+        }
+    }
+    
+    public class InvalidPageIOException extends IOException {
+        private final long page;
+
+        public InvalidPageIOException(String message, long page) {
+            super(message);
+            this.page = page;
+        }
+
+        public long getPage() {
+            return page;
+        }
+    }
+    
+    
+    /**
     * This closure interface is intended for the end user to implement callbacks for the Transaction.execute() method.
      * 
      * @param <T> The type of exceptions that operation will throw.
@@ -57,7 +82,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public Page allocate() throws IOException;
+    public <T> Page<T> allocate() throws IOException;
 
     /** 
      * Allocates a block of free pages that you can write data to.
@@ -69,7 +94,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public Page allocate(int count) throws IOException;
+    public <T> Page<T> allocate(int count) throws IOException;
 
     /**
      * Frees up a previously allocated page so that it can be re-allocated again.
@@ -80,7 +105,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public void free(Page page) throws IOException;
+    public <T> void free(Page<T> page) throws IOException;
 
     /**
      * Frees up a previously allocated page so that it can be re-allocated again.
@@ -104,7 +129,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public void free(Page page, int count) throws IOException;
+    public <T> void free(Page<T> page, int count) throws IOException;
 
     /**
      * Frees up a previously allocated sequence of pages so that it can be re-allocated again.
@@ -125,12 +150,18 @@
      *        the page to write. The Page object must be fully populated with a valid pageId, type, and data.
      * @param marshaller
     *        the marshaller to use to store the data portion of the Page, may be null if you do not wish to write the data.
+     * @param overflow
+     *        If true, then if the page data marshals to a bigger size than can fit in one page, then additional 
+     *        overflow pages are automatically allocated and chained to this page to store all the data.  If false,
+     *        and the overflow condition would occur, then the PageOverflowIOException is thrown. 
      * @throws IOException
     *         If a disk error occurred.
+     * @throws PageOverflowIOException
+     *         If the page data marshals to a size larger than the maximum page size and overflow was false.
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public void write(Page page, Marshaller marshaller) throws IOException;
+    public <T> void store(Page<T> page, Marshaller<T> marshaller, boolean overflow) throws IOException, PageOverflowIOException;
 
     /**
      * Loads a page from disk.
@@ -145,7 +176,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public Page load(long pageId, Marshaller marshaller) throws IOException;
+    public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException;
 
     /**
     * Loads a page from disk.  If the page.pageId is not valid then this method will set the page.type to
@@ -159,8 +190,26 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public void load(Page page, Marshaller marshaller) throws IOException;
+    public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException;
 
+    
+    /**
+     * Opens a stream for writing raw data into the given page.
+     * @param page the page to write to; must have a valid pageId
+     * @param overflow if true, overflow pages are allocated and chained as needed; if false, a PageOverflowIOException is thrown when the data exceeds one page
+     * @return the OutputStream to write the page data to
+     * @throws IOException if a disk error occurred
+     */
+    public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException;
+    
+    /**
+     * Opens a stream for reading the data previously stored in the given page.
+     * @param p the page to read; must have a valid pageId
+     * @return an InputStream over the page data (including chained overflow pages, if any)
+     * @throws IOException if a disk error occurred
+     */
+    public InputStream openInputStream(final Page p) throws IOException;
+    
     /**
      * Allows you to iterate through all active Pages in this object.  Pages with type Page.FREE_TYPE are 
      * not included in this iteration. 
@@ -170,7 +219,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public Iterator<Page> iterator();
+    public <T> Iterator<Page<T>> iterator();
 
     /**
      * Allows you to iterate through all active Pages in this object.  You can optionally include free pages in the pages
@@ -181,7 +230,7 @@
      * @throws IllegalStateException
      *         if the PageFile is not loaded
      */
-    public Iterator<Page> iterator(final boolean includeFreePages);
+    public <T> Iterator<Page<T>> iterator(final boolean includeFreePages);
 
     /**
      * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayInputStream.java Sun Aug 24 19:35:58 2008
@@ -30,6 +30,7 @@
     private byte[] buf;
     private int pos;
     private int offset;
+    private int length;
 
     /**
      * Creates a <code>StoreByteArrayInputStream</code>.
@@ -40,6 +41,7 @@
         this.buf = buf;
         this.pos = 0;
         this.offset = 0;
+        this.length = buf.length;
     }
 
     /**
@@ -51,6 +53,7 @@
         this.buf = sequence.getData();
         this.offset = sequence.getOffset();
         this.pos =  this.offset;
+        this.length = sequence.length;
     }
 
     /**
@@ -84,6 +87,12 @@
     public void restart(byte[] newBuff) {
         buf = newBuff;
         pos = 0;
+        length = newBuff.length;
+    }
+
+    public void restart() {
+        pos = 0;
+        length = buf.length;
     }
 
     /**
@@ -95,6 +104,7 @@
     public void restart(ByteSequence sequence) {
         this.buf = sequence.getData();
         this.pos = sequence.getOffset();
+        this.length = sequence.getLength();
     }
 
     /**
@@ -107,6 +117,7 @@
             buf = new byte[size];
         }
         restart(buf);
+        this.length = size;
     }
 
     /**
@@ -121,7 +132,7 @@
      *         stream has been reached.
      */
     public int read() {
-        return (pos < buf.length) ? (buf[pos++] & 0xff) : -1;
+        return (pos < length) ? (buf[pos++] & 0xff) : -1;
     }
 
     /**
@@ -139,11 +150,11 @@
         if (b == null) {
             throw new NullPointerException();
         }
-        if (pos >= buf.length) {
+        if (pos >= length) {
             return -1;
         }
-        if (pos + len > buf.length) {
-            len = buf.length - pos;
+        if (pos + len > length) {
+            len = length - pos;
         }
         if (len <= 0) {
             return 0;
@@ -158,7 +169,7 @@
      *         without blocking.
      */
     public int available() {
-        return buf.length - pos;
+        return length - pos;
     }
 
     public void readFully(byte[] b) {
@@ -170,8 +181,8 @@
     }
 
     public int skipBytes(int n) {
-        if (pos + n > buf.length) {
-            n = buf.length - pos;
+        if (pos + n > length) {
+            n = length - pos;
         }
         if (n < 0) {
             return 0;
@@ -233,7 +244,7 @@
 
     public String readLine() {
         int start = pos;
-        while (pos < buf.length) {
+        while (pos < length) {
             int c = read();
             if (c == '\n') {
                 break;
@@ -309,4 +320,20 @@
         }
         return new String(characters, 0, count);
     }
+
+    public int getPos() {
+        return pos;
+    }
+
+    public void setPos(int pos) {
+        this.pos = pos;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    public void setLength(int length) {
+        this.length = length;
+    }
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/DataByteArrayOutputStream.java Sun Aug 24 19:35:58 2008
@@ -26,10 +26,10 @@
  * 
  * @version $Revision: 1.1.1.1 $
  */
-public final class DataByteArrayOutputStream extends OutputStream implements DataOutput {
+public class DataByteArrayOutputStream extends OutputStream implements DataOutput {
     private static final int DEFAULT_SIZE = 2048;
-    private byte buf[];
-    private int pos;
+    protected byte buf[];
+    protected int pos;
 
     /**
      * Creates a new byte array output stream, with a buffer capacity of the
@@ -82,12 +82,14 @@
      * Writes the specified byte to this byte array output stream.
      * 
      * @param b the byte to be written.
+     * @throws IOException 
      */
-    public void write(int b) {
+    public void write(int b) throws IOException {
         int newcount = pos + 1;
         ensureEnoughBuffer(newcount);
         buf[pos] = (byte)b;
         pos = newcount;
+        onWrite();
     }
 
     /**
@@ -97,8 +99,9 @@
      * @param b the data.
      * @param off the start offset in the data.
      * @param len the number of bytes to write.
+     * @throws IOException 
      */
-    public void write(byte b[], int off, int len) {
+    public void write(byte b[], int off, int len) throws IOException {
         if (len == 0) {
             return;
         }
@@ -106,6 +109,7 @@
         ensureEnoughBuffer(newcount);
         System.arraycopy(b, off, buf, pos, len);
         pos = newcount;
+        onWrite();
     }
 
     /**
@@ -126,47 +130,54 @@
      * Set the current position for writing
      * 
      * @param offset
+     * @throws IOException 
      */
-    public void position(int offset) {
+    public void position(int offset) throws IOException {
         ensureEnoughBuffer(offset);
         pos = offset;
+        onWrite();
     }
 
     public int size() {
         return pos;
     }
 
-    public void writeBoolean(boolean v) {
+    public void writeBoolean(boolean v) throws IOException {
         ensureEnoughBuffer(pos + 1);
         buf[pos++] = (byte)(v ? 1 : 0);
+        onWrite();
     }
 
-    public void writeByte(int v) {
+    public void writeByte(int v) throws IOException {
         ensureEnoughBuffer(pos + 1);
         buf[pos++] = (byte)(v >>> 0);
+        onWrite();
     }
 
-    public void writeShort(int v) {
+    public void writeShort(int v) throws IOException {
         ensureEnoughBuffer(pos + 2);
         buf[pos++] = (byte)(v >>> 8);
         buf[pos++] = (byte)(v >>> 0);
+        onWrite();
     }
 
-    public void writeChar(int v) {
+    public void writeChar(int v) throws IOException {
         ensureEnoughBuffer(pos + 2);
         buf[pos++] = (byte)(v >>> 8);
         buf[pos++] = (byte)(v >>> 0);
+        onWrite();
     }
 
-    public void writeInt(int v) {
+    public void writeInt(int v) throws IOException {
         ensureEnoughBuffer(pos + 4);
         buf[pos++] = (byte)(v >>> 24);
         buf[pos++] = (byte)(v >>> 16);
         buf[pos++] = (byte)(v >>> 8);
         buf[pos++] = (byte)(v >>> 0);
+        onWrite();
     }
 
-    public void writeLong(long v) {
+    public void writeLong(long v) throws IOException {
         ensureEnoughBuffer(pos + 8);
         buf[pos++] = (byte)(v >>> 56);
         buf[pos++] = (byte)(v >>> 48);
@@ -176,6 +187,7 @@
         buf[pos++] = (byte)(v >>> 16);
         buf[pos++] = (byte)(v >>> 8);
         buf[pos++] = (byte)(v >>> 0);
+        onWrite();
     }
 
     public void writeFloat(float v) throws IOException {
@@ -186,14 +198,14 @@
         writeLong(Double.doubleToLongBits(v));
     }
 
-    public void writeBytes(String s) {
+    public void writeBytes(String s) throws IOException {
         int length = s.length();
         for (int i = 0; i < length; i++) {
             write((byte)s.charAt(i));
         }
     }
 
-    public void writeChars(String s) {
+    public void writeChars(String s) throws IOException {
         int length = s.length();
         for (int i = 0; i < length; i++) {
             int c = s.charAt(i);
@@ -242,6 +254,7 @@
                 buf[pos++] = (byte)(0x80 | ((c >> 0) & 0x3F));
             }
         }
+        onWrite();
     }
 
     private void ensureEnoughBuffer(int newcount) {
@@ -251,4 +264,17 @@
             buf = newbuf;
         }
     }
+    
+    /**
+     * This method is called after each write to the buffer.  This should allow subclasses 
+     * to take some action based on the writes, for example flushing data to an external system based on size. 
+     */
+    protected void onWrite() throws IOException {
+    }
+
+    public void skip(int size) throws IOException {
+        ensureEnoughBuffer(pos + size);
+        pos+=size;
+        onWrite();
+    }
 }

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexBenchMark.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexBenchMark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexBenchMark.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.Store;
+import org.apache.kahadb.StringMarshaller;
+
+public class BTreeIndexBenchMark extends IndexBenchmark {
+
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+
+        Transaction tx = pf.tx();
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexTest.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexTest.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+
+public class BTreeIndexTest extends IndexTestSupport {
+
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        BTreeIndex<String, Long> index = new BTreeIndex<String,Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+}

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java Sun Aug 24 19:35:58 2008
@@ -18,27 +18,23 @@
 
 import java.io.File;
 
+import org.apache.kahadb.LongMarshaller;
 import org.apache.kahadb.Store;
-import org.apache.kahadb.impl.index.Index;
-import org.apache.kahadb.impl.index.IndexBenchmark;
+import org.apache.kahadb.StringMarshaller;
 
 public class HashIndexBenchMark extends IndexBenchmark {
 
     @Override
-    protected Index createIndex(File root, String name) throws Exception {
+    protected Index<String, Long> createIndex() throws Exception {
 
-        PageFile pf = new PageFile(root, name);
-        pf.load();
-        
         Transaction tx = pf.tx();
         long id = tx.allocate().getPageId();
         tx.commit();
 
-        HashIndex index = new HashIndex(indexManager, pf, id);
-        index.setKeyMarshaller(Store.STRING_MARSHALLER);
+        HashIndex<String, Long> index = new HashIndex<String, Long>(pf, id);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
         
-//        index.setEnableRecoveryBuffer(false);
-//        index.setEnableSyncedWrites(false);
         return index;
     }
 

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexTest.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexTest.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexTest.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+
+public class HashIndexTest extends IndexTestSupport {
+
+    @Override
+    protected Index<String, Long> createIndex() throws Exception {
+        
+        long id = tx.allocate().getPageId();
+        tx.commit();
+
+        HashIndex<String, Long> index = new HashIndex<String,Long>(pf, id);
+        index.setBinCapacity(12);
+        index.setKeyMarshaller(StringMarshaller.INSTANCE);
+        index.setValueMarshaller(LongMarshaller.INSTANCE);
+        
+        return index;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import junit.framework.TestCase;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * @author chirino
+ */
+public abstract class IndexBenchmark extends TestCase {
+
+    // Slower machines might need to make this bigger.
+    private static final long SAMPLE_DURATION = Integer.parseInt(System.getProperty("SAMPLES_DURATION", "" + 1000 * 5));
+    // How many times do we sample?
+    private static final long SAMPLES = Integer.parseInt(System.getProperty("SAMPLES", "" + 60 * 1000 / SAMPLE_DURATION));
+    // How many indexes will we be benchmarking concurrently?
+    private static final int INDEX_COUNT = Integer.parseInt(System.getProperty("INDEX_COUNT", "" + 1));
+    // Indexes tend to perform worse when they get big, so how many items
+    // should we put into the index before we start sampling.
+    private static final int INDEX_PRE_LOAD_COUNT = Integer.parseInt(System.getProperty("INDEX_PRE_LOAD_COUNT", "" + 10000 / INDEX_COUNT));
+
+    protected File ROOT_DIR;
+    protected final HashMap<String, Index<String, Long>> indexes = new HashMap<String, Index<String, Long>>();
+    protected PageFile pf;
+
+    public void setUp() throws Exception {
+        ROOT_DIR = new File(IOHelper.getDefaultDataDirectory());
+        IOHelper.mkdirs(ROOT_DIR);
+        IOHelper.deleteChildren(ROOT_DIR);
+        
+        pf = new PageFile(ROOT_DIR, getClass().getName());
+        pf.load();
+    }
+
+    protected void tearDown() throws Exception {
+        for (Index i : indexes.values()) {
+            try {
+                i.unload();
+            } catch (Throwable ignore) {
+            }
+        }
+    }
+
+    abstract protected Index<String, Long> createIndex() throws Exception;
+
+    synchronized private Index<String, Long> openIndex(String name) throws Exception {
+        Index<String, Long> index = indexes.get(name);
+        if (index == null) {
+            index = createIndex();
+            index.load();
+            indexes.put(name, index);
+        }
+        return index;
+    }
+
+    class Producer extends Thread {
+        private final String name;
+        AtomicBoolean shutdown = new AtomicBoolean();
+
+        public Producer(String name) {
+            super("Producer: " + name);
+            this.name = name;
+        }
+
+        public void shutdown() {
+            shutdown.set(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+
+                Transaction tx = pf.tx();
+                
+                Index<String,Long> index = openIndex(name);
+                long counter = 0;
+                while (!shutdown.get()) {
+                    long c = counter;
+
+                    String key = "a-long-message-id-like-key-" + c;
+                    
+                    index.put(tx, key, c);
+                    tx.commit();
+                    
+                    onProduced(counter++);
+                }
+
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void onProduced(long counter) {
+        }
+    }
+
+    class Consumer extends Thread {
+        private final String name;
+        AtomicBoolean shutdown = new AtomicBoolean();
+
+        public Consumer(String name) {
+            super("Consumer: " + name);
+            this.name = name;
+        }
+
+        public void shutdown() {
+            shutdown.set(true);
+        }
+
+        @Override
+        public void run() {
+            try {
+                Transaction tx = pf.tx();
+
+                Index<String,Long> index = openIndex(name);
+                long counter = 0;
+                while (!shutdown.get()) {
+                    long c = counter;
+                    String key = "a-long-message-id-like-key-" + c;
+                    Long record = index.get(tx, key);
+                    if (record != null) {
+                        index.remove(tx, key);
+                        tx.commit();
+                        onConsumed(counter++);
+                    } else {
+                        Thread.sleep(0);
+                    }
+                }
+            } catch (Throwable e) {
+                e.printStackTrace();
+            }
+        }
+
+        public void onConsumed(long counter) {
+        }
+    }
+
+    public void testLoad() throws Exception {
+
+        final Producer producers[] = new Producer[INDEX_COUNT];
+        final Consumer consumers[] = new Consumer[INDEX_COUNT];
+        final CountDownLatch preloadCountDown = new CountDownLatch(INDEX_COUNT);
+        final AtomicLong producedRecords = new AtomicLong();
+        final AtomicLong consumedRecords = new AtomicLong();
+
+        System.out.println("Starting: " + INDEX_COUNT + " producers");
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            producers[i] = new Producer("test-" + i) {
+                private boolean prelaodDone;
+
+                public void onProduced(long counter) {
+                    if (!prelaodDone && counter >= INDEX_PRE_LOAD_COUNT) {
+                        prelaodDone = true;
+                        preloadCountDown.countDown();
+                    }
+                    producedRecords.incrementAndGet();
+                }
+            };
+            producers[i].start();
+        }
+
+        long start = System.currentTimeMillis();
+        System.out.println("Waiting for each producer create " + INDEX_PRE_LOAD_COUNT + " records before starting the consumers.");
+        preloadCountDown.await();
+        long end = System.currentTimeMillis();
+        System.out.println("Preloaded " + INDEX_PRE_LOAD_COUNT * INDEX_COUNT + " records at " + (INDEX_PRE_LOAD_COUNT * INDEX_COUNT * 1000f / (end - start)) + " records/sec");
+
+        System.out.println("Starting: " + INDEX_COUNT + " consumers");
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            consumers[i] = new Consumer("test-" + i) {
+                public void onConsumed(long counter) {
+                    consumedRecords.incrementAndGet();
+                }
+            };
+            consumers[i].start();
+        }
+
+        long sample_start = System.currentTimeMillis();
+        System.out.println("Taking " + SAMPLES + " performance samples every " + SAMPLE_DURATION + " ms");
+        System.out.println("time (s), produced, produce rate (r/s), consumed, consume rate (r/s), used memory (k)");
+        producedRecords.set(0);
+        consumedRecords.set(0);
+        for (int i = 0; i < SAMPLES; i++) {
+            start = System.currentTimeMillis();
+            Thread.sleep(SAMPLE_DURATION);
+            end = System.currentTimeMillis();
+            long p = producedRecords.getAndSet(0);
+            long c = consumedRecords.getAndSet(0);
+
+            long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+
+            System.out.println(((end-sample_start)/1000f)+", "+p+", "+(p * 1000f / (end - start)) + ", "+ c+", " + (c * 1000f / (end - start))+", "+(usedMemory/(1024)) );
+        }
+        System.out.println("Samples done... Shutting down the producers and consumers...");
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            producers[i].shutdown();
+            consumers[i].shutdown();
+        }
+        for (int i = 0; i < INDEX_COUNT; i++) {
+            producers[i].join(1000 * 5);
+            consumers[i].join(1000 * 5);
+        }
+        System.out.println("Shutdown.");
+    }
+
+}

Propchange: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexTestSupport.java?rev=688600&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexTestSupport.java (added)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexTestSupport.java Sun Aug 24 19:35:58 2008
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.page;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+import org.apache.kahadb.LongMarshaller;
+import org.apache.kahadb.StringMarshaller;
+import org.apache.kahadb.util.IOHelper;
+
+/**
+ * Abstract base class for Index implementation tests.  A concrete subclass
+ * supplies the Index under test via {@link #createIndex()}; this class runs a
+ * common insert/retrieve/remove workload against it, unloading and reloading
+ * the index between phases to check that its state survives a restart.
+ */
+public abstract class IndexTestSupport extends TestCase {
+
+    // Number of key/value entries each workload phase inserts or removes.
+    private static final int COUNT = 10000;
+
+    protected Index<String,Long> index;   // the index under test, created by the subclass
+    protected File directory;             // data directory, wiped clean in setUp()
+    protected PageFile pf;                // backing page file for the index
+    protected Transaction tx;             // single transaction reused by every operation
+
+    /**
+     * Creates a fresh (emptied) data directory, loads the page file, opens a
+     * transaction, and asks the subclass for the index to exercise.
+     *
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#setUp()
+     */
+    protected void setUp() throws Exception {
+        super.setUp();
+        directory = new File(IOHelper.getDefaultDataDirectory());
+        IOHelper.mkdirs(directory);
+        IOHelper.deleteChildren(directory);
+        
+        pf = new PageFile(directory, getClass().getName());
+        pf.load();
+        tx = pf.tx();
+    
+        this.index = createIndex();
+    }
+    
+    // Subclasses return the Index implementation to exercise (e.g. a hash or
+    // BTree index) built against the already-loaded page file.
+    abstract protected Index<String, Long> createIndex() throws Exception;
+
+    /**
+     * End-to-end workload: insert COUNT entries, reload the index and verify
+     * them, remove them all, reload, insert again, remove then re-insert the
+     * even-numbered half, reload once more, and verify every key is present.
+     * The unload()/load() pairs simulate a shutdown/restart cycle.
+     */
+    public void testHashIndex() throws Exception {
+        String keyRoot = "key:";
+        this.index.load();
+        doInsert(keyRoot);
+        this.index.unload();
+        this.index.load();
+        checkRetrieve(keyRoot);
+        doRemove(keyRoot);
+        this.index.unload();
+        this.index.load();
+        doInsert(keyRoot);
+        doRemoveHalf(keyRoot);
+        doInsertHalf(keyRoot);
+        this.index.unload();
+        this.index.load();
+        checkRetrieve(keyRoot);
+        this.index.unload();
+    }
+
+    // Inserts COUNT sequential entries (keyRoot+i -> i), committing after each put.
+    void doInsert(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            index.put(tx, keyRoot + i, (long)i);
+            tx.commit();
+        }
+    }
+
+    // Asserts that every key written by doInsert() can be read back.
+    void checkRetrieve(String keyRoot) throws IOException {
+        for (int i = 0; i < COUNT; i++) {
+            Long item = index.get(tx, keyRoot + i);
+            assertNotNull("Key missing: "+keyRoot + i, item);
+        }
+    }
+
+    // Removes the even-numbered half of the entries, committing each removal.
+    void doRemoveHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                index.remove(tx, keyRoot + i);
+                tx.commit();
+            }
+
+        }
+    }
+
+    // Re-inserts the even-numbered half of the entries removed by doRemoveHalf().
+    void doInsertHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                index.put(tx, keyRoot + i, (long)i);
+                tx.commit();
+            }
+        }
+    }
+
+    // Removes every entry in ascending order, then verifies each lookup
+    // now returns null.
+    void doRemove(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            index.remove(tx, keyRoot + i);
+            tx.commit();
+        }
+        for (int i = 0; i < COUNT; i++) {
+            Long item = index.get(tx, keyRoot + i);
+            assertNull(item);
+        }
+    }
+
+    // Same as doRemove() but deletes in descending key order, to exercise a
+    // different removal pattern.  NOTE(review): not invoked by testHashIndex();
+    // presumably kept for subclasses to call.
+    void doRemoveBackwards(String keyRoot) throws Exception {
+        for (int i = COUNT - 1; i >= 0; i--) {
+            index.remove(tx, keyRoot + i);
+            tx.commit();
+        }
+        for (int i = 0; i < COUNT; i++) {
+            Long item = index.get(tx, keyRoot + i);
+            assertNull(item);
+        }
+    }
+
+    /**
+     * Unloads the page file after each test.
+     * NOTE(review): the transaction opened in setUp() is neither committed nor
+     * rolled back here -- presumably unload() discards it; confirm against
+     * PageFile.unload().
+     *
+     * @throws java.lang.Exception
+     * @see junit.framework.TestCase#tearDown()
+     */
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        pf.unload();
+    }
+}

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java?rev=688600&r1=688599&r2=688600&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/PageFileTest.java Sun Aug 24 19:35:58 2008
@@ -16,8 +16,12 @@
  */
 package org.apache.kahadb.page;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.HashSet;
 
 import org.apache.kahadb.StringMarshaller;
@@ -25,8 +29,6 @@
 import junit.framework.TestCase;
 
 public class PageFileTest extends TestCase {
-
-    static final short TEST_TYPE = 65;
     
     public void testCRUD() throws IOException {
         
@@ -39,13 +41,13 @@
         // Insert some data into the page file.
         Transaction tx = pf.tx();
         for( int i=0 ; i < 100; i++) {
-            Page page = tx.allocate();
-            page.setType(TEST_TYPE);
+            Page<String> page = tx.allocate();
+            assertEquals(Page.PAGE_FREE_TYPE, page.getType());
             
             String t = "page:"+i;
             expected.add(t);
-            page.setData(t);
-            tx.write(page, StringMarshaller.INSTANCE);
+            page.set(t);
+            tx.store(page, StringMarshaller.INSTANCE, false);
             tx.commit();
         }
         
@@ -56,9 +58,9 @@
         
         // Iterate it to make sure they are still there..
         HashSet<String> actual = new HashSet<String>();
-        for (Page page : tx) {
+        for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            actual.add((String)page.getData());
+            actual.add(page.get());
         }
         assertEquals(expected, actual);
         
@@ -70,9 +72,9 @@
             String t = "page:"+i;
             expected.remove(t);
         }
-        for (Page page : tx) {
+        for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            if( !expected.contains(page.getData()) ) {
+            if( !expected.contains(page.get()) ) {
                 tx.free(page);
             }
         }
@@ -85,9 +87,9 @@
         
         // Iterate it to make sure the even records are still there..
         actual.clear();
-        for (Page page : tx) {
+        for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            actual.add((String)page.getData());
+            actual.add((String)page.get());
         }
         assertEquals(expected, actual);
 
@@ -97,10 +99,10 @@
         for (String s : t) {
             expected.add(s+":updated");
         }
-        for (Page page : tx) {
+        for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            page.setData(page.getData()+":updated");
-            tx.write(page, StringMarshaller.INSTANCE);
+            page.set(page.get()+":updated");
+            tx.store(page, StringMarshaller.INSTANCE, false);
         }
         tx.commit();
         
@@ -111,13 +113,48 @@
 
         // Iterate it to make sure the updated records are still there..
         actual.clear();
-        for (Page page : tx) {
+        for (Page<String> page : tx) {
             tx.load(page, StringMarshaller.INSTANCE);
-            actual.add((String)page.getData());
+            actual.add(page.get());
         }
         assertEquals(expected, actual);
         
         pf.unload();
     }
     
+    /**
+     * Exercises the page-backed stream API: writes 10000 UTF strings to an
+     * allocated page via openOutputStream, reloads the page file to simulate
+     * a restart, then reads every string back via openInputStream and checks
+     * that the stream reports end-of-data afterwards.
+     */
+    public void testStreams() throws IOException {
+        
+        // Start from an empty page file so the test is repeatable.
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.load();
+        
+        // NOTE(review): raw Page type here; Page<?> would avoid the
+        // unchecked warning seen elsewhere in this commit.
+        Transaction tx = pf.tx();
+        Page page = tx.allocate();
+        tx.commit();
+        
+        // The 'true' flag presumably lets the stream overflow onto additional
+        // chained pages beyond the first -- TODO confirm against Transaction.
+        OutputStream pos = tx.openOutputStream(page, true);
+        DataOutputStream os = new DataOutputStream(pos);
+        for( int i=0; i < 10000; i++) {
+            os.writeUTF("Test string:"+i);
+        }
+        
+        os.close();
+        tx.commit();
+        
+        // Reload the page file.
+        pf.unload();
+        pf.load();
+        tx = pf.tx();
+        
+        InputStream pis = tx.openInputStream(page);
+        DataInputStream is = new DataInputStream(pis);
+        for( int i=0; i < 10000; i++) {
+            assertEquals("Test string:"+i, is.readUTF());
+        }
+        // All written data consumed: the stream must now be at end-of-data.
+        assertEquals(-1, is.read());
+        is.close();
+
+        pf.unload();
+    }
+
 }



Mime
View raw message