activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r829372 - in /activemq/sandbox/activemq-apollo/hawtdb/src: main/java/org/apache/hawtdb/internal/page/ test/java/org/apache/hawtdb/internal/page/
Date Sat, 24 Oct 2009 13:59:02 GMT
Author: chirino
Date: Sat Oct 24 13:59:01 2009
New Revision: 829372

URL: http://svn.apache.org/viewvc?rev=829372&view=rev
Log:
better doco.. fixed a bug with how new rev numbers were selected.

Modified:
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java
    activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java Sat Oct 24 13:59:01 2009
@@ -36,7 +36,7 @@
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Batch extends LinkedNode<Batch> implements Externalizable, Iterable<Commit> {
+final class Batch extends LinkedNode<Batch> implements Externalizable, Iterable<Commit> {
     private static final long serialVersionUID = 1188640492489990493L;
     
     /** the pageId that this redo batch is stored at */
@@ -49,7 +49,7 @@
     /** the commits and snapshots in the redo */ 
     final LinkedNodeList<BatchEntry> entries = new LinkedNodeList<BatchEntry>();
     /** tracks how many snapshots are referencing the redo */
-    int references;
+    int snapshots;
     /** the oldest commit in this redo */
     public long base=-1;
     /** the newest commit in this redo */
@@ -69,7 +69,7 @@
     }
 
     public String toString() { 
-        return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+references+", entries: "+entries.size()+" }";
+        return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+snapshots+", entries: "+entries.size()+" }";
     }
     
     @Override
@@ -175,7 +175,7 @@
         }
     }
 
-    public void freeRedoSpace(SimpleAllocator allocator) {
+    public void release(SimpleAllocator allocator) {
         for (Commit commit : this) {
             for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
                 int key = entry.getKey();

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java Sat Oct 24 13:59:01 2009
@@ -25,11 +25,11 @@
  */
 abstract class BatchEntry extends LinkedNode<BatchEntry> {
     
-    Commit isCommit() {
+    public Commit isCommit() {
         return null;
     }
     
-    SnapshotHead isSnapshotHead() {
+    public SnapshotHead isSnapshotHead() {
         return null;
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java Sat Oct 24 13:59:01 2009
@@ -56,7 +56,7 @@
     
     
     @Override
-    Commit isCommit() {
+    public Commit isCommit() {
         return this;
     }
 

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java Sat Oct 24 13:59:01 2009
@@ -77,10 +77,10 @@
         public final Signed32 page_size = new Signed32();
         /** The page location of the free page list */
         public final Signed32 free_list_page = new Signed32();
-        /** points at the latest redo page which is guaranteed to be fully stored */
-        public final Signed32 redo_page = new Signed32();
-        /** The page location of the latest redo page. Not guaranteed to be fully stored */ 
-        public final Signed32 unsynced_redo_page = new Signed32();
+        /** points at the latest batch page which is guaranteed to be fully stored */
+        public final Signed32 stored_batch_page = new Signed32();
+        /** The page location of the latest batch page. Not guaranteed to be fully stored */ 
+        public final Signed32 storing_batch_page = new Signed32();
         
         /** The size of all the previous fields */
         private static final int USED_FIELDS_SIZE = 32 + 8 + 4 + 4 + 4 + 4;
@@ -95,58 +95,63 @@
         public String toString() { 
             return "{ base_revision: "+this.base_revision.get()+
             ", page_size: "+page_size.get()+", free_list_page: "+free_list_page.get()+
-            ", redo_page: "+unsynced_redo_page.get()+", checksum: "+checksum.get()+
+            ", storing batch: "+storing_batch_page.get()+", checksum: "+checksum.get()+
             " }";
         }
     }
+    /** The header structure of the file */
+    private final Header header = new Header();
+    private final LinkedNodeList<Batch> batches = new LinkedNodeList<Batch>();
 
     private final MemoryMappedFile file;
     final SimpleAllocator allocator;
     final PageFile pageFile;
     private static final int updateBatchSize = 1024;
     private final boolean synch;
-
-    /** The header structure of the file */
-    private final Header header = new Header();
-    
-    int lastRedoPage = -1;
-    
-    private final LinkedNodeList<Batch> redos = new LinkedNodeList<Batch>();
+    private int lastBatchPage = -1;
     
     //
-    // The following Redo objects point to linked nodes in the previous redo list.  
-    // They are used to track designate the state of the redo object.
+    // The following batch objects point to linked nodes in the previous batch list.  
+    // They are used to track/designate the state of the batch object.
     //
     
-    /** The current redo that is currently being built */
-    Batch buildingRedo;
-    /** The stored redos.  These might be be recoverable. */
-    Batch storedRedos;
-    /** The synced redos.  A file sync occurred after these redos were stored. */
-    Batch syncedRedos;
-    /** The performed redos.  Updates are actually performed to the original page file. */
-    Batch performedRedos;
+    /** The current batch that is currently being assembled. */
+    Batch openBatch;
+    /** The batches that are being stored... These might be recoverable. */
+    Batch storingBatches;
+    /** The stored batches. */
+    Batch storedBatches;
+    /** The performed batches.  Page updates have been copied from the redo pages to the original page locations. */
+    Batch performedBatches;
     
     /** Used as read cache */
     ReadCache readCache = new ReadCache();
 
-    /** Mutex for data structures which are used during house keeping tasks like redo management. Once acquired, you can also acquire the TRANSACTION_MUTEX */
+    /** 
+     * Mutex for data structures which are used during house keeping tasks like batch
+     * management. Once acquired, you can also acquire the TRANSACTION_MUTEX 
+     */
     private final Object HOUSE_KEEPING_MUTEX = "HOUSE_KEEPING_MUTEX";
 
-    /** Mutex for data structures which transaction threads access. Never attempt to acquire the HOUSE_KEEPING_MUTEX once this mutex is acquired.  */
+    /** 
+     * Mutex for data structures which transaction threads access. Never attempt to 
+     * acquire the HOUSE_KEEPING_MUTEX once this mutex is acquired.  
+     */
     final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX";
     
-
     /**
-     * This is the free page list at the base revision.  It does not track allocations in transactions
-     * or committed updates.  Only when the updates are squashed will this list be updated.
+     * This is the free page list at the base revision.  It does not 
+     * track allocations in transactions or committed updates.  Only 
+     * when the updates are performed will this list be updated.
      * 
-     * The main purpose of this list is to initialize the free list on recovery.
+     * The main purpose of this list is to initialize the free list 
+     * on recovery.
      * 
-     * This does not track the space associated with redo batches and free lists.  On 
-     * recovery that space is discovered and tracked in the allocator.
+     * This does not track the space associated with batch lists 
+     * and free lists.  On recovery that space is discovered and 
+     * tracked in the page file allocator.
      */
-    private Ranges baseRevisionFreePages = new Ranges();
+    private Ranges storedFreeList = new Ranges();
     
     public HawtPageFile(HawtPageFileFactory factory) {
         this.pageFile = factory.getPageFile();
@@ -160,23 +165,23 @@
     @Override
     public String toString() {
         return "{\n" +
-        		"  allocator: "+allocator+ ",\n"+
-        		"  synch: "+synch+ ",\n"+
-        		"  read cache size: "+readCache.map.size()+ ",\n"+
-        		"  base revision free pages: "+baseRevisionFreePages + ",\n"+
-        		"  redos: {\n"+ 
-        		"    performed: "+toString(performedRedos, syncedRedos) + ",\n"+ 
-        		"    synced: "+toString(syncedRedos, storedRedos) + ",\n"+
-        		"    stored: "+toString(storedRedos, buildingRedo)+ ",\n"+
-        		"    building: "+toString(buildingRedo, null)+ ",\n"+
-        		"  }"+ "\n"+
-        		"}";
+    		"  allocator: "+allocator+ ",\n"+
+    		"  synch: "+synch+ ",\n"+
+    		"  read cache size: "+readCache.map.size()+ ",\n"+
+    		"  base revision free pages: "+storedFreeList + ",\n"+
+    		"  batches: {\n"+ 
+    		"    performed: "+toString(performedBatches, storedBatches) + ",\n"+ 
+    		"    stored: "+toString(storedBatches, storingBatches) + ",\n"+
+    		"    storing: "+toString(storingBatches, openBatch)+ ",\n"+
+    		"    open: "+toString(openBatch, null)+ ",\n"+
+    		"  }"+ "\n"+
+    		"}";
     }
 
     /** 
      * @param from
      * @param to
-     * @return string representation of the redo items from the specified redo up to (exclusive) the specified redo.
+     * @return string representation of the batch items from the specified batch up to (exclusive) the specified batch.
      */
     private String toString(Batch from, Batch to) {
         StringBuilder rc = new StringBuilder();
@@ -206,7 +211,7 @@
      */
     void commit(Snapshot snapshot, ConcurrentHashMap<Integer, Update> pageUpdates) {
         
-        boolean fullRedo=false;
+        boolean fullBatch=false;
         synchronized (TRANSACTION_MUTEX) {
             
             // we need to figure out the revision id of the this commit...
@@ -222,17 +227,12 @@
                 rev = snapshot.getHead().commitCheck(pageUpdates);
                 snapshot.close();
             } else {
-                rev = buildingRedo.head;
+                rev = openBatch.head;
             }
             rev++;
 
-            if( buildingRedo.base == -1 ) {
-                buildingRedo.base = rev;
-            }
-            buildingRedo.head = rev;
-            
             Commit commit=null;
-            BatchEntry last = buildingRedo.entries.getTail();
+            BatchEntry last = openBatch.entries.getTail();
             if( last!=null ) {
                 commit = last.isCommit();
             }
@@ -241,20 +241,26 @@
                 // TODO: figure out how to do the merge outside the TRANSACTION_MUTEX
                 commit.merge(pageFile.allocator(), rev, pageUpdates);
             } else {
-                buildingRedo.entries.addLast(new Commit(rev, pageUpdates) );
+                openBatch.entries.addLast(new Commit(rev, pageUpdates) );
+            }
+            
+            if( openBatch.base == -1 ) {
+                openBatch.base = rev;
             }
+            openBatch.head = rev;
+
             
-            if( buildingRedo.pageCount() > updateBatchSize ) {
-                fullRedo = true;
+            if( openBatch.pageCount() > updateBatchSize ) {
+                fullBatch = true;
             }
         }
         
-        if( fullRedo ) {
+        if( fullBatch ) {
             synchronized (HOUSE_KEEPING_MUTEX) {
-                storeRedos(false);
+                storeBatches(false);
                 // TODO: do the following actions async.
-                syncRedos();
-                performRedos();
+                syncBatches();
+                performBatches();
             }
         }
     }
@@ -265,16 +271,16 @@
      */
     public void reset() {
         synchronized (HOUSE_KEEPING_MUTEX) {
-            redos.clear();
-            performedRedos = syncedRedos = storedRedos = buildingRedo = new Batch(-1);
-            redos.addFirst(buildingRedo);
+            batches.clear();
+            performedBatches = storedBatches = storingBatches = openBatch = new Batch(-1);
+            batches.addFirst(openBatch);
             
-            lastRedoPage = -1;
+            lastBatchPage = -1;
             readCache.clear();
             
             allocator.clear(); 
-            baseRevisionFreePages.clear();
-            baseRevisionFreePages.add(0, allocator.getLimit());
+            storedFreeList.clear();
+            storedFreeList.add(0, allocator.getLimit());
     
             // Initialize the file header..
             Header h = header();
@@ -284,21 +290,21 @@
             h.free_list_page.set(-1);
             h.page_size.set(pageFile.getPageSize());
             h.reserved.set("");
-            h.unsynced_redo_page.set(-1);
+            h.storing_batch_page.set(-1);
             replicateHeader();
         }
     }    
     /**
-     * Loads an existing file and replays the redo
+     * Loads an existing file and replays the batch
      * logs to put it in a consistent state.
      */
     public void recover() {
         synchronized (HOUSE_KEEPING_MUTEX) {
 
-            redos.clear();
-            performedRedos = syncedRedos = storedRedos = buildingRedo = new Batch(-1);
-            redos.addFirst(buildingRedo);
-            lastRedoPage = -1;
+            batches.clear();
+            performedBatches = storedBatches = storingBatches = openBatch = new Batch(-1);
+            batches.addFirst(openBatch);
+            lastBatchPage = -1;
             readCache.clear();
     
             Header h = header();
@@ -311,19 +317,19 @@
             // Initialize the free page list.
             int pageId = h.free_list_page.get();
             if( pageId >= 0 ) {
-                baseRevisionFreePages = loadObject(pageId);
-                allocator.copy(baseRevisionFreePages);
+                storedFreeList = loadObject(pageId);
+                allocator.copy(storedFreeList);
                 Extent.unfree(pageFile, pageId);
             } else {
                 allocator.clear(); 
-                baseRevisionFreePages.add(0, allocator.getLimit());
+                storedFreeList.add(0, allocator.getLimit());
             }
             
             boolean consistencyCheckNeeded=true;
-            int last_synced_redo = h.redo_page.get();
-            pageId = h.unsynced_redo_page.get();
+            int last_synced_batch = h.stored_batch_page.get();
+            pageId = h.storing_batch_page.get();
             if( pageId<0 ) {
-                pageId = last_synced_redo;
+                pageId = last_synced_batch;
                 consistencyCheckNeeded = false;
             }
             while( true ) {
@@ -333,26 +339,26 @@
 
                 if( consistencyCheckNeeded ) {
                     // TODO: when consistencyCheckNeeded==true, then we need to check the
-                    // Consistency of the redo, as it may have been partially written to disk.
+                    // Consistency of the batch, as it may have been partially written to disk.
                 }
                 
                 
-                Batch redo = loadObject(pageId); 
-                redo.page = pageId;
-                redo.recovered = true;
+                Batch batch = loadObject(pageId); 
+                batch.page = pageId;
+                batch.recovered = true;
                 Extent.unfree(pageFile, pageId);
                 
-                if( buildingRedo.head == -1 ) {
-                    buildingRedo.head = redo.head;
+                if( openBatch.head == -1 ) {
+                    openBatch.head = batch.head;
                 }
     
-                if( baseRevision < redo.head ) {
-                    // add first since we are loading redo objects oldest to youngest
+                if( baseRevision < batch.head ) {
+                    // add first since we are loading batch objects oldest to youngest
                     // but want to put them in the list youngest to oldest.
-                    redos.addFirst(redo);
-                    performedRedos = syncedRedos = redo;
-                    pageId=redo.previous;
-                    if( pageId==last_synced_redo ) {
+                    batches.addFirst(batch);
+                    performedBatches = storedBatches = batch;
+                    pageId=batch.previous;
+                    if( pageId==last_synced_batch ) {
                         consistencyCheckNeeded = false;
                     }
                 } else {
@@ -360,8 +366,8 @@
                 }
             }
             
-            // Apply all the redos..
-            performRedos();
+            // Apply all the batches..
+            performBatches();
         }        
     }
 
@@ -372,63 +378,63 @@
      */
     public void flush() {
         synchronized (HOUSE_KEEPING_MUTEX) {
-            storeRedos(true);
-            syncRedos();
+            storeBatches(true);
+            syncBatches();
         }
     }   
     
     // /////////////////////////////////////////////////////////////////
     //
-    // Methods which transition redos through their life cycle states;
+    // Methods which transition batches through their life cycle states;
     //
-    //    building -> stored -> synced -> performed -> released
+    //    open -> storing -> stored -> performed -> released
     //
     // The HOUSE_KEEPING_MUTEX must be acquired before being called. 
     //
     // /////////////////////////////////////////////////////////////////
     
     /**
-     * Attempts to perform a redo state change: building -> stored
+     * Attempts to perform a batch state change: open -> storing
      */
-    private void storeRedos(boolean force) {
-        Batch redo;
+    private void storeBatches(boolean force) {
+        Batch batch;
         
         // We synchronized /w the transactions so that they see the state change.
         synchronized (TRANSACTION_MUTEX) {
-            // Re-checking since storing the redo may not be needed.
-            if( (force && buildingRedo.base!=-1 ) || buildingRedo.pageCount() > updateBatchSize ) {
-                redo = buildingRedo;
-                buildingRedo = new Batch(redo.head);
-                redos.addLast(buildingRedo);
+            // Re-checking since storing the batch may not be needed.
+            if( (force && openBatch.base!=-1 ) || openBatch.pageCount() > updateBatchSize ) {
+                batch = openBatch;
+                openBatch = new Batch(batch.head);
+                batches.addLast(openBatch);
             } else {
                 return;
             }
         }
         
         // Write any outstanding deferred cache updates...
-        redo.performDefferedUpdates(pageFile);
+        batch.performDefferedUpdates(pageFile);
 
-        // Link it to the last redo.
-        redo.previous = lastRedoPage; 
+        // Link it to the last batch.
+        batch.previous = lastBatchPage; 
         
-        // Store the redo record.
-        lastRedoPage = redo.page = storeObject(redo);
+        // Store the batch record.
+        lastBatchPage = batch.page = storeObject(batch);
 
-        // Update the header to know about the new redo page.
-        header().unsynced_redo_page.set(redo.page);
+        // Update the header to know about the new batch page.
+        header().storing_batch_page.set(batch.page);
         replicateHeader();
     }
     
     /**
      * Performs a file sync. 
      * 
-     * This allows two types of redo state changes to occur:
+     * This allows two types of batch state changes to occur:
      * <ul>
-     * <li> stored -> synced
+     * <li> storing -> stored
      * <li> performed -> released
      * </ul>
      */
-    private void syncRedos() {
+    private void syncBatches() {
 
         // This is a slow operation..
         if( synch ) {
@@ -437,50 +443,50 @@
         Header h = header();
 
         // Update the base_revision with the last performed revision.
-        if (performedRedos!=syncedRedos) {
-            Batch lastPerformedRedo = syncedRedos.getPrevious();
-            h.base_revision.set(lastPerformedRedo.head);
+        if (performedBatches!=storedBatches) {
+            Batch lastPerformedBatch = storedBatches.getPrevious();
+            h.base_revision.set(lastPerformedBatch.head);
         }
 
-        // Were there some redos in the stored state?
-        if (storedRedos!=buildingRedo) {
+        // Were there some batches in the stored state?
+        if (storingBatches!=openBatch) {
             
             // The last stored is actually synced now..
-            Batch lastStoredRedo = buildingRedo.getPrevious();
+            Batch lastStoredBatch = openBatch.getPrevious();
             // Let the header know about it..
-            h.redo_page.set(lastStoredRedo.page);
+            h.stored_batch_page.set(lastStoredBatch.page);
             
             // We synchronized /w the transactions so that they see the state change.
             synchronized (TRANSACTION_MUTEX) {
                 // Transition stored -> synced.
-                storedRedos = buildingRedo;
+                storingBatches = openBatch;
             }
         }
         
-        // Once a redo has been performed, subsequently synced, and no longer referenced,
+        // Once a batch has been performed, subsequently synced, and no longer referenced,
         // its allocated recovery space can be released.
-        while( performedRedos!=syncedRedos ) {
-            if( performedRedos.references!=0 ) {
+        while( performedBatches!=storedBatches ) {
+            if( performedBatches.snapshots!=0 ) {
                 break;
             }
             
-            // Free the update pages associated with the redo.
-            performedRedos.freeRedoSpace(allocator);
+            // Free the update pages associated with the batch.
+            performedBatches.release(allocator);
             
-            // Free the redo record itself.
-            Extent.free(pageFile, performedRedos.page);
+            // Free the batch record itself.
+            Extent.free(pageFile, performedBatches.page);
             
-            // don't need to sync /w transactions since they don't use the performedRedos variable.
+            // don't need to sync /w transactions since they don't use the performedBatches variable.
             // Transition performed -> released
-            performedRedos = performedRedos.getNext();
+            performedBatches = performedBatches.getNext();
             
-            // removes the released redo form the redo list.
-            performedRedos.getPrevious().unlink();
+            // removes the released batch form the batch list.
+            performedBatches.getPrevious().unlink();
         }
 
         // Store the free list..
         int previousFreeListPage = h.free_list_page.get();
-        h.free_list_page.set(storeObject(baseRevisionFreePages));
+        h.free_list_page.set(storeObject(storedFreeList));
         replicateHeader();
 
         // Release the previous free list.
@@ -490,36 +496,36 @@
     }
 
     /**
-     * Attempts to perform a redo state change: synced -> performed
+     * Attempts to perform a batch state change: stored -> performed
      * 
-     * Once a redo is performed, new snapshots will not reference 
-     * the redo anymore.
+     * Once a batch is performed, new snapshots will not reference 
+     * the batch anymore.
      */
-    public void performRedos() {
+    public void performBatches() {
 
-        if( syncedRedos==storedRedos ) {
-            // There are no redos in the synced state for use to transition.
+        if( storedBatches==storingBatches ) {
+            // There are no batches in the synced state for us to transition.
             return;
         }
               
-        // The last performed redo MIGHT still have an open snapshot.
+        // The last performed batch MIGHT still have an open snapshot.
         // we can't transition from synced, until that snapshot closes.
-        Batch lastPerformed = syncedRedos.getPrevious();
-        if( lastPerformed!=null && lastPerformed.references!=0) {
+        Batch lastPerformed = storedBatches.getPrevious();
+        if( lastPerformed!=null && lastPerformed.snapshots!=0) {
             return;
         }
         
-        while( syncedRedos!=storedRedos ) {
+        while( storedBatches!=storingBatches ) {
             
-            // Performing the redo actually applies the updates to the original page locations.
-            for (Commit commit : syncedRedos) {
+            // Performing the batch actually applies the updates to the original page locations.
+            for (Commit commit : storedBatches) {
                 for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
                     int page = entry.getKey();
                     Update update = entry.getValue();
                     
                     if( page != update.page ) {
                         
-                        if( syncedRedos.recovered ) {
+                        if( storedBatches.recovered ) {
                             // If we are recovering, the allocator MIGHT not have this 
                             // page as being allocated.  This makes sure it's allocated so that
                             // a new transaction does not get this page and overwrite it in error.
@@ -538,32 +544,32 @@
                     }
                     if( update.wasAllocated() ) {
                         
-                        if( syncedRedos.recovered ) {
+                        if( storedBatches.recovered ) {
                             // If we are recovering, the allocator MIGHT not have this 
                             // page as being allocated.  This makes sure it's allocated so that
                             // a new transaction does not get this page and overwrite it in error.
                             allocator.unfree(page, 1);
                         }
                         // Update the persistent free list.  This gets stored on the next sync.
-                        baseRevisionFreePages.remove(page, 1);
+                        storedFreeList.remove(page, 1);
                         
                     } else if( update.wasFreed() ) {
-                        baseRevisionFreePages.add(page, 1);
+                        storedFreeList.add(page, 1);
                     }
                 }
             }
             
-            syncedRedos.performed = true;
+            storedBatches.performed = true;
             
             // We synchronized /w the transactions so that they see the state change.
             synchronized (TRANSACTION_MUTEX) {
                 // Transition synced -> performed
-                syncedRedos = syncedRedos.getNext();
+                storedBatches = storedBatches.getNext();
             }
             
-            lastPerformed = syncedRedos.getPrevious();
-            // We have to stop if the last redo performed has an open snapshot.
-            if( lastPerformed.references!=0 ) {
+            lastPerformed = storedBatches.getPrevious();
+            // We have to stop if the last batch performed has an open snapshot.
+            if( lastPerformed.snapshots!=0 ) {
                 break;
             }
         }
@@ -578,19 +584,19 @@
             SnapshotHead head=null;
 
             // re-use the last entry if it was a snapshot head..
-            BatchEntry entry = buildingRedo.entries.getTail();
+            BatchEntry entry = openBatch.entries.getTail();
             if( entry!=null ) {
                 head = entry.isSnapshotHead();
             }
             
             if( head == null ) {
                 // create a new snapshot head entry..
-                head = new SnapshotHead(this, buildingRedo);
-                buildingRedo.entries.addLast(head);
+                head = new SnapshotHead(openBatch);
+                openBatch.entries.addLast(head);
             }
             
             // Open the snapshot off that head position.
-            return new Snapshot(this, head, syncedRedos).open();
+            return new Snapshot(this, head, storedBatches).open();
         }
     }
     

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java Sat Oct 24 13:59:01 2009
@@ -60,7 +60,7 @@
         if (concurrentPageFile != null) {
             concurrentPageFile.suspend(true, false, drainOnClose);
             concurrentPageFile.flush();
-            concurrentPageFile.performRedos();
+            concurrentPageFile.performBatches();
             concurrentPageFile=null;
         }
         super.close();

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java Sat Oct 24 13:59:01 2009
@@ -108,7 +108,11 @@
         }
         
         // No?  Then ask the snapshot to load the object.
-        return snapshot().getHead().cacheLoad(marshaller, page);
+        T rc = snapshot().getHead().get(marshaller, page);
+        if( rc == null ) {
+            rc = parent.readCache.cacheLoad(marshaller, page);
+        }
+        return rc;
     }
 
     public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
@@ -158,33 +162,45 @@
         return txallocator;
     }
 
-    public void read(int pageId, Buffer buffer) throws IOPagingException {
-        Update update = updates == null ? null : updates.get(pageId);
+    public void read(int page, Buffer buffer) throws IOPagingException {
+        // We may need to translate the page due to an update..
+        Update update = updates == null ? null : updates.get(page);
         if (update != null) {
-            parent.pageFile.read(update.page(), buffer);
+            // in this transaction..
+            page = update.page();
         } else {
-            // Get the data from the snapshot.
-            snapshot().getHead().read(pageId, buffer);
+            // in a committed transaction that has not yet been performed.
+            page = snapshot().getHead().translatePage(page);  
         }
+        parent.pageFile.read(page, buffer);
     }
 
     public ByteBuffer slice(SliceType type, int page, int count) throws IOPagingException {
-        //TODO: need to improve the design of ranged ops..
+        //TODO: wish we could do ranged ops more efficiently.
+        
         if( type==SliceType.READ ) {
             Update udpate = updates == null ? null : updates.get(page);
             if (udpate != null) {
-                return parent.pageFile.slice(type, udpate.page(), count);
+                page = udpate.page();
             } else {
-                // Get the data from the snapshot.
-                return snapshot().getHead().slice(page, count);
+                page = snapshot().getHead().translatePage(page);
             }
-            
         } else {
             Update update = getUpdates().get(page);
             if (update == null) {
+
+                // Allocate space for the update's redo pages.
                 update = update(parent.allocator.alloc(count)).allocated();
+                int end = page+count;
+                for (int i = page; i < end; i++) {
+                    getUpdates().put(i, update(i).allocated());
+                }
+                
                 if (type==SliceType.READ_WRITE) {
-                    ByteBuffer slice = snapshot().getHead().slice(page, count);
+                    // READ_WRITE slices are read as well as written, so first copy
+                    // the original page contents into the redo pages.
+                    int originalPage = snapshot().getHead().translatePage(page);
+                    ByteBuffer slice = parent.pageFile.slice(SliceType.READ, originalPage, count);
                     try {
                         parent.pageFile.write(update.page, slice);
                     } finally { 
@@ -192,16 +208,13 @@
                     }
                 }
                 
-                int end = page+count;
-                for (int i = page; i < end; i++) {
-                    getUpdates().put(i, update(i).allocated());
-                }
                 getUpdates().put(page, update);
-                
-                return parent.pageFile.slice(type, update.page, count);
             }
-            return parent.pageFile.slice(type, update.page(), count);
+            
+            // translate the page..
+            page = update.page;
         }
+        return parent.pageFile.slice(type, page, count);
         
     }
     

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java Sat Oct 24 13:59:01 2009
@@ -25,7 +25,8 @@
 
 
 /**
- * Provides Paged access to a MemoryMappedFile.
+ * Provides a {@link Paged} interface to a {@link MemoryMappedFile}. 
+ * 
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -37,30 +38,22 @@
     private final MemoryMappedFile file;
     
     
-    @Override
-    public String toString() {
-        return "{ header size: "+headerSize+", page size: "+pageSize+", allocator: "+allocator+" }";
-    }
-
     public PageFile(MemoryMappedFile file, short pageSize, int headerSize, int maxPages) {
         this.file = file;
         this.allocator = new SimpleAllocator(maxPages);
         this.pageSize = pageSize;
         this.headerSize = headerSize;
     }
-		
+    
+    ///////////////////////////////////////////////////////////////////
+    //
+    // Paged interface implementation.
+    //
+    ///////////////////////////////////////////////////////////////////
     public SimpleAllocator allocator() {
         return allocator;
     }
 
-    public int getHeaderSize() {
-        return headerSize;
-    }
-
-    public MemoryMappedFile getFile() {
-        return file;
-    }
-
 	public void read(int pageId, Buffer buffer) {
 		file.read(offset(pageId), buffer);
 	}
@@ -69,10 +62,6 @@
 		file.write(offset(pageId), buffer);
 	}
 	
-    public void write(int pageId, ByteBuffer buffer) {
-        file.write(offset(pageId), buffer);
-    }
-
 	public ByteBuffer slice(SliceType type, int pageId, int size) {
         assert size > 0;
         return file.slice(type==SliceType.READ, offset(pageId), pageSize*size);
@@ -81,12 +70,7 @@
     public void unslice(ByteBuffer buffer) {
         file.unslice(buffer);
     }
-    
-    
-    public long offset(long pageId) {
-        assert pageId >= 0;
-        return headerSize+(pageId*pageSize);
-    }
+
 	
     public int getPageSize() {
         return pageSize;
@@ -112,5 +96,34 @@
     public <T> void clear(EncoderDecoder<T> encoderDecoder, int page) {
         encoderDecoder.remove(this, page);
     }
+    
+    ///////////////////////////////////////////////////////////////////
+    //
+    // PageFile public methods.
+    //
+    ///////////////////////////////////////////////////////////////////
+
+    public void write(int pageId, ByteBuffer buffer) {
+        file.write(offset(pageId), buffer);
+    }
+
+    public long offset(long pageId) {
+        assert pageId >= 0;
+        return headerSize+(pageId*pageSize);
+    }
+    
+    public int getHeaderSize() {
+        return headerSize;
+    }
+
+    public MemoryMappedFile getFile() {
+        return file;
+    }
+
+    @Override
+    public String toString() {
+        return "{ header size: "+headerSize+", page size: "+pageSize+", allocator: "+allocator+" }";
+    }
+
 
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java Sat Oct 24 13:59:01 2009
@@ -17,7 +17,19 @@
 package org.apache.hawtdb.internal.page;
 
 /**
- * 
+ * <p>
+ * Snapshot objects are created for transactions so that they can access
+ * a consistent point in time view of the page file.
+ * </p><p>
+ * There are two parts to a snapshot: the base and the head. The base and head
+ * track the range of updates which were not yet performed against the page file
+ * when the snapshot was opened.  This range is tracked to ensure the snapshot
+ * view remains consistent.  Direct updates to data in that range are not allowed
+ * while the snapshot is open.
+ * </p><p>
+ * When a snapshot is opened and closed, reference counters on all the 
+ * Batch objects between the base and the head get adjusted.
+ * </p>
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final class Snapshot {
@@ -26,24 +38,44 @@
     private final SnapshotHead head;
     private final Batch base;
     
-    Snapshot(HawtPageFile hawtPageFile, SnapshotHead head, Batch base) {
+    public  Snapshot(HawtPageFile hawtPageFile, SnapshotHead head, Batch base) {
         parent = hawtPageFile;
         this.head = head;
         this.base = base;
     }
-
-    Snapshot open() {
-        head.open(base);
+    
+    public Snapshot open() {
+        head.snapshots++;
+        Batch cur = base;
+        while( true ) {
+            cur.snapshots++;
+            if(cur == head.parent ) {
+                break;
+            }
+            cur = cur.getNext();
+        }
         return this;
     }
     
-    void close() {
+    public void close() {
         synchronized(parent.TRANSACTION_MUTEX) {
-            head.close(base);
+            head.snapshots--;
+            Batch cur = base;
+            while( true ) {
+                cur.snapshots--;
+                if(cur == head.parent ) {
+                    break;
+                }
+                cur = cur.getNext();
+            }
+
+            if( head.snapshots==0 ) {
+                head.unlink();
+            }        
         }
     }
 
-    SnapshotHead getHead() {
+    public SnapshotHead getHead() {
         return head;
     }
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java Sat Oct 24 13:59:01 2009
@@ -16,13 +16,9 @@
  */
 package org.apache.hawtdb.internal.page;
 
-import java.nio.ByteBuffer;
 import java.util.Map;
 
-import org.apache.activemq.util.buffer.Buffer;
 import org.apache.hawtdb.api.EncoderDecoder;
-import org.apache.hawtdb.api.IOPagingException;
-import org.apache.hawtdb.api.Paged.SliceType;
 
 /**
  * 
@@ -37,105 +33,64 @@
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 final class SnapshotHead extends BatchEntry {
-    /**
-     * 
-     */
-    private final HawtPageFile hawtPageFile;
+
     final Batch parent;
     
-    public SnapshotHead(HawtPageFile hawtPageFile, Batch parent) {
-        this.hawtPageFile = hawtPageFile;
+    public SnapshotHead(Batch parent) {
         this.parent = parent;
     }
 
     /** The number of times this snapshot has been opened. */
-    protected int references;
+    protected int snapshots;
     
     public String toString() { 
-        return "{ references: "+this.references+" }";
+        return "{ references: "+this.snapshots+" }";
     }
 
-    SnapshotHead isSnapshotHead() {
+    public SnapshotHead isSnapshotHead() {
         return this;
     }
     
-    public void read(int pageId, Buffer buffer) throws IOPagingException {
-        pageId = mapPageId(pageId);
-        this.hawtPageFile.pageFile.read(pageId, buffer);
-    }
-
-    public ByteBuffer slice(int pageId, int count) {
-        pageId = mapPageId(pageId);
-        return this.hawtPageFile.pageFile.slice(SliceType.READ, pageId, count);
-    }
-    
-    public void open(Batch base) {
-        references++;
-        while( true ) {
-            base.references++;
-            if(base == parent ) {
-                break;
-            }
-            base = base.getNext();
-        }
-    }
-    
-    public void close(Batch base) {
-        references--;
-        while( true ) {
-            base.references--;
-            if(base == parent ) {
-                break;
-            }
-            base = base.getNext();
-        }
-
-        if( references==0 ) {
-            unlink();
-            // TODO: trigger merging of adjacent commits. 
-        }
-    }
-
-    public int mapPageId(int page) {
+    public int translatePage(int page) {
         // Look for the page in the previous commits..
-        Batch curRedo = parent;
-        BatchEntry curEntry = getPrevious();
+        Batch batch = parent;
+        BatchEntry entry = getPrevious();
         while( true ) {
-            if( curRedo.isPerformed() ) {
+            if( batch.isPerformed() ) {
                 break;
             }
             
-            while( curEntry!=null ) {
-                Commit commit = curEntry.isCommit();
+            while( entry!=null ) {
+                Commit commit = entry.isCommit();
                 if( commit !=null ) {
                     Update update = commit.updates.get(page);
                     if( update!=null ) {
                         return update.page();
                     }
                 }
-                curEntry = curEntry.getPrevious();
+                entry = entry.getPrevious();
             }
             
-            curRedo = curRedo.getPrevious();
-            if( curRedo==null ) {
+            batch = batch.getPrevious();
+            if( batch==null ) {
                 break;
             }
-            curEntry = curRedo.entries.getTail();
+            entry = batch.entries.getTail();
         }
         return page;
     }
     
     
-    public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
-        Batch curRedo = parent;
-        BatchEntry curEntry = getPrevious();
+    public <T> T get(EncoderDecoder<T> marshaller, int page) {
+        Batch batch = parent;
+        BatchEntry entry = getPrevious();
         while( true ) {
-            if( curRedo.isPerformed() ) {
+            if( batch.isPerformed() ) {
                 break;
             }
             
-            while( curEntry!=null ) {
-                Commit commit = curEntry.isCommit();
+            while( entry!=null ) {
+                Commit commit = entry.isCommit();
                 if( commit !=null ) {
                     Update update = commit.updates.get(page);
                     if( update!=null ) {
@@ -145,36 +100,36 @@
                         }
                     }
                 }
-                curEntry = curEntry.getPrevious();
+                entry = entry.getPrevious();
             }
             
-            curRedo = curRedo.getPrevious();
-            if( curRedo==null ) {
+            batch = batch.getPrevious();
+            if( batch==null ) {
                 break;
             }
-            curEntry = curRedo.entries.getTail();
+            entry = batch.entries.getTail();
         }
-        return this.hawtPageFile.readCache.cacheLoad(marshaller, page);
+        return null;
     }
     
     public long commitCheck(Map<Integer, Update> pageUpdates) {
-        long rc=0;
-        Batch curRedo = parent;
-        BatchEntry curEntry = getNext();
+        long rc=parent.head;
+        Batch batch = parent;
+        BatchEntry entry = getNext();
         while( true ) {
-            while( curEntry!=null ) {
-                Commit commit = curEntry.isCommit();
+            while( entry!=null ) {
+                Commit commit = entry.isCommit();
                 if( commit!=null ) {
                     rc = commit.commitCheck(pageUpdates);
                 }
-                curEntry = curEntry.getNext();
+                entry = entry.getNext();
             }
             
-            curRedo = curRedo.getNext();
-            if( curRedo==null ) {
+            batch = batch.getNext();
+            if( batch==null ) {
                 break;
             }
-            curEntry = curRedo.entries.getHead();
+            entry = batch.entries.getHead();
         }
         return rc;
     }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java Sat Oct 24 13:59:01 2009
@@ -24,6 +24,28 @@
 import org.apache.hawtdb.api.PagingException;
 
 /**
+ * <p>Tracks one page update.
+ * </p><p>  
+ * To be able to provide snapshot isolation and to make 
+ * sure a set of updates can be performed atomically,  updates
+ * to existing pages are stored in a 'redo' page.  Once all the updates
+ * that are part of the transaction have been verified to be on disk,
+ * and no open snapshot would need to access the data on the original page,
+ * the contents of the 'redo' page are copied to the original page location
+ * and the 'redo' page gets freed.
+ * </p><p>
+ * An Update object is stored in the updates map in a Commit object.  That map
+ * is keyed off the original page location.  The Update page is the location
+ * of the 'redo' page.
+ * </p><p>
+ * Updates to pages which were allocated in the same transaction get done
+ * directly against the allocated page since no snapshot would have a view onto 
+ * that page.  In this case the update map key would match the update's page.
+ * </p><p>
+ * An update maintains some bit flags to know if the page was a new allocation
+ * or if the update was just freeing a previously allocated page, etc.  This data
+ * is used to properly maintain the persisted free page list.
+ * </p>
  * 
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java?rev=829372&r1=829371&r2=829372&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/page/ConcurrentPageFileTest.java Sat Oct 24 13:59:01 2009
@@ -216,7 +216,7 @@
 
         // Apply the updates.
         pf.flush();
-        pf.performRedos();
+        pf.performBatches();
 
         // Should still be there..
         assertEquals("Hello", load(pff.getPageFile(), 0));
@@ -239,7 +239,7 @@
 
         // Apply them
         pf.flush();
-        pf.performRedos();
+        pf.performBatches();
 
         // We should see them now.
         assertEquals("Good", load(pff.getPageFile(), 0));



Mime
View raw message