activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r694681 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/index/ main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/store/ test/java/org/apache/kahadb/index/
Date Fri, 12 Sep 2008 14:17:48 GMT
Author: chirino
Date: Fri Sep 12 07:17:47 2008
New Revision: 694681

URL: http://svn.apache.org/viewvc?rev=694681&view=rev
Log:
The periodic checkpoint is enabled which will clean up old journal data files and flush outstanding
index writes to disk.
Also enabled the recovery buffer, synced writes, and the async writer thread, as the perf
still looks good with them on.


Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Fri Sep 12
07:17:47 2008
@@ -520,7 +520,7 @@
                 }
             }
         } else {
-            visitor.visit(keys, values);
+            visitor.visit(Arrays.asList(keys), Arrays.asList(values));
         }
     }
     

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java Fri Sep
12 07:17:47 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.kahadb.index;
 
+import java.util.List;
+
 /**
  * Interface used to selectively visit the entries in a BTree.
  * 
@@ -39,6 +41,6 @@
      * @param keys
      * @param values
      */
-    void visit(Key[] keys, Value[] values);
+    void visit(List<Key> keys, List<Value> values);
     
 }
\ No newline at end of file

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Fri Sep 12
07:17:47 2008
@@ -107,11 +107,11 @@
     
     // Should first log the page write to the recovery buffer? Avoids partial
     // page write failures..
-    private boolean enableRecoveryFile=false;
+    private boolean enableRecoveryFile=true;
     // Will we sync writes to disk. Ensures that data will not be lost after a checkpoint()
-    private boolean enableSyncedWrites=false;
+    private boolean enableSyncedWrites=true;
     // Will writes be done in an async thread?
-    private boolean enableAsyncWrites=false;
+    private boolean enableAsyncWrites=true;
 
     // These are used if enableAsyncWrites==true 
     private AtomicBoolean stopWriter = new AtomicBoolean();
@@ -427,18 +427,17 @@
             if( writes.isEmpty()) {                
                 return;
             }
-            if( this.checkpointLatch == null ) {
-                this.checkpointLatch = new CountDownLatch(1);
-            }
-            checkpointLatch = this.checkpointLatch;
             if( enableAsyncWrites ) {
+                if( this.checkpointLatch == null ) {
+                    this.checkpointLatch = new CountDownLatch(1);
+                }
+                checkpointLatch = this.checkpointLatch;
                 writes.notify();
             } else {
-                while( !writes.isEmpty() ) {
-                    writeBatch(-1, TimeUnit.MILLISECONDS);
-                }
+                writeBatch();
+                return;
             }
-        }        
+        }
         try {
             checkpointLatch.await();        
         } catch (InterruptedException e) {
@@ -811,9 +810,7 @@
                 if( enableAsyncWrites  ) {
                     writes.notify();
                 } else {
-                    while( canStartWriteBatch() ) {
-                        writeBatch(-1, TimeUnit.MILLISECONDS);
-                    }
+                    writeBatch();
                 }
             }
         }            
@@ -865,33 +862,46 @@
     ///////////////////////////////////////////////////////////////////
     // Internal Double write implementation follows...
     ///////////////////////////////////////////////////////////////////
-    
+    /**
+     * 
+     */
+    private void pollWrites() {
+        try {
+            while( !stopWriter.get() ) {
+                // Wait for a notification...
+                synchronized( writes ) {   
+                    // If there is not enough to write, wait for a notification...
+                    while( !canStartWriteBatch() && !stopWriter.get() ) {
+                        writes.wait(100);
+                    }
+                    
+                    if( writes.isEmpty() ) {
+                        releaseCheckpointWaiter();
+                    }
+                }
+                writeBatch();
+            }
+        } catch (Throwable e) {
+            e.printStackTrace();
+        } finally {
+            releaseCheckpointWaiter();
+        }
+    }
+
     /**
      * 
      * @param timeout
      * @param unit
-     * @return true if a write was done.
+     * @return true if there are still pending writes to do.
      * @throws InterruptedException 
      * @throws IOException 
      */
-    private boolean writeBatch(long timeout, TimeUnit unit) throws IOException {
-                
-        ArrayList<PageWrite> batch;
-        synchronized( writes ) {   
+    private void writeBatch() throws IOException {
             
+        CountDownLatch checkpointLatch;
+        ArrayList<PageWrite> batch;
+        synchronized( writes ) {
             // If there is not enough to write, wait for a notification...
-            if( !canStartWriteBatch() && timeout>=0 ) {
-                releaseCheckpointWaiter();
-                try {
-                    writes.wait(unit.toMillis(timeout));
-                } catch (InterruptedException e) {
-                    throw new InterruptedIOException();
-                }
-            }
-            if( writes.isEmpty() ) {
-                releaseCheckpointWaiter();
-                return false;
-            }
 
             batch = new ArrayList<PageWrite>(writes.size());
             // build a write batch from the current write cache. 
@@ -901,6 +911,11 @@
                 // page again without blocking for this write.
                 write.begin();
             }
+
+            // Grab on to the existing checkpoint latch cause once we do this write we can

+            // release the folks that were waiting for those writes to hit disk.
+            checkpointLatch = this.checkpointLatch;
+            this.checkpointLatch=null;
         }
         
  
@@ -964,12 +979,11 @@
                     writes.remove(w.page.getPageId());
                 }
             }
-            if( writes.isEmpty() ) {
-                releaseCheckpointWaiter();
-            }
         }
         
-        return true;
+        if( checkpointLatch!=null ) {
+            checkpointLatch.countDown();
+        }
     }
 
     private long recoveryFileSizeForPages(int pageCount) {
@@ -1054,31 +1068,22 @@
         synchronized( writes ) {
             if( enableAsyncWrites ) {
                 stopWriter.set(false);
-                writerThread = new Thread("Page Writer") {
+                writerThread = new Thread("KahaDB Page Writer") {
                     @Override
                     public void run() {
-                        try {
-                            while( !stopWriter.get() ) {
-                                writeBatch(1000, TimeUnit.MILLISECONDS);
-                            }
-                        } catch (Throwable e) {
-                            e.printStackTrace();
-                        } finally {
-                            releaseCheckpointWaiter();
-                        }
+                        pollWrites();
                     }
                 };
+                writerThread.setPriority(Thread.MAX_PRIORITY);
                 writerThread.start();
             }
         }
     }
  
     private void stopWriter() throws InterruptedException {
-        synchronized( writes ) {
-            if( enableAsyncWrites ) {
-                stopWriter.set(true);
-                writerThread.join();
-            }
+        if( enableAsyncWrites ) {
+            stopWriter.set(true);
+            writerThread.join();
         }
     }
 

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Fri
Sep 12 07:17:47 2008
@@ -150,6 +150,7 @@
     protected boolean recovering;
     protected Thread checkpointThread;
     protected boolean syncWrites;
+    int checkpointInterval = 30*1000;
     
     protected AtomicBoolean started = new AtomicBoolean();
 
@@ -258,7 +259,7 @@
                     while (started.get()) {
                         Thread.sleep(500);
                         long now = System.currentTimeMillis();
-                        if( now - start >= 1000*1000 ) {
+                        if( now - start >= checkpointInterval ) {
                             checkpoint();
                             start = now;
                         }
@@ -663,9 +664,12 @@
      */
     private void checkpointUpdate(Transaction tx) throws IOException {
 
+        LOG.debug("Checkpoint started.");
+
         // Find empty journal files to remove.
         final HashSet<Integer> inUseFiles = new HashSet<Integer>();
         
+        
         for (StoredDestination sd : storedDestinations.values()) {
             // Use a visitor to cut down the number of pages that we load
             sd.orderIndex.visit(tx, new BTreeVisitor<Location, String>() {
@@ -683,18 +687,19 @@
                     return true;
                 }
 
-                public void visit(Location[] keys, String[] values) {
-                    for (int i = 0; i < keys.length; i++) {
-                        if( last == keys[i].getDataFileId() ) {
-                            inUseFiles.add(keys[i].getDataFileId());
-                            last = keys[i].getDataFileId();
+                public void visit(List<Location> keys, List<String> values) {
+                    for (int i = 0; i < keys.size(); i++) {
+                        if( last != keys.get(i).getDataFileId() ) {
+                            inUseFiles.add(keys.get(i).getDataFileId());
+                            last = keys.get(i).getDataFileId();
                         }
                     }
                     
                 }
+
             });
         }
-        
+                
         metadata.state = OPEN_STATE;
         metadata.firstInProgressTransactionLocation = getFirstInProgressTxLocation();
         tx.store(metadata.page, metadataMarshaller, true);
@@ -703,7 +708,13 @@
         if( metadata.firstInProgressTransactionLocation!=null ) {
             l = metadata.firstInProgressTransactionLocation;
         }
+        
+        LOG.debug("In use files: "+inUseFiles+", lastUpdate: "+l);
+
+        pageFile.flush();
         asyncDataManager.consolidateDataFilesNotIn(inUseFiles, l==null?null:l.getDataFileId());
+        
+        LOG.debug("Checkpoint done.");
     }
 
 
@@ -1058,4 +1069,12 @@
         this.syncWrites = syncWrites;
     }
 
+    public int getCheckpointInterval() {
+        return checkpointInterval;
+    }
+
+    public void setCheckpointInterval(int checkpointInterval) {
+        this.checkpointInterval = checkpointInterval;
+    }
+
 }

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=694681&r1=694680&r2=694681&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Fri
Sep 12 07:17:47 2008
@@ -19,6 +19,7 @@
 import java.io.PrintWriter;
 import java.text.NumberFormat;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.kahadb.LongMarshaller;
@@ -143,6 +144,36 @@
         tx.commit();
     }
     
+    
+    public void testVisitor() throws Exception {
+        createPageFileAndIndex(100);
+        BTreeIndex<String,Long> index = ((BTreeIndex<String,Long>)this.index);
+        this.index.load(tx);
+        tx.commit();
+          
+        // Insert in reverse order..
+        doInsert(1000);
+        
+        this.index.unload(tx);
+        tx.commit();
+        this.index.load(tx);
+        tx.commit();
+
+        // BTree should iterate it in sorted order.
+        
+        index.visit(tx, new BTreeVisitor<String, Long>(){
+            public boolean isInterestedInKeysBetween(String first, String second) {
+                return true;
+            }
+            public void visit(List<String> keys, List<Long> values) {
+            }
+        });
+        
+
+        this.index.unload(tx);
+        tx.commit();
+    }
+    
     void doInsertReverse(int count) throws Exception {
         for (int i = count-1; i >= 0; i--) {
             index.put(tx, key(i), (long)i);



Mime
View raw message