activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r828824 - in /activemq/sandbox/activemq-apollo/hawtdb/src: main/java/org/apache/hawtdb/internal/page/ test/java/org/apache/hawtdb/internal/index/
Date Thu, 22 Oct 2009 19:22:13 GMT
Author: chirino
Date: Thu Oct 22 19:22:13 2009
New Revision: 828824

URL: http://svn.apache.org/viewvc?rev=828824&view=rev
Log:
re-organized how the HawtPageFile keeps track of the snapshots.  Fixed an allocation leak issue that was appearing in the BTree benchmark.
still need to work through the new issues that popped up on the index tests.


Added:
    activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/DefferedBTreeIndexTest.java
Modified:
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
    activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java
    activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java
    activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java
    activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java?rev=828824&r1=828823&r2=828824&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java Thu Oct 22 19:22:13 2009
@@ -25,9 +25,10 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NoSuchElementException;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.zip.CRC32;
@@ -114,6 +115,15 @@
         }
     }
 
+    abstract static class RedoEntry extends LinkedNode<RedoEntry> {
+        Commit isCommit() {
+            return null;
+        }
+        SnapshotHead isSnapshotHead() {
+            return null;
+        }
+    }
+    
     /**
      * Tracks the page changes that were part of a commit.
      * 
@@ -121,35 +131,41 @@
      * 
      * @author chirino
      */
-    static class Commit extends LinkedNode<Commit> {
+    static class Commit extends RedoEntry implements Externalizable {
 
-        /** the redo that this commit is stored in */
-        private final Redo redo;        
         /** oldest revision in the commit range. */
         private long base; 
         /** newest revision in the commit range, will match base if this only tracks one commit */ 
         private long head;
-        /** set to the open snapshot who's head is this commit */
-        private Snapshot snapshot;
         /** all the page updates that are part of the redo */
-        private HashMap<Integer, Integer> updates;
+        private ConcurrentHashMap<Integer, Integer> updates;
         /** the deferred updates that need to be done in this redo */
-        private HashMap<Integer, DeferredUpdate> deferredUpdates;
+        private ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates;
+
 
-        public Commit(long version, HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates, Redo redo) {
-            this.redo = redo;
+        public Commit() {
+        }
+        
+        public Commit(long version, ConcurrentHashMap<Integer, Integer> updates, ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates) {
             this.head = this.base = version;
             this.updates = updates;
             this.deferredUpdates = deferredUpdates;
         }
         
+        
+        @Override
+        Commit isCommit() {
+            return this;
+        }
+
+        
         public String toString() { 
             int updateSize = updates==null ? 0 : updates.size();
             int cacheSize = deferredUpdates==null ? 0 : deferredUpdates.size();
             return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+" }";
         }
 
-        public long commitCheck(HashMap<Integer, Integer> newUpdate) {
+        public long commitCheck(Map<Integer, Integer> newUpdate) {
             for (Integer page : newUpdate.keySet()) {
                 if( updates.containsKey( page ) ) {
                     throw new OptimisticUpdateException();
@@ -158,138 +174,95 @@
             return head;
         }
 
-        public void putAll(HashMap<Integer, Integer> udpates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
-            if( udpates!=null ) {
-                if( this.updates == null ) {
-                    this.updates = udpates;
-                } else {
-                    this.updates.putAll(udpates);
-                }
-            }
-            if( deferredUpdates!=null ) {
-                if( this.deferredUpdates == null ) {
+        public void merge(Paged paged, long rev, ConcurrentHashMap<Integer, Integer> updates, ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates) {
+            assert head+1 == rev;
+            head=rev;
+            if (deferredUpdates != null) {
+                if (this.deferredUpdates == null) {
                     this.deferredUpdates = deferredUpdates;
                 } else {
-                    this.deferredUpdates.putAll(deferredUpdates);
+                    for (Entry<Integer, DeferredUpdate> entry : deferredUpdates.entrySet()) {
+                        Integer page = entry.getKey();
+                        DeferredUpdate du = entry.getValue();
+                        if (du.value == null) {
+                            this.deferredUpdates.remove(page);
+                        } else {
+                            DeferredUpdate previous = this.deferredUpdates.put(page, du);
+                            // TODO: There was a previous deferred update in the redo...  we can just use it's 
+                            // redo allocation and release the new allocation.
+                            if (previous != null) {
+                                Integer allocated = updates.remove(page);
+                                assert allocated == du.page; // these should match...
+                                paged.allocator().free(du.page, 1);
+                                // since we replaced the previous entry,  
+                                du.page = previous.page;
+                            }
+                        }
+                    }
                 }
             }
-        }
-        
-    }
-    
-    /**
-     * Aggregates a group of commits so that they can be more efficiently operated against.
-     * 
-     */
-    static private class Redo extends LinkedNode<Redo> implements Externalizable {
-        private static final long serialVersionUID = 1188640492489990493L;
-        
-        /** the pageId that this redo batch is stored at */
-        private transient int page=-1;
-        /** points to a previous redo batch page */
-        public int previous=-1;
-        /** was the redo loaded in the {@link recover} method */
-        private transient boolean recovered;
-        /** the commits stored in the redo */ 
-        private transient LinkedNodeList<Commit> commits = new LinkedNodeList<Commit>();
-        /** all the page updates that are part of the redo */
-        private ConcurrentHashMap<Integer, Integer> updates = new ConcurrentHashMap<Integer, Integer>();
-        /** the deferred updates that need to be done in this redo */
-        private transient ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates = new ConcurrentHashMap<Integer, DeferredUpdate>();
-        /** tracks how many snapshots are referencing the redo */
-        private int references;
-        /** set to the open snapshot who's head is before this redo */
-        private Snapshot prevSnapshot;
-        /** set to the open snapshot who's head is this redo */
-        private Snapshot snapshot;
-        /** the oldest commit in this redo */
-        public long base=-1;
-        /** the newest commit in this redo */
-        public long head;
-        
-        @SuppressWarnings("unused")
-        public Redo() {
-        }
-        
-        public Redo(long head) {
-            this.head = head;
-        }
-
-        public String toString() { 
-            int count = updates==null ? 0 : updates.size();
-            return "{ page: "+this.page+", updates: "+count+", previous: "+previous+" }";
-        }
-        
-        @Override
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(head);
-            out.writeLong(base);
-            out.writeInt(previous);
-            out.writeObject(updates);
+            this.updates.putAll(updates);
         }
 
         @SuppressWarnings("unchecked")
         @Override
         public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            head = in.readLong();
             base = in.readLong();
-            previous = in.readInt();
+            head = in.readLong();
             updates = (ConcurrentHashMap<Integer, Integer>) in.readObject();
-        }        
-
-        public int pageCount() {
-            int rc = 0;
-            rc = updates.size();
-            // TODO: we can probably get an idea of how many pages the deferred update will use.
-            // rc += deferredUpdates.size();
-            return rc;
         }
 
-        public long commitCheck(HashMap<Integer, Integer> newUpdate) {
-            for (Integer page : newUpdate.keySet()) {
-                if( updates.containsKey( page ) ) {
-                    throw new OptimisticUpdateException();
-                }
-            }
-            return head;
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(base);
+            out.writeLong(head);
+            out.writeObject(updates);
+        }
+        
+    }
+    
+    class Snapshot {
+        final SnapshotHead head;
+        final Redo base;
+        
+        public Snapshot(SnapshotHead head, Redo base) {
+            this.head = head;
+            this.base = base;
         }
 
-        public void putAll(HashMap<Integer, Integer> updates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
-            if( updates!=null ) {
-                this.updates.putAll(updates);
-            }
-            if( deferredUpdates!=null ) {
-                for (Entry<Integer, DeferredUpdate> entry : deferredUpdates.entrySet()) {
-                    if( entry.getValue().value == null ) {
-                        this.deferredUpdates.remove(entry.getKey(), entry.getValue());
-                    } else {
-                        this.deferredUpdates.put(entry.getKey(), entry.getValue());
-                    }
-                }
+        public Snapshot open() {
+            head.open(base);
+            return this;
+        }
+        
+        public void close() {
+            synchronized(TRANSACTION_MUTEX) {
+                head.close(base);
             }
         }
-
     }
-
+    
     /**
      * Provides a snapshot view of the page file.
      *  
      * @author chirino
      */
-    abstract class Snapshot {
-        /** The number of transactions that are holding this snapshot open */
+    class SnapshotHead extends RedoEntry {
+        final Redo parent;
+        
+        public SnapshotHead(Redo parent) {
+            this.parent = parent;
+        }
+
+        /** The number of times this snapshot has been opened. */
         protected int references;
         
         public String toString() { 
             return "{ references: "+this.references+" }";
         }
 
-        public int mapPageId(int page) {
-            return page;
-        }
-        
-        public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
-            return readCache.cacheLoad(marshaller, page);
+        SnapshotHead isSnapshotHead() {
+            return this;
         }
         
         public void read(int pageId, Buffer buffer) throws IOPagingException {
@@ -302,208 +275,291 @@
             return pageFile.slice(SliceType.READ, pageId, count);
         }
         
-        abstract public Snapshot open();
-        abstract public void close();
-        abstract public long commitCheck(HashMap<Integer, Integer> pageUpdates);
-    }
-    
-    class PreviousSnapshot extends Snapshot {
-        private final Redo redo;
-
-        public PreviousSnapshot(Redo redo) {
-            this.redo = redo;
-        }
-        
-        @Override
-        public Snapshot open() {
+        public void open(Redo base) {
             references++;
-            if( references==1 ) {
-                redo.references++;
-                redo.prevSnapshot = this;
-            }
-            return this;
-        }
-
-        @Override
-        public void close() {
-            references--;
-            if( references==0 ) {
-                redo.references--;
-                redo.prevSnapshot = null;
-            }
-        }
-        
-        public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
-            long rc = 0;
-            Redo cur = redo;
-            while (cur != null) {
-                rc = cur.commitCheck(pageUpdates);
-                cur = cur.getNext();
+            while( true ) {
+                base.references++;
+                if(base == parent ) {
+                    break;
+                }
+                base = base.getNext();
             }
-            return rc;
-        }
-
-    }
-
-    class RedosSnapshot extends Snapshot {
-        
-        /** The snapshot will load page updates from the following redo list. */
-        protected final List<Redo> redosInSnapshot;
-        
-        public RedosSnapshot() {
-            this.redosInSnapshot = snapshotRedos();
         }
         
-        public Snapshot open() {
-            references++;
-            if( references==1 ) {
-                for (Redo redo : redosInSnapshot) {
-                    redo.references++;
+        public void close(Redo base) {
+            references--;
+            while( true ) {
+                base.references--;
+                if(base == parent ) {
+                    break;
                 }
-                redosInSnapshot.get(0).snapshot = this;
+                base = base.getNext();
             }
-            return this;
-        }
 
-        public void close() {
-            references--;
             if( references==0 ) {
-                for (Redo redo : redosInSnapshot) {
-                    redo.references--;
-                }
-                redosInSnapshot.get(0).snapshot = null;
+                unlink();
+                // TODO: trigger merging of adjacent commits. 
             }
         }
-        
+
         public int mapPageId(int page) {
-            // It may be in the redos..
-            for (Redo redo : redosInSnapshot) {
-                Integer updatedPage = redo.updates.get(page);
-                if (updatedPage != null) {
-                    switch (updatedPage) {
-                    case PAGE_FREED:
-                        throw new PagingException("You should never try to read page that has been freed.");
-                    case PAGE_ALLOCATED:
-                        return page;
-                    default:
-                        return updatedPage;
+            // Look for the page in the previous commits..
+            Redo curRedo = parent;
+            RedoEntry curEntry = getPrevious();
+            while( true ) {
+                if( curRedo.isPerformed() ) {
+                    break;
+                }
+                
+                while( curEntry!=null ) {
+                    Commit commit = curEntry.isCommit();
+                    if( commit !=null ) {
+                        Integer updatedPage = commit.updates.get(page);
+                        if (updatedPage != null) {
+                            switch (updatedPage) {
+                            case PAGE_FREED:
+                                throw new PagingException("You should never try to read page that has been freed.");
+                            case PAGE_ALLOCATED:
+                                return page;
+                            default:
+                                return updatedPage;
+                            }
+                        }
                     }
+                    curEntry = curEntry.getPrevious();
                 }
+                
+                curRedo = curRedo.getPrevious();
+                if( curRedo==null ) {
+                    break;
+                }
+                curEntry = curRedo.entries.getTail();
             }
-            
-            return super.mapPageId(page);
+            return page;
         }
         
+        
         public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
-            for (Redo redo : redosInSnapshot) {
-                DeferredUpdate cu  = redo.deferredUpdates.get(page);
-                if (cu != null) {
-                    return cu.<T>value();
+            Redo curRedo = parent;
+            RedoEntry curEntry = getPrevious();
+            while( true ) {
+                if( curRedo.isPerformed() ) {
+                    break;
+                }
+                
+                while( curEntry!=null ) {
+                    Commit commit = curEntry.isCommit();
+                    if( commit !=null ) {
+                        DeferredUpdate du  = commit.deferredUpdates.get(page);
+                        if (du!=null) {
+                            return du.<T>value();
+                        }
+                    }
+                    curEntry = curEntry.getPrevious();
                 }
+                
+                curRedo = curRedo.getPrevious();
+                if( curRedo==null ) {
+                    break;
+                }
+                curEntry = curRedo.entries.getTail();
             }
-            return super.cacheLoad(marshaller, page);
+            return readCache.cacheLoad(marshaller, page);
         }
         
-        public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
-            long rc = 0;
-            Redo cur = redosInSnapshot.get(0).getNext();
-            while (cur != null) {
-                rc = cur.commitCheck(pageUpdates);
-                cur = cur.getNext();
+        public long commitCheck(Map<Integer, Integer> pageUpdates) {
+            long rc=0;
+            Redo curRedo = parent;
+            RedoEntry curEntry = getNext();
+            while( true ) {
+                while( curEntry!=null ) {
+                    Commit commit = curEntry.isCommit();
+                    if( commit!=null ) {
+                        rc = commit.commitCheck(pageUpdates);
+                    }
+                    curEntry = curEntry.getNext();
+                }
+                
+                curRedo = curRedo.getNext();
+                if( curRedo==null ) {
+                    break;
+                }
+                curEntry = curRedo.entries.getHead();
             }
             return rc;
         }
-        
-        
+                
     }
     
-    class CommitsSnapshot extends RedosSnapshot {
-        /** The snapshot will load page updates from the following commit and all it's previous linked commits. */
-        private final Commit commit;
+    /**
+     * Aggregates a group of commits so that they can be more efficiently operated against.
+     * 
+     */
+    static class Redo extends LinkedNode<Redo> implements Externalizable, Iterable<Commit> {
+        private static final long serialVersionUID = 1188640492489990493L;
+        
+        /** the pageId that this redo batch is stored at */
+        private transient int page=-1;
+        /** points to a previous redo batch page */
+        public int previous=-1;
+        /** was the redo loaded in the {@link recover} method */
+        private transient boolean recovered;
+        
+        /** the commits and snapshots in the redo */ 
+        private transient LinkedNodeList<RedoEntry> entries = new LinkedNodeList<RedoEntry>();
+        /** tracks how many snapshots are referencing the redo */
+        private int references;
+
+        /** the oldest commit in this redo */
+        public long base=-1;
+        /** the newest commit in this redo */
+        public long head;
 
-        public CommitsSnapshot(Commit commit) {
-            this.commit= commit;
+        private boolean performed;
+        
+        public Redo() {
+        }
+        
+        public boolean isPerformed() {
+            return performed;
         }
 
-        public Snapshot open() {
-            references++;
-            if( references==1 ) {
-                for (Redo redo : redosInSnapshot) {
-                    redo.references++;
-                }
-                commit.redo.references++;
-                commit.snapshot = this;
-            }
-            return this;
+        public Redo(long head) {
+            this.head = head;
+        }
+
+        public String toString() { 
+            return "{ page: "+this.page+", previous: "+previous+" }";
         }
         
-        public void close() {
-            references--;
-            if( references==0 ) {
-                for (Redo redo : redosInSnapshot) {
-                    redo.references--;
-                }
-                commit.redo.references--;
-                commit.snapshot = null;
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeLong(head);
+            out.writeLong(base);
+            out.writeInt(previous);
+
+            // Only need to store the commits.
+            ArrayList<Commit> l = new ArrayList<Commit>();
+            for (Commit commit : this) {
+                l.add(commit);
             }
+            out.writeObject(l);
         }
 
+        @SuppressWarnings("unchecked")
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            head = in.readLong();
+            base = in.readLong();
+            previous = in.readInt();
+            ArrayList<Commit> l = (ArrayList<Commit>) in.readObject();
+            for (Commit commit : l) {
+                entries.addLast(commit);
+            }
+        }        
 
-        public int mapPageId(int page) {
-            
-            // Check to see if it's in the current update list...
-            Commit update = this.commit;
-            while (update!=null) {
-                Integer updatedPage = update.updates.get(page);
-                if (updatedPage != null) {
-                    switch (updatedPage) {
-                    case PAGE_FREED:
-                        throw new PagingException("You should never try to read page that has been freed.");
-                    case PAGE_ALLOCATED:
-                        return page;
-                    default:
-                        return updatedPage;
+        public int pageCount() {
+            int rc = 0;
+            for (Commit commit : this) {
+                rc += commit.updates.size();
+            }
+            return rc;
+        }
+        
+        @Override
+        public Iterator<Commit> iterator() {
+            return new Iterator<Commit>() {
+                Commit next = nextCommit(entries.getHead());
+                Commit last;
+                
+                @Override
+                public boolean hasNext() {
+                    return next!=null;
+                }
+
+                @Override
+                public Commit next() {
+                    if( next==null ) {
+                        throw new NoSuchElementException();
                     }
+                    last = next;
+                    next = nextCommit(next.getNext());
+                    return last;
                 }
-                update = update.getPrevious();
-            }
-            
-            return super.mapPageId(page);
+
+                @Override
+                public void remove() {
+                    if( last==null ) {
+                        throw new IllegalStateException();
+                    }
+                    last.unlink();
+                }
+            };
         }
 
-        public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
-            // Check to see if it's in the current update list...
-            Commit update = this.commit;
-            while (update!=null) {
-                DeferredUpdate du = update.deferredUpdates.get(page);
-                if (du != null) {
-                    return du.<T>value();
+
+        private Commit nextCommit(RedoEntry entry) {
+            while( entry != null ) {
+                Commit commit = entry.isCommit();
+                if( commit!=null ) {
+                    return commit;
                 }
-                update = update.getPrevious();
+                entry = entry.getNext();
             }
-            
-            return super.cacheLoad(marshaller, page);
+            return null;
         }
-        
-        public long commitCheck(HashMap<Integer, Integer> pageUpdates) {
-            long rc = 0;
-            
-            Commit next = this.commit.getNext();
-            while (next != null) {
-                rc = next.commitCheck(pageUpdates);
-                next = next.getNext();
+
+        public void performDefferedUpdates(Paged pageFile) {            
+            for (Commit commit : this) {
+                if( commit.deferredUpdates != null ) {
+                    for (Entry<Integer, DeferredUpdate> entry : commit.deferredUpdates.entrySet()) {
+                        DeferredUpdate cu = entry.getValue();
+                        if( cu.value == null ) {
+                            List<Integer> freePages = cu.marshaller.remove(pageFile, cu.page);
+                            for (Integer page : freePages) {
+                                // add any allocated pages to the update list so that the free 
+                                // list gets properly adjusted.
+                                commit.updates.put(page, PAGE_FREED);
+                            }
+                        } else {
+                            List<Integer> allocatedPages = cu.store(pageFile);
+                            for (Integer page : allocatedPages) {
+                                // add any allocated pages to the update list so that the free 
+                                // list gets properly adjusted.
+                                commit.updates.put(page, PAGE_ALLOCATED);
+                            }
+                        }
+                    }
+                }
             }
+        }
+
+        public void freeRedoSpace(SimpleAllocator allocator) {
+            for (Commit commit : this) {
+                for (Entry<Integer, Integer> entry : commit.updates.entrySet()) {
+                    int key = entry.getKey();
+                    int value = entry.getValue();
             
-            Redo cur = this.commit.redo.getNext();
-            while (cur != null) {
-                rc = cur.commitCheck(pageUpdates);
-                cur = cur.getNext();
+                    switch( value ) {
+                    case PAGE_ALLOCATED:
+                        // It was a new page that was written.. we don't need to 
+                        // free it.
+                        break;
+                    case PAGE_FREED:
+                        // update freed a previous page.. now is when we actually free it.
+                        allocator.free(key, 1);
+                        break;
+                    default:
+                        // This updated the 'key' page, now we can release the 'value' page
+                        // since it has been copied over the 'key' page and is no longer needed.
+                        allocator.free(value, 1);
+                    }
+                }
             }
-            return rc;
         }
-        
+
     }
-    
+
     private final MemoryMappedFile file;
     final SimpleAllocator allocator;
     final PageFile pageFile;
@@ -532,7 +588,7 @@
     /** The performed redos.  Updates are actually performed to the original page file. */
     Redo performedRedos;
     
-    /** Used as cache read objects */
+    /** Used as read cache */
     private ReadCache readCache = new ReadCache();
 
     /** Mutex for data structures which are used during house keeping tasks like redo management. Once acquired, you can also acquire the TRANSACTION_MUTEX */
@@ -569,18 +625,18 @@
     /**
      * Attempts to commit a set of page updates.
      * 
-     * @param updatedSnapshot
+     * @param snapshot
      * @param pageUpdates
      * @param deferredUpdates
      */
-    void commit(Snapshot updatedSnapshot, HashMap<Integer, Integer> pageUpdates, HashMap<Integer, DeferredUpdate> deferredUpdates) {
+    void commit(Snapshot snapshot, ConcurrentHashMap<Integer, Integer> pageUpdates, ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates) {
         
         boolean fullRedo=false;
         synchronized (TRANSACTION_MUTEX) {
             
             // we need to figure out the revision id of the this commit...
             long rev;
-            if( updatedSnapshot!=null ) {
+            if( snapshot!=null ) {
                 
                 // Lets check for an OptimisticUpdateException
                 // verify that the new commit's updates don't conflict with a commit that occurred
@@ -588,8 +644,8 @@
                 
                 // Note: every deferred update has an entry in the pageUpdates, so no need to 
                 // check to see if that map also conflicts.
-                rev = updatedSnapshot.commitCheck(pageUpdates);
-                updatedSnapshot.close();
+                rev = snapshot.head.commitCheck(pageUpdates);
+                snapshot.close();
             } else {
                 rev = buildingRedo.head;
             }
@@ -598,20 +654,19 @@
             if( buildingRedo.base == -1 ) {
                 buildingRedo.base = rev;
             }
-
             buildingRedo.head = rev;
             
-            // TODO: This map merging has to be a bit CPU intensive.. need 
-            // to look for ways to optimize it out.
-            buildingRedo.putAll(pageUpdates, deferredUpdates);
-            
-            Commit last = buildingRedo.commits.getTail();
-            if( last==null || last.snapshot!=null ) {
-                last = new Commit(rev, pageUpdates, deferredUpdates, buildingRedo);
-                buildingRedo.commits.addLast(last);
+            Commit commit=null;
+            RedoEntry last = buildingRedo.entries.getTail();
+            if( last!=null ) {
+                commit = last.isCommit();
+            }
+            
+            if( commit!=null ) {
+                // TODO: figure out how to do the merge outside the TRANSACTION_MUTEX
+                commit.merge(pageFile, rev, pageUpdates, deferredUpdates);
             } else {
-                // we can merge into the previous commit.
-                last.putAll(pageUpdates, deferredUpdates);
+                buildingRedo.entries.addLast(new Commit(rev, pageUpdates, deferredUpdates) );
             }
             
             if( buildingRedo.pageCount() > updateBatchSize ) {
@@ -761,26 +816,7 @@
         }
         
         // Write any outstanding deferred cache updates...
-        if( redo.deferredUpdates != null ) {
-            for (Entry<Integer, DeferredUpdate> entry : redo.deferredUpdates.entrySet()) {
-                DeferredUpdate cu = entry.getValue();
-                if( cu.value == null ) {
-                    List<Integer> freePages = cu.marshaller.remove(pageFile, cu.page);
-                    for (Integer page : freePages) {
-                        // add any allocated pages to the update list so that the free 
-                        // list gets properly adjusted.
-                        redo.updates.put(page, PAGE_FREED);
-                    }
-                } else {
-                    List<Integer> allocatedPages = cu.store(pageFile);
-                    for (Integer page : allocatedPages) {
-                        // add any allocated pages to the update list so that the free 
-                        // list gets properly adjusted.
-                        redo.updates.put(page, PAGE_ALLOCATED);
-                    }
-                }
-            }
-        }
+        redo.performDefferedUpdates(pageFile);
 
         // Link it to the last redo.
         redo.previous = lastRedoPage; 
@@ -839,25 +875,7 @@
             }
             
             // Free the update pages associated with the redo.
-            for (Entry<Integer, Integer> entry : performedRedos.updates.entrySet()) {
-                int key = entry.getKey();
-                int value = entry.getValue();
-        
-                switch( value ) {
-                case PAGE_ALLOCATED:
-                    // It was a new page that was written.. we don't need to 
-                    // free it.
-                    break;
-                case PAGE_FREED:
-                    // update freed a previous page.. now is when we actually free it.
-                    allocator.free(key, 1);
-                    break;
-                default:
-                    // This updated the 'key' page, now we can release the 'value' page
-                    // since it has been copied over the 'key' page and is no longer needed.
-                    allocator.free(value, 1);
-                }
-            }
+            performedRedos.freeRedoSpace(allocator);
             
             // Free the redo record itself.
             Extent.free(pageFile, performedRedos.page);
@@ -903,45 +921,49 @@
         
         while( syncedRedos!=storedRedos ) {
             
-            // Performing the redo actually applies the updates to the original page locations. 
-            for (Entry<Integer, Integer> entry : syncedRedos.updates.entrySet()) {
-                int key = entry.getKey();
-                int value = entry.getValue();
-                switch( value ) {
-                case PAGE_ALLOCATED:
-                    if( syncedRedos.recovered ) {
-                        // If we are recovering, the allocator MIGHT not have this 
-                        // page as being allocated.  This makes sure it's allocated so that
-                        // new transaction to get this page and overwrite it in error.
-                        allocator.unfree(key, 1);
-                    }
-                    // Update the persistent free list.  This gets stored on the next sync.
-                    baseRevisionFreePages.remove(key, 1);
-                    break;
-                case PAGE_FREED:
-                    // The actual free gets done on the next file sync.
-                    // Update the persistent free list.  This gets stored on the next sync.
-                    baseRevisionFreePages.add(key, 1);
-                    break;
-                default:
-                    if( syncedRedos.recovered ) {
-                        // If we are recovering, the allocator MIGHT not have this 
-                        // page as being allocated.  This makes sure it's allocated so that
-                        // new transaction to get this page and overwrite it in error.
-                        allocator.unfree(key, 1);
-                    }
-                    
-                    // Perform the update by copying the updated 'redo page' to the original
-                    // page location.
-                    ByteBuffer slice = pageFile.slice(SliceType.READ, value, 1);
-                    try {
-                        pageFile.write(key, slice);
-                    } finally { 
-                        pageFile.unslice(slice);
+            // Performing the redo actually applies the updates to the original page locations.
+            for (Commit commit : syncedRedos) {
+                for (Entry<Integer, Integer> entry : commit.updates.entrySet()) {
+                    int key = entry.getKey();
+                    int value = entry.getValue();
+                    switch( value ) {
+                    case PAGE_ALLOCATED:
+                        if( syncedRedos.recovered ) {
+                            // If we are recovering, the allocator MIGHT not have this 
+                            // page as being allocated.  This makes sure it's allocated so that
+                            // a new transaction doesn't get this page and overwrite it in error.
+                            allocator.unfree(key, 1);
+                        }
+                        // Update the persistent free list.  This gets stored on the next sync.
+                        baseRevisionFreePages.remove(key, 1);
+                        break;
+                    case PAGE_FREED:
+                        // The actual free gets done on the next file sync.
+                        // Update the persistent free list.  This gets stored on the next sync.
+                        baseRevisionFreePages.add(key, 1);
+                        break;
+                    default:
+                        if( syncedRedos.recovered ) {
+                            // If we are recovering, the allocator MIGHT not have this 
+                            // page as being allocated.  This makes sure it's allocated so that
+                            // a new transaction doesn't get this page and overwrite it in error.
+                            allocator.unfree(key, 1);
+                        }
+                        
+                        // Perform the update by copying the updated 'redo page' to the original
+                        // page location.
+                        ByteBuffer slice = pageFile.slice(SliceType.READ, value, 1);
+                        try {
+                            pageFile.write(key, slice);
+                        } finally { 
+                            pageFile.unslice(slice);
+                        }
                     }
                 }
             }
             
+            syncedRedos.performed = true;
+            
             // We synchronized /w the transactions so that they see the state change.
             synchronized (TRANSACTION_MUTEX) {
                 // Transition synced -> performed
@@ -962,51 +984,22 @@
     
     Snapshot openSnapshot() {
         synchronized(TRANSACTION_MUTEX) {
-            
-            // Is it a partial redo snapshot??  
-            // If there are commits in the next redo..
-            Snapshot snapshot;
-            Commit commit = buildingRedo.commits.getTail();
-            if( commit!=null ) {
-                snapshot = commit.snapshot != null ? commit.snapshot : new CommitsSnapshot(commit);
-                return snapshot.open();
-            }
+            SnapshotHead head=null;
 
-            // Perhaps this snapshot has to deal with full redos..
-            if( syncedRedos!=buildingRedo ) {
-                Redo lastRedo = buildingRedo.getPrevious();
-                snapshot = lastRedo.snapshot != null ? lastRedo.snapshot : new RedosSnapshot();
-                return snapshot.open();
+            // re-use the last entry if it was a snapshot head..
+            RedoEntry entry = buildingRedo.entries.getTail();
+            if( entry!=null ) {
+                head = entry.isSnapshotHead();
             }
             
-            // Then the snapshot does not have previous updates.
-            snapshot = buildingRedo.prevSnapshot != null ? buildingRedo.prevSnapshot : new PreviousSnapshot(buildingRedo);
-            return snapshot.open();
-        }
-    }
-    
-    private List<Redo> snapshotRedos() {
-        if( syncedRedos!=buildingRedo ) {
-            ArrayList<Redo> rc = new ArrayList<Redo>(4);
-            Redo cur = buildingRedo.getPrevious();
-            while( true ) {
-                rc.add(cur);
-                if( cur == syncedRedos ) {
-                    break;
-                }
-                cur = cur.getPrevious();
-            }
-            return rc;
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
-    void closeSnapshot(Snapshot snapshot) {
-        if( snapshot!=null ) {
-            synchronized(TRANSACTION_MUTEX) {
-                snapshot.close();
+            if( head == null ) {
+                // create a new snapshot head entry..
+                head = new SnapshotHead(buildingRedo);
+                buildingRedo.entries.addLast(head);
             }
+            
+            // Open the snapshot off that head position.
+            return new Snapshot(head, syncedRedos).open();
         }
     }
     
@@ -1108,7 +1101,7 @@
     }
     
     static class DeferredUpdate {
-        final int page;
+        int page;
         Object value;
         EncoderDecoder<?> marshaller;
 
@@ -1117,6 +1110,11 @@
             this.value = value;
             this.marshaller = marshaller;
         }
+        
+        @Override
+        public String toString() {
+            return "{ page: "+page+", removed: "+(value==null)+" }";
+        }
 
         public void reset(Object value, EncoderDecoder<?> marshaller) {
             this.value = value;

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java?rev=828824&r1=828823&r2=828824&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java Thu Oct 22 19:22:13 2009
@@ -17,8 +17,8 @@
 package org.apache.hawtdb.internal.page;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.hawtdb.api.Allocator;
@@ -49,8 +49,8 @@
         parent = concurrentPageFile;
     }
 
-    private HashMap<Integer, DeferredUpdate> deferredUpdates;
-    private HashMap<Integer, Integer> updates;
+    private ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates;
+    private ConcurrentHashMap<Integer, Integer> updates;
     private Snapshot snapshot;
     
     private final Allocator txallocator = new Allocator() {
@@ -103,7 +103,7 @@
         }
         
         // No?  Then ask the snapshot to load the object.
-        return snapshot().cacheLoad(marshaller, page);
+        return snapshot().head.cacheLoad(marshaller, page);
     }
 
     public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
@@ -166,7 +166,7 @@
             }
         } else {
             // Get the data from the snapshot.
-            snapshot().read(pageId, buffer);
+            snapshot().head.read(pageId, buffer);
         }
     }
 
@@ -186,7 +186,7 @@
                 return parent.pageFile.slice(type, page, count);
             } else {
                 // Get the data from the snapshot.
-                return snapshot().slice(page, count);
+                return snapshot().head.slice(page, count);
             }
             
         } else {
@@ -195,7 +195,7 @@
                 update = parent.allocator.alloc(count);
                 
                 if (type==SliceType.READ_WRITE) {
-                    ByteBuffer slice = snapshot().slice(page, count);
+                    ByteBuffer slice = snapshot().head.slice(page, count);
                     try {
                         parent.pageFile.write(update, slice);
                     } finally { 
@@ -289,10 +289,12 @@
                 }
             }
         } finally {
-            parent.closeSnapshot(snapshot);
+            if( snapshot!=null ) {
+                snapshot.close();
+                snapshot = null;
+            }
             updates = null;
             deferredUpdates = null;
-            snapshot = null;
         }
     }
 
@@ -307,16 +309,16 @@
         return updates == null;
     }
 
-    public HashMap<Integer, DeferredUpdate> getCacheUpdates() {
+    public ConcurrentHashMap<Integer, DeferredUpdate> getCacheUpdates() {
         if( deferredUpdates==null ) {
-            deferredUpdates = new HashMap<Integer, DeferredUpdate>();
+            deferredUpdates = new ConcurrentHashMap<Integer, DeferredUpdate>();
         }
         return deferredUpdates;
     }
 
-    private HashMap<Integer, Integer> getUpdates() {
+    private ConcurrentHashMap<Integer, Integer> getUpdates() {
         if (updates == null) {
-            updates = new HashMap<Integer, Integer>();
+            updates = new ConcurrentHashMap<Integer, Integer>();
         }
         return updates;
     }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java?rev=828824&r1=828823&r2=828824&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexBenchmark.java Thu Oct 22 19:22:13 2009
@@ -22,6 +22,8 @@
 import org.apache.hawtdb.api.BTreeIndexFactory;
 import org.apache.hawtdb.api.Index;
 import org.apache.hawtdb.api.Transaction;
+import org.apache.hawtdb.internal.Benchmarker.BenchmarkAction;
+import org.junit.Test;
 
 /**
  * 
@@ -29,6 +31,8 @@
  */
 public class BTreeIndexBenchmark extends IndexBenchmark {
 
+    private boolean deferredEncoding;
+
     public BTreeIndexBenchmark() {
         this.benchmark.setSamples(5);
     }
@@ -37,8 +41,28 @@
         BTreeIndexFactory<Long, Buffer> factory = new BTreeIndexFactory<Long, Buffer>();
         factory.setKeyMarshaller(LongMarshaller.INSTANCE);
         factory.setValueMarshaller(new FixedBufferMarshaller(DATA.length));
-        factory.setDeferredEncoding(true);
+        factory.setDeferredEncoding(deferredEncoding);
         return factory.create(tx, tx.allocator().alloc(1));
     }
 
+    @Test
+    public void insertDeffered() throws Exception {
+        deferredEncoding = true;
+        benchmark.benchmark(1, new BenchmarkAction<IndexActor>("insert with deffered encoding") {
+            protected void execute(IndexActor actor) throws InterruptedException {
+                actor.benchmarkIndex();
+            }
+        });        
+    }
+
+    @Test
+    public void insert() throws Exception {
+        deferredEncoding = false;
+        benchmark.benchmark(1, new BenchmarkAction<IndexActor>("insert without deffered encoding") {
+            protected void execute(IndexActor actor) throws InterruptedException {
+                actor.benchmarkIndex();
+            }
+        });        
+    }
+    
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java?rev=828824&r1=828823&r2=828824&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/BTreeIndexTest.java Thu Oct 22 19:22:13 2009
@@ -40,7 +40,6 @@
 public class BTreeIndexTest extends IndexTestSupport {
 
     private NumberFormat nf;
-    private boolean deferredEncoding=true;
 
     @Before
     public void setUp() throws Exception {
@@ -54,7 +53,7 @@
         BTreeIndexFactory<String,Long> factory = new BTreeIndexFactory<String,Long>();
         factory.setKeyMarshaller(StringMarshaller.INSTANCE);
         factory.setValueMarshaller(LongMarshaller.INSTANCE);
-        factory.setDeferredEncoding(deferredEncoding);
+        factory.setDeferredEncoding(false);
         if( page==-1 ) {
             return factory.create(tx, tx.allocator().alloc(1));
         } else {

Added: activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/DefferedBTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/DefferedBTreeIndexTest.java?rev=828824&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/DefferedBTreeIndexTest.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/DefferedBTreeIndexTest.java Thu Oct 22 19:22:13 2009
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import org.apache.activemq.util.marshaller.LongMarshaller;
+import org.apache.activemq.util.marshaller.StringMarshaller;
+import org.apache.hawtdb.api.BTreeIndexFactory;
+import org.apache.hawtdb.api.Index;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DefferedBTreeIndexTest extends BTreeIndexTest {
+
+    @Override
+    protected Index<String, Long> createIndex(int page) {
+        BTreeIndexFactory<String,Long> factory = new BTreeIndexFactory<String,Long>();
+        factory.setKeyMarshaller(StringMarshaller.INSTANCE);
+        factory.setValueMarshaller(LongMarshaller.INSTANCE);
+        factory.setDeferredEncoding(true);
+        if( page==-1 ) {
+            return factory.create(tx, tx.allocator().alloc(1));
+        } else {
+            return factory.open(tx, page);
+        }
+    }
+    
+}

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java?rev=828824&r1=828823&r2=828824&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/HashIndexBenchmark.java Thu Oct 22 19:22:13 2009
@@ -38,7 +38,7 @@
         HashIndexFactory<Long, Buffer> factory = new HashIndexFactory<Long, Buffer>();
         factory.setKeyMarshaller(LongMarshaller.INSTANCE);
         factory.setValueMarshaller(new FixedBufferMarshaller(DATA.length));
-        factory.setFixedCapacity(1024*100);
+        factory.setFixedCapacity(1024*10);
         return factory.create(tx, tx.allocator().alloc(1));
     }
     

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java?rev=828824&r1=828823&r2=828824&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/test/java/org/apache/hawtdb/internal/index/IndexBenchmark.java Thu Oct 22 19:22:13 2009
@@ -37,7 +37,7 @@
  */
 public abstract class IndexBenchmark {
     
-    private static final int KEY_SPACE = 10000;
+    private static final int KEY_SPACE = 5000000;
     private static final int VALUE_SIZE = 8;
     
     static final public byte[] DATA = new byte[VALUE_SIZE];



Mime
View raw message