Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 95863 invoked from network); 8 Sep 2008 14:25:50 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Sep 2008 14:25:50 -0000 Received: (qmail 41089 invoked by uid 500); 8 Sep 2008 14:25:48 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 41065 invoked by uid 500); 8 Sep 2008 14:25:48 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 41056 invoked by uid 99); 8 Sep 2008 14:25:48 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Sep 2008 07:25:48 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 Sep 2008 14:24:57 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 45DBF2388986; Mon, 8 Sep 2008 07:25:29 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r693109 - in /activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page: PageFile.java Transaction.java Date: Mon, 08 Sep 2008 14:25:28 -0000 To: commits@activemq.apache.org From: chirino@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080908142529.45DBF2388986@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: chirino Date: Mon Sep 8 07:25:27 2008 New Revision: 693109 URL: http://svn.apache.org/viewvc?rev=693109&view=rev Log: Refactored the PageFile/Transaction stuff a bit to make it easier to maintian. At this time an interface is not needed for Transaction. Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=693109&r1=693108&r2=693109&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Mon Sep 8 07:25:27 2008 @@ -20,21 +20,17 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.InterruptedIOException; -import java.io.OutputStream; import java.io.RandomAccessFile; import java.util.ArrayList; import java.util.Arrays; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; -import java.util.NoSuchElementException; import java.util.Properties; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; @@ -44,14 +40,9 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.kahadb.Marshaller; -import org.apache.kahadb.util.ByteSequence; -import org.apache.kahadb.util.DataByteArrayInputStream; -import org.apache.kahadb.util.DataByteArrayOutputStream; import org.apache.kahadb.util.IOHelper; import org.apache.kahadb.util.IntrospectionSupport; import org.apache.kahadb.util.LRUCache; -import org.apache.kahadb.util.Sequence; import org.apache.kahadb.util.SequenceSet; /** @@ -78,32 +69,32 @@ private final String name; private File directory; - private RandomAccessFile readFile; + RandomAccessFile readFile; private RandomAccessFile writeFile; - private int pageSize = DEFAULT_PAGE_SIZE; + int pageSize = DEFAULT_PAGE_SIZE; private int recoveryBufferSize=(this.pageSize+RECOVERY_HEADER_SIZE)*MAX_PAGES_IN_RECOVERY_BUFFER; private int initialPageOffset; - private long nextFreePageId; + long nextFreePageId; - private SequenceSet freeList = new SequenceSet(); - private AtomicBoolean loaded = new AtomicBoolean(); + SequenceSet freeList = new SequenceSet(); + AtomicBoolean loaded = new AtomicBoolean(); private LRUCache pageCache; private boolean enableRecoveryBuffer=false; private boolean enableSyncedWrites=false; private boolean enablePageCaching=true; - private boolean enableAsyncWrites=false; + boolean enableAsyncWrites=false; private int pageCacheSize = 100; - private TreeMap writes=new TreeMap(); + TreeMap writes=new TreeMap(); private Thread writerThread; AtomicBoolean stopWriter = new AtomicBoolean(); private CountDownLatch checkpointLatch; - private AtomicLong nextTxid = new AtomicLong(); + AtomicLong nextTxid = new AtomicLong(); private MetaData metaData; /** @@ -167,7 +158,7 @@ /** * Internally used by the double write buffer implementation used in this class. */ - private class PageWrite { + class PageWrite { Page page; byte[] current; byte[] diskBound; @@ -201,567 +192,9 @@ } - /** - * Provides transaction update access to the PageFile. All operations that modify - * the PageFile are done via a Transaction. - */ - class PageFileTransaction implements Transaction { - - /** - * @see org.apache.kahadb.page.Transaction#getPageFile() - */ - public PageFile getPageFile() { - return PageFile.this; - } - - /** - * @see org.apache.kahadb.page.Transaction#allocate() - */ - public Page allocate() throws IOException { - return allocate(1); - } - - /** - * @see org.apache.kahadb.page.Transaction#allocate(int) - */ - public Page allocate(int count) throws IOException { - assertLoaded(); - if ( count <= 0 ) { - throw new IllegalArgumentException("The allocation count must be larger than zero"); - } - - Sequence seq = freeList.removeFirstSequence(count); - - // We may need to create new free pages... - if(seq==null) { - - Page first=null; - int c=count; - while( c > 0 ) { - Page page = new Page(nextFreePageId ++); - page.makeFree(nextTxid.get()); - - if( first == null ) { - first = page; - } - - addToCache(page); - DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize); - page.write(out); - write(page, out.getData()); - -// LOG.debug("allocate writing: "+page.getPageId()); - c--; - } - - return first; - } - - Page page = new Page(seq.getFirst()); - page.makeFree(0); -// LOG.debug("allocated: "+page.getPageId()); - return page; - } - - /** - * @see org.apache.kahadb.page.Transaction#free(long) - */ - public void free(long pageId) throws IOException { - free(load(pageId, null)); - } - - /** - * @see org.apache.kahadb.page.Transaction#free(long, int) - */ - public void free(long pageId, int count) throws IOException { - free(load(pageId, null), count); - } - - /** - * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page, int) - */ - public void free(Page page, int count) throws IOException { - assertLoaded(); - long offsetPage=page.getPageId(); - for (int i = 0; i < count; i++) { - if( page == null ) { - page=load(offsetPage+i, null); - } - free(page); - page=null; - } - } - - /** - * @see org.apache.kahadb.page.Transaction#free(org.apache.kahadb.page.Page) - */ - public void free(Page page) throws IOException { - assertLoaded(); - - // We may need loop to free up a page chain. - while(page!=null){ - - // Is it already free?? - if( page.getType() == Page.PAGE_FREE_TYPE ) { - return; - } - - Page next = null; - if( page.getType()==Page.PAGE_PART_TYPE ) { - next = load(page.getNext(), null); - } - - page.makeFree(nextTxid.get()); - - DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize); - page.write(out); - write(page, out.getData()); - - removeFromCache(page); - freeList.add(page.getPageId()); - page = next; - } - } - - /** - * @see org.apache.kahadb.page.Transaction#store(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller, boolean) - */ - public void store(Page page, Marshaller marshaller, final boolean overflow) throws IOException { - DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow); - if( marshaller!=null ) { - marshaller.writePayload(page.get(), out); - } - out.close(); - } - - /** - * @throws IOException - */ - public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException { - assertLoaded(); - - // Copy to protect against the end user changing - // the page instance while we are doing a write. - final Page copy = page.copy(); - addToCache(copy); - - // - // To support writing VERY large data, we override the output stream so that we - // we do the page writes incrementally while the data is being marshalled. - DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageSize*2) { - Page current = copy; - - @SuppressWarnings("unchecked") - @Override - protected void onWrite() throws IOException { - - // Are we at an overflow condition? - if( pos >= pageSize ) { - // If overflow is allowed - if( overflow ) { - - Page next; - if( current.getType() == Page.PAGE_PART_TYPE ) { - next = load(current.getNext(), null); - } else { - next = allocate(); - } - - next.txId = current.txId; - - // Write the page header - int oldPos = pos; - pos = 0; - - current.makePagePart(next.getPageId(), nextTxid.get()); - current.write(this); - - // Do the page write.. - byte [] data = new byte[pageSize]; - System.arraycopy(buf, 0, data, 0, pageSize); - PageFileTransaction.this.write(current, data); - - // Reset for the next page chunk - pos = 0; - // The page header marshalled after the data is written. - skip(Page.PAGE_HEADER_SIZE); - // Move the overflow data after the header. - System.arraycopy(buf, pageSize, buf, pos, oldPos-pageSize); - pos += oldPos-pageSize; - current = next; - - } else { - throw new PageOverflowIOException("Page overflow."); - } - } - - } - - @SuppressWarnings("unchecked") - @Override - public void close() throws IOException { - super.close(); - - // We need to free up the rest of the page chain.. - if( current.getType() == Page.PAGE_PART_TYPE ) { - free(current.getNext()); - } - - current.makePageEnd(pos, nextTxid.get()); - - // Write the header.. - pos = 0; - current.write(this); - - PageFileTransaction.this.write(current, buf); - } - }; - - // The page header marshaled after the data is written. - out.skip(Page.PAGE_HEADER_SIZE); - return out; - } - - /** - * @param page - * @param data - * @throws IOException - */ - @SuppressWarnings("unchecked") - private void write(final Page page, byte[] data) throws IOException { - Long key = page.getPageId(); - synchronized( writes ) { - // If it's not in the write cache... - PageWrite write = writes.get(key); - if( write==null ) { - write = new PageWrite(page, data); - writes.put(key, write); - } else { - write.setCurrent(page, data); - } - - // Once we start approaching capacity, notify the writer to start writing - if( canStartWriteBatch() ) { - if( enableAsyncWrites ) { - writes.notify(); - } else { - while( canStartWriteBatch() ) { - writeBatch(-1, TimeUnit.MILLISECONDS); - } - } - } - } - } - - /** - * @see org.apache.kahadb.page.Transaction#load(long, org.apache.kahadb.Marshaller) - */ - public Page load(long pageId, Marshaller marshaller) throws IOException { - assertLoaded(); - Page page = new Page(pageId); - load(page, marshaller); - return page; - } - - /** - * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller) - */ - public void load(Page page, Marshaller marshaller) throws IOException { - assertLoaded(); - - // Can't load invalid offsets... - if (page.getPageId() < 0) { - throw new Transaction.InvalidPageIOException("Page id is not valid", page.getPageId()); - } - - // Try to load it from the cache first... - Page t = getFromCache(page.getPageId()); - if (t != null) { - page.copy(t); - return; - } - - if( marshaller!=null ) { - // Full page read.. - InputStream is = openInputStream(page); - DataInputStream dataIn = new DataInputStream(is); - page.set(marshaller.readPayload(dataIn)); - is.close(); - } else { - // Page header read. - DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]); - readFile.seek(toOffset(page.getPageId())); - readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE); - page.read(in); - page.set(null); - } - - // Cache it. - if( marshaller!=null ) { - addToCache(page); - } - } - - /** - * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, org.apache.kahadb.Marshaller) - */ - public InputStream openInputStream(final Page p) throws IOException { - - return new InputStream() { - - private ByteSequence chunk = new ByteSequence(new byte[pageSize]); - private Page page = readPage(p); - private int pageCount=1; - - private Page markPage; - private ByteSequence markChunk; - - private Page readPage(Page page) throws IOException { - // Read the page data - readFile.seek(toOffset(page.getPageId())); - readFile.readFully(chunk.getData(), 0, pageSize); - chunk.setOffset(0); - chunk.setLength(pageSize); - - DataByteArrayInputStream in = new DataByteArrayInputStream(chunk); - page.read(in); - - chunk.setOffset(Page.PAGE_HEADER_SIZE); - if( page.getType() == Page.PAGE_END_TYPE ) { - chunk.setLength((int)(page.getNext())); - } - - if( page.getType() == Page.PAGE_FREE_TYPE ) { - throw new EOFException("Chunk stream does not exist at page: "+page.getPageId()); - } - - return page; - } - - public int read() throws IOException { - if (!atEOF()) { - return chunk.data[chunk.offset++] & 0xff; - } else { - return -1; - } - } - - private boolean atEOF() throws IOException { - if( chunk.offset < chunk.length ) { - return false; - } - if( page.getType() == Page.PAGE_END_TYPE ) { - return true; - } - fill(); - return chunk.offset >= chunk.length; - } - - private void fill() throws IOException { - page = readPage(new Page(page.getNext())); - pageCount++; - } - - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - public int read(byte b[], int off, int len) throws IOException { - if (!atEOF()) { - int rc=0; - while(!atEOF() && rc < len) { - len = Math.min(len, chunk.length - chunk.offset); - if (len > 0) { - System.arraycopy(chunk.data, chunk.offset, b, off, len); - chunk.offset += len; - } - rc+=len; - } - return rc; - } else { - return -1; - } - } - - public long skip(long len) throws IOException { - if (atEOF()) { - int rc=0; - while(!atEOF() && rc < len) { - len = Math.min(len, chunk.length - chunk.offset); - if (len > 0) { - chunk.offset += len; - } - rc+=len; - } - return rc; - } else { - return -1; - } - } - - public int available() { - return chunk.length - chunk.offset; - } - - public boolean markSupported() { - return true; - } - - public void mark(int markpos) { - markPage = page; - byte data[] = new byte[pageSize]; - System.arraycopy(chunk.getData(), 0, data, 0, pageSize); - markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength()); - } - - public void reset() { - page = markPage; - chunk = markChunk; - } - - }; - } - - - /** - * @see org.apache.kahadb.page.Transaction#iterator() - */ - @SuppressWarnings("unchecked") - public Iterator iterator() { - return (Iterator)iterator(false); - } - - /** - * @see org.apache.kahadb.page.Transaction#iterator(boolean) - */ - public Iterator iterator(final boolean includeFreePages) { - - assertLoaded(); - - return new Iterator() { - long nextId; - Page nextPage; - Page lastPage; - - private void findNextPage() { - if( !loaded.get() ) { - throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded"); - } - - if( nextPage!=null ) { - return; - } - - try { - while( nextId < PageFile.this.nextFreePageId ) { - - Page page = load(nextId, null); - - if( includeFreePages || page.getType()!=Page.PAGE_FREE_TYPE ) { - nextPage = page; - return; - } else { - nextId++; - } - } - } catch (IOException e) { - } - } - - public boolean hasNext() { - findNextPage(); - return nextPage !=null; - } - - public Page next() { - findNextPage(); - if( nextPage !=null ) { - lastPage = nextPage; - nextPage=null; - nextId++; - return lastPage; - } else { - throw new NoSuchElementException(); - } - } - - public void remove() { - if( lastPage==null ) { - throw new IllegalStateException(); - } - try { - free(lastPage); - lastPage=null; - } catch (IOException e) { - new RuntimeException(e); - } - } - }; - } - - /** - * @see org.apache.kahadb.page.Transaction#commit() - */ - public void commit() throws IOException { - } - - /** - * Rolls back the transaction. - */ - private void rollback() throws IOException { - } - - /** - * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.Closure) - */ - public void execute(Closure closure) throws T, IOException { - boolean success=false; - try { - closure.execute(this); - success=true; - } finally { - if( success ) { - commit(); - } else { - rollback(); - } - } - } - - /** - * @see org.apache.kahadb.page.Transaction#execute(org.apache.kahadb.page.PageFile.CallableClosure) - */ - public R execute(CallableClosure closure) throws T, IOException { - boolean success=false; - try { - R rc = closure.execute(this); - success=true; - return rc; - } finally { - if( success ) { - commit(); - } else { - rollback(); - } - } - } - - /** - * @see org.apache.kahadb.page.Transaction#isReadOnly() - */ - public boolean isReadOnly() { - return false; - } - - public long getPageCount() { - return nextFreePageId; - } - - } - public Transaction tx() { assertLoaded(); - return new PageFileTransaction(); + return new Transaction(this); } /** @@ -977,7 +410,7 @@ // Internal Double write implementation follows... /////////////////////////////////////////////////////////////////// - private boolean canStartWriteBatch() { + boolean canStartWriteBatch() { int capacityUsed = ((writes.size() * 100)/MAX_PAGES_IN_RECOVERY_BUFFER); if( enableAsyncWrites ) { @@ -999,7 +432,7 @@ * @throws InterruptedException * @throws IOException */ - private boolean writeBatch(long timeout, TimeUnit unit) throws IOException { + boolean writeBatch(long timeout, TimeUnit unit) throws IOException { int batchLength=8+4; // Account for the: lastTxid + recovery record counter ArrayList batch = new ArrayList(MAX_PAGES_IN_RECOVERY_BUFFER); @@ -1212,14 +645,14 @@ return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre"); } - private long toOffset(long pageId) { + long toOffset(long pageId) { return initialPageOffset+(pageId*pageSize); } /** * @throws IllegalStateException if the page file is not loaded. */ - private void assertLoaded() throws IllegalStateException { + void assertLoaded() throws IllegalStateException { if( !loaded.get() ) { throw new IllegalStateException("PageFile is not loaded"); } @@ -1229,8 +662,7 @@ // Internal Cache Related operations /////////////////////////////////////////////////////////////////// - @SuppressWarnings("unchecked") - private Page getFromCache(long pageId) { + @SuppressWarnings("unchecked") Page getFromCache(long pageId) { synchronized(writes) { PageWrite pageWrite = writes.get(pageId); if( pageWrite != null ) { @@ -1245,13 +677,13 @@ return result; } - private void addToCache(Page page) { + void addToCache(Page page) { if (enablePageCaching) { pageCache.put(page.getPageId(), page); } } - private void removeFromCache(Page page) { + void removeFromCache(Page page) { if (enablePageCaching) { pageCache.remove(page.getPageId()); } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=693109&r1=693108&r2=693109&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Mon Sep 8 07:25:27 2008 @@ -16,18 +16,27 @@ */ package org.apache.kahadb.page; +import java.io.DataInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.concurrent.TimeUnit; import org.apache.kahadb.Marshaller; +import org.apache.kahadb.page.PageFile.PageWrite; +import org.apache.kahadb.util.ByteSequence; +import org.apache.kahadb.util.DataByteArrayInputStream; +import org.apache.kahadb.util.DataByteArrayOutputStream; +import org.apache.kahadb.util.Sequence; /** - * The interface used to read/update a PageFile object. Using a transaction allows you to + * The class used to read/update a PageFile object. Using a transaction allows you to * do multiple update operations in a single unit of work. */ -public interface Transaction extends Iterable { +public class Transaction implements Iterable { /** * @@ -70,7 +79,22 @@ public R execute(Transaction tx) throws T; } - public PageFile getPageFile(); + + private final PageFile pageFile; + + /** + * @param pageFile + */ + Transaction(PageFile pageFile) { + this.pageFile = pageFile; + } + + /** + * @see org.apache.kahadb.page.Transaction#getPageFile() + */ + public PageFile getPageFile() { + return this.pageFile; + } /** * Allocates a free page that you can write data to. @@ -81,7 +105,9 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public Page allocate() throws IOException; + public Page allocate() throws IOException { + return allocate(1); + } /** * Allocates a block of free pages that you can write data to. @@ -93,7 +119,44 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public Page allocate(int count) throws IOException; + public Page allocate(int count) throws IOException { + this.pageFile.assertLoaded(); + if (count <= 0) { + throw new IllegalArgumentException("The allocation count must be larger than zero"); + } + + Sequence seq = this.pageFile.freeList.removeFirstSequence(count); + + // We may need to create new free pages... + if (seq == null) { + + Page first = null; + int c = count; + while (c > 0) { + Page page = new Page(this.pageFile.nextFreePageId++); + page.makeFree(this.pageFile.nextTxid.get()); + + if (first == null) { + first = page; + } + + this.pageFile.addToCache(page); + DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize); + page.write(out); + write(page, out.getData()); + + // LOG.debug("allocate writing: "+page.getPageId()); + c--; + } + + return first; + } + + Page page = new Page(seq.getFirst()); + page.makeFree(0); + // LOG.debug("allocated: "+page.getPageId()); + return page; + } /** * Frees up a previously allocated page so that it can be re-allocated again. @@ -104,18 +167,24 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public void free(Page page) throws IOException; + public void free(long pageId) throws IOException { + free(load(pageId, null)); + } /** - * Frees up a previously allocated page so that it can be re-allocated again. + * Frees up a previously allocated sequence of pages so that it can be re-allocated again. + * + * @param page the initial page of the sequence that will be getting freed + * @param count the number of pages in the sequence * - * @param page the page to free up * @throws IOException * If an disk error occurred. * @throws IllegalStateException * if the PageFile is not loaded */ - public void free(long pageId) throws IOException; + public void free(long pageId, int count) throws IOException { + free(load(pageId, null), count); + } /** * Frees up a previously allocated sequence of pages so that it can be re-allocated again. @@ -128,20 +197,54 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public void free(Page page, int count) throws IOException; + public void free(Page page, int count) throws IOException { + this.pageFile.assertLoaded(); + long offsetPage = page.getPageId(); + for (int i = 0; i < count; i++) { + if (page == null) { + page = load(offsetPage + i, null); + } + free(page); + page = null; + } + } /** - * Frees up a previously allocated sequence of pages so that it can be re-allocated again. - * - * @param page the initial page of the sequence that will be getting freed - * @param count the number of pages in the sequence + * Frees up a previously allocated page so that it can be re-allocated again. * + * @param page the page to free up * @throws IOException * If an disk error occurred. * @throws IllegalStateException * if the PageFile is not loaded */ - public void free(long pageId, int count) throws IOException; + public void free(Page page) throws IOException { + this.pageFile.assertLoaded(); + + // We may need loop to free up a page chain. + while (page != null) { + + // Is it already free?? + if (page.getType() == Page.PAGE_FREE_TYPE) { + return; + } + + Page next = null; + if (page.getType() == Page.PAGE_PART_TYPE) { + next = load(page.getNext(), null); + } + + page.makeFree(this.pageFile.nextTxid.get()); + + DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize); + page.write(out); + write(page, out.getData()); + + this.pageFile.removeFromCache(page); + this.pageFile.freeList.add(page.getPageId()); + page = next; + } + } /** * @@ -160,7 +263,135 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public void store(Page page, Marshaller marshaller, boolean overflow) throws IOException, PageOverflowIOException; + public void store(Page page, Marshaller marshaller, final boolean overflow) throws IOException { + DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow); + if (marshaller != null) { + marshaller.writePayload(page.get(), out); + } + out.close(); + } + + /** + * @throws IOException + */ + public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException { + this.pageFile.assertLoaded(); + + // Copy to protect against the end user changing + // the page instance while we are doing a write. + final Page copy = page.copy(); + this.pageFile.addToCache(copy); + + // + // To support writing VERY large data, we override the output stream so + // that we + // we do the page writes incrementally while the data is being + // marshalled. + DataByteArrayOutputStream out = new DataByteArrayOutputStream(this.pageFile.pageSize * 2) { + Page current = copy; + + @SuppressWarnings("unchecked") + @Override + protected void onWrite() throws IOException { + + // Are we at an overflow condition? + if (pos >= Transaction.this.pageFile.pageSize) { + // If overflow is allowed + if (overflow) { + + Page next; + if (current.getType() == Page.PAGE_PART_TYPE) { + next = load(current.getNext(), null); + } else { + next = allocate(); + } + + next.txId = current.txId; + + // Write the page header + int oldPos = pos; + pos = 0; + + current.makePagePart(next.getPageId(), Transaction.this.pageFile.nextTxid.get()); + current.write(this); + + // Do the page write.. + byte[] data = new byte[Transaction.this.pageFile.pageSize]; + System.arraycopy(buf, 0, data, 0, Transaction.this.pageFile.pageSize); + Transaction.this.write(current, data); + + // Reset for the next page chunk + pos = 0; + // The page header marshalled after the data is written. + skip(Page.PAGE_HEADER_SIZE); + // Move the overflow data after the header. + System.arraycopy(buf, Transaction.this.pageFile.pageSize, buf, pos, oldPos - Transaction.this.pageFile.pageSize); + pos += oldPos - Transaction.this.pageFile.pageSize; + current = next; + + } else { + throw new PageOverflowIOException("Page overflow."); + } + } + + } + + @SuppressWarnings("unchecked") + @Override + public void close() throws IOException { + super.close(); + + // We need to free up the rest of the page chain.. + if (current.getType() == Page.PAGE_PART_TYPE) { + free(current.getNext()); + } + + current.makePageEnd(pos, Transaction.this.pageFile.nextTxid.get()); + + // Write the header.. + pos = 0; + current.write(this); + + Transaction.this.write(current, buf); + } + }; + + // The page header marshaled after the data is written. + out.skip(Page.PAGE_HEADER_SIZE); + return out; + } + + /** + * @param page + * @param data + * @throws IOException + */ + @SuppressWarnings("unchecked") + private void write(final Page page, byte[] data) throws IOException { + Long key = page.getPageId(); + synchronized (this.pageFile.writes) { + // If it's not in the write cache... + PageWrite write = this.pageFile.writes.get(key); + if (write == null) { + write = this.pageFile.new PageWrite(page, data); + this.pageFile.writes.put(key, write); + } else { + write.setCurrent(page, data); + } + + // Once we start approaching capacity, notify the writer to start + // writing + if (this.pageFile.canStartWriteBatch()) { + if (this.pageFile.enableAsyncWrites) { + this.pageFile.writes.notify(); + } else { + while (this.pageFile.canStartWriteBatch()) { + this.pageFile.writeBatch(-1, TimeUnit.MILLISECONDS); + } + } + } + } + } /** * Loads a page from disk. @@ -175,7 +406,12 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public Page load(long pageId, Marshaller marshaller) throws IOException; + public Page load(long pageId, Marshaller marshaller) throws IOException { + this.pageFile.assertLoaded(); + Page page = new Page(pageId); + load(page, marshaller); + return page; + } /** * Loads a page from disk. If the page.pageId is not valid then then this method will set the page.type to @@ -189,31 +425,163 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public void load(Page page, Marshaller marshaller) throws IOException; + public void load(Page page, Marshaller marshaller) throws IOException { + this.pageFile.assertLoaded(); + + // Can't load invalid offsets... + if (page.getPageId() < 0) { + throw new InvalidPageIOException("Page id is not valid", page.getPageId()); + } + + // Try to load it from the cache first... + Page t = this.pageFile.getFromCache(page.getPageId()); + if (t != null) { + page.copy(t); + return; + } + + if (marshaller != null) { + // Full page read.. + InputStream is = openInputStream(page); + DataInputStream dataIn = new DataInputStream(is); + page.set(marshaller.readPayload(dataIn)); + is.close(); + } else { + // Page header read. + DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]); + this.pageFile.readFile.seek(this.pageFile.toOffset(page.getPageId())); + this.pageFile.readFile.readFully(in.getRawData(), 0, Page.PAGE_HEADER_SIZE); + page.read(in); + page.set(null); + } + + // Cache it. + if (marshaller != null) { + this.pageFile.addToCache(page); + } + } - - /** - * - * @param page - * @param overflow - * @return - * @throws IOException - */ - public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException; - - /** - * - * @param p - * @return - * @throws IOException - */ - public InputStream openInputStream(final Page p) throws IOException; - /** - * @return the number of pages allocated in the PageFile + * @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page, + * org.apache.kahadb.Marshaller) */ - public long getPageCount(); - + public InputStream openInputStream(final Page p) throws IOException { + + return new InputStream() { + + private ByteSequence chunk = new ByteSequence(new byte[Transaction.this.pageFile.pageSize]); + private Page page = readPage(p); + private int pageCount = 1; + + private Page markPage; + private ByteSequence markChunk; + + private Page readPage(Page page) throws IOException { + // Read the page data + Transaction.this.pageFile.readFile.seek(Transaction.this.pageFile.toOffset(page.getPageId())); + Transaction.this.pageFile.readFile.readFully(chunk.getData(), 0, Transaction.this.pageFile.pageSize); + chunk.setOffset(0); + chunk.setLength(Transaction.this.pageFile.pageSize); + + DataByteArrayInputStream in = new DataByteArrayInputStream(chunk); + page.read(in); + + chunk.setOffset(Page.PAGE_HEADER_SIZE); + if (page.getType() == Page.PAGE_END_TYPE) { + chunk.setLength((int)(page.getNext())); + } + + if (page.getType() == Page.PAGE_FREE_TYPE) { + throw new EOFException("Chunk stream does not exist at page: " + page.getPageId()); + } + + return page; + } + + public int read() throws IOException { + if (!atEOF()) { + return chunk.data[chunk.offset++] & 0xff; + } else { + return -1; + } + } + + private boolean atEOF() throws IOException { + if (chunk.offset < chunk.length) { + return false; + } + if (page.getType() == Page.PAGE_END_TYPE) { + return true; + } + fill(); + return chunk.offset >= chunk.length; + } + + private void fill() throws IOException { + page = readPage(new Page(page.getNext())); + pageCount++; + } + + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + public int read(byte b[], int off, int len) throws IOException { + if (!atEOF()) { + int rc = 0; + while (!atEOF() && rc < len) { + len = Math.min(len, chunk.length - chunk.offset); + if (len > 0) { + System.arraycopy(chunk.data, chunk.offset, b, off, len); + chunk.offset += len; + } + rc += len; + } + return rc; + } else { + return -1; + } + } + + public long skip(long len) throws IOException { + if (atEOF()) { + int rc = 0; + while (!atEOF() && rc < len) { + len = Math.min(len, chunk.length - chunk.offset); + if (len > 0) { + chunk.offset += len; + } + rc += len; + } + return rc; + } else { + return -1; + } + } + + public int available() { + return chunk.length - chunk.offset; + } + + public boolean markSupported() { + return true; + } + + public void mark(int markpos) { + markPage = page; + byte data[] = new byte[Transaction.this.pageFile.pageSize]; + System.arraycopy(chunk.getData(), 0, data, 0, Transaction.this.pageFile.pageSize); + markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength()); + } + + public void reset() { + page = markPage; + chunk = markChunk; + } + + }; + } + /** * Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are * not included in this iteration. @@ -223,7 +591,10 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public Iterator iterator(); + @SuppressWarnings("unchecked") + public Iterator iterator() { + return (Iterator)iterator(false); + } /** * Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages @@ -234,13 +605,83 @@ * @throws IllegalStateException * if the PageFile is not loaded */ - public Iterator> iterator(final boolean includeFreePages); + public Iterator iterator(final boolean includeFreePages) { + + this.pageFile.assertLoaded(); + + return new Iterator() { + long nextId; + Page nextPage; + Page lastPage; + + private void findNextPage() { + if (!Transaction.this.pageFile.loaded.get()) { + throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded"); + } + + if (nextPage != null) { + return; + } + + try { + while (nextId < Transaction.this.pageFile.nextFreePageId) { + + Page page = load(nextId, null); + + if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) { + nextPage = page; + return; + } else { + nextId++; + } + } + } catch (IOException e) { + } + } + + public boolean hasNext() { + findNextPage(); + return nextPage != null; + } + + public Page next() { + findNextPage(); + if (nextPage != null) { + lastPage = nextPage; + nextPage = null; + nextId++; + return lastPage; + } else { + throw new NoSuchElementException(); + } + } + + public void remove() { + if (lastPage == null) { + throw new IllegalStateException(); + } + try { + free(lastPage); + lastPage = null; + } catch (IOException e) { + new RuntimeException(e); + } + } + }; + } /** * Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated * with the transaction are written to disk or none will. */ - public void commit() throws IOException; + public void commit() throws IOException { + } + + /** + * Rolls back the transaction. + */ + private void rollback() throws IOException { + } /** * Executes a closure and if it does not throw any exceptions, then it commits the transaction. @@ -251,7 +692,19 @@ * @throws T if the closure throws it * @throws IOException If the commit fails. */ - public void execute(Closure closure) throws T, IOException; + public void execute(Closure closure) throws T, IOException { + boolean success = false; + try { + closure.execute(this); + success = true; + } finally { + if (success) { + commit(); + } else { + rollback(); + } + } + } /** * Executes a closure and if it does not throw any exceptions, then it commits the transaction. @@ -262,12 +715,33 @@ * @throws T if the closure throws it * @throws IOException If the commit fails. */ - public R execute(CallableClosure closure) throws T, IOException; + public R execute(CallableClosure closure) throws T, IOException { + boolean success = false; + try { + R rc = closure.execute(this); + success = true; + return rc; + } finally { + if (success) { + commit(); + } else { + rollback(); + } + } + } /** - * * @return true if there are no uncommitted page file updates associated with this transaction. */ - public boolean isReadOnly(); + public boolean isReadOnly() { + return false; + } + + /** + * @return the number of pages allocated in the PageFile + */ + public long getPageCount() { + return this.pageFile.nextFreePageId; + } }