Author: chirino Date: Fri Sep 12 07:17:47 2008 New Revision: 694681 URL: http://svn.apache.org/viewvc?rev=694681&view=rev Log: The periodic checkpoint is enabled, which will clean up old journal data files and flush outstanding index writes to disk. Also enabled the recovery buffer, using synced writes and the async thread writer as the perf looks good with them on still. Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=694681&r1=694680&r2=694681&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Fri Sep 12 07:17:47 2008 @@ -520,7 +520,7 @@ } } } else { - visitor.visit(keys, values); + visitor.visit(Arrays.asList(keys), Arrays.asList(values)); } } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=694681&r1=694680&r2=694681&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Sep 12 07:17:47 2008 @@ -16,6 +16,8 @@ */ package 
org.apache.kahadb.index; +import java.util.List; + /** * Interface used to selectively visit the entries in a BTree. * @@ -39,6 +41,6 @@ * @param keys * @param values */ - void visit(Key[] keys, Value[] values); + void visit(List keys, List values); } \ No newline at end of file Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=694681&r1=694680&r2=694681&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Sep 12 07:17:47 2008 @@ -107,11 +107,11 @@ // Should first log the page write to the recovery buffer? Avoids partial // page write failures.. - private boolean enableRecoveryFile=false; + private boolean enableRecoveryFile=true; // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint() - private boolean enableSyncedWrites=false; + private boolean enableSyncedWrites=true; // Will writes be done in an async thread? 
- private boolean enableAsyncWrites=false; + private boolean enableAsyncWrites=true; // These are used if enableAsyncWrites==true private AtomicBoolean stopWriter = new AtomicBoolean(); @@ -427,18 +427,17 @@ if( writes.isEmpty()) { return; } - if( this.checkpointLatch == null ) { - this.checkpointLatch = new CountDownLatch(1); - } - checkpointLatch = this.checkpointLatch; if( enableAsyncWrites ) { + if( this.checkpointLatch == null ) { + this.checkpointLatch = new CountDownLatch(1); + } + checkpointLatch = this.checkpointLatch; writes.notify(); } else { - while( !writes.isEmpty() ) { - writeBatch(-1, TimeUnit.MILLISECONDS); - } + writeBatch(); + return; } - } + } try { checkpointLatch.await(); } catch (InterruptedException e) { @@ -811,9 +810,7 @@ if( enableAsyncWrites ) { writes.notify(); } else { - while( canStartWriteBatch() ) { - writeBatch(-1, TimeUnit.MILLISECONDS); - } + writeBatch(); } } } @@ -865,33 +862,46 @@ /////////////////////////////////////////////////////////////////// // Internal Double write implementation follows... /////////////////////////////////////////////////////////////////// - + /** + * + */ + private void pollWrites() { + try { + while( !stopWriter.get() ) { + // Wait for a notification... + synchronized( writes ) { + // If there is not enough to write, wait for a notification... + while( !canStartWriteBatch() && !stopWriter.get() ) { + writes.wait(100); + } + + if( writes.isEmpty() ) { + releaseCheckpointWaiter(); + } + } + writeBatch(); + } + } catch (Throwable e) { + e.printStackTrace(); + } finally { + releaseCheckpointWaiter(); + } + } + /** * * @param timeout * @param unit - * @return true if a write was done. + * @return true if there are still pending writes to do. 
* @throws InterruptedException * @throws IOException */ - private boolean writeBatch(long timeout, TimeUnit unit) throws IOException { - - ArrayList batch; - synchronized( writes ) { + private void writeBatch() throws IOException { + CountDownLatch checkpointLatch; + ArrayList batch; + synchronized( writes ) { // If there is not enough to write, wait for a notification... - if( !canStartWriteBatch() && timeout>=0 ) { - releaseCheckpointWaiter(); - try { - writes.wait(unit.toMillis(timeout)); - } catch (InterruptedException e) { - throw new InterruptedIOException(); - } - } - if( writes.isEmpty() ) { - releaseCheckpointWaiter(); - return false; - } batch = new ArrayList(writes.size()); // build a write batch from the current write cache. @@ -901,6 +911,11 @@ // page again without blocking for this write. write.begin(); } + + // Grab on to the existing checkpoint latch cause once we do this write we can + // release the folks that were waiting for those writes to hit disk. + checkpointLatch = this.checkpointLatch; + this.checkpointLatch=null; } @@ -964,12 +979,11 @@ writes.remove(w.page.getPageId()); } } - if( writes.isEmpty() ) { - releaseCheckpointWaiter(); - } } - return true; + if( checkpointLatch!=null ) { + checkpointLatch.countDown(); + } } private long recoveryFileSizeForPages(int pageCount) { @@ -1054,31 +1068,22 @@ synchronized( writes ) { if( enableAsyncWrites ) { stopWriter.set(false); - writerThread = new Thread("Page Writer") { + writerThread = new Thread("KahaDB Page Writer") { @Override public void run() { - try { - while( !stopWriter.get() ) { - writeBatch(1000, TimeUnit.MILLISECONDS); - } - } catch (Throwable e) { - e.printStackTrace(); - } finally { - releaseCheckpointWaiter(); - } + pollWrites(); } }; + writerThread.setPriority(Thread.MAX_PRIORITY); writerThread.start(); } } } private void stopWriter() throws InterruptedException { - synchronized( writes ) { - if( enableAsyncWrites ) { - stopWriter.set(true); - writerThread.join(); - } + if( 
enableAsyncWrites ) { + stopWriter.set(true); + writerThread.join(); } } Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=694681&r1=694680&r2=694681&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original) +++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri Sep 12 07:17:47 2008 @@ -150,6 +150,7 @@ protected boolean recovering; protected Thread checkpointThread; protected boolean syncWrites; + int checkpointInterval = 30*1000; protected AtomicBoolean started = new AtomicBoolean(); @@ -258,7 +259,7 @@ while (started.get()) { Thread.sleep(500); long now = System.currentTimeMillis(); - if( now - start >= 1000*1000 ) { + if( now - start >= checkpointInterval ) { checkpoint(); start = now; } @@ -663,9 +664,12 @@ */ private void checkpointUpdate(Transaction tx) throws IOException { + LOG.debug("Checkpoint started."); + // Find empty journal files to remove. 
final HashSet inUseFiles = new HashSet(); + for (StoredDestination sd : storedDestinations.values()) { // Use a visitor to cut down the number of pages that we load sd.orderIndex.visit(tx, new BTreeVisitor() { @@ -683,18 +687,19 @@ return true; } - public void visit(Location[] keys, String[] values) { - for (int i = 0; i < keys.length; i++) { - if( last == keys[i].getDataFileId() ) { - inUseFiles.add(keys[i].getDataFileId()); - last = keys[i].getDataFileId(); + public void visit(List keys, List values) { + for (int i = 0; i < keys.size(); i++) { + if( last != keys.get(i).getDataFileId() ) { + inUseFiles.add(keys.get(i).getDataFileId()); + last = keys.get(i).getDataFileId(); } } } + }); } - + metadata.state = OPEN_STATE; metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation(); tx.store(metadata.page, metadataMarshaller, true); @@ -703,7 +708,13 @@ if( metadata.firstInProgressTransactionLocation!=null ) { l = metadata.firstInProgressTransactionLocation; } + + LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l); + + pageFile.flush(); asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId()); + + LOG.debug("Checkpoint done."); } @@ -1058,4 +1069,12 @@ this.syncWrites = syncWrites; } + public int getCheckpointInterval() { + return checkpointInterval; + } + + public void setCheckpointInterval(int checkpointInterval) { + this.checkpointInterval = checkpointInterval; + } + } Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=694681&r1=694680&r2=694681&view=diff ============================================================================== --- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original) +++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Fri Sep 12 07:17:47 2008 @@ -19,6 +19,7 @@ 
import java.io.PrintWriter; import java.text.NumberFormat; import java.util.Iterator; +import java.util.List; import java.util.Map; import org.apache.kahadb.LongMarshaller; @@ -143,6 +144,36 @@ tx.commit(); } + + public void testVisitor() throws Exception { + createPageFileAndIndex(100); + BTreeIndex index = ((BTreeIndex)this.index); + this.index.load(tx); + tx.commit(); + + // Insert in reverse order.. + doInsert(1000); + + this.index.unload(tx); + tx.commit(); + this.index.load(tx); + tx.commit(); + + // BTree should iterate it in sorted order. + + index.visit(tx, new BTreeVisitor(){ + public boolean isInterestedInKeysBetween(String first, String second) { + return true; + } + public void visit(List keys, List values) { + } + }); + + + this.index.unload(tx); + tx.commit(); + } + void doInsertReverse(int count) throws Exception { for (int i = count-1; i >= 0; i--) { index.put(tx, key(i), (long)i);