activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r829298 - /activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/
Date Sat, 24 Oct 2009 02:14:22 GMT
Author: chirino
Date: Sat Oct 24 02:14:21 2009
New Revision: 829298

URL: http://svn.apache.org/viewvc?rev=829298&view=rev
Log:
Extracted out the HawtPageFile inner classes as prep for implementing more rigorous thread synchronization.


Added:
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java
Modified:
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Batch.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Map.Entry;
+
+import org.apache.activemq.util.list.LinkedNode;
+import org.apache.activemq.util.list.LinkedNodeList;
+import org.apache.hawtdb.api.Paged;
+
+/**
+ * Aggregates a group of commits so that they can be more efficiently
+ * stored to disk.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Batch extends LinkedNode<Batch> implements Externalizable, Iterable<Commit> {
+    private static final long serialVersionUID = 1188640492489990493L;
+    
+    /** the pageId that this redo batch is stored at */
+    int page=-1;
+    /** points to a previous redo batch page */
+    public int previous=-1;
+    /** was the redo loaded in the {@link recover} method */
+    boolean recovered;
+    
+    /** the commits and snapshots in the redo */ 
+    final LinkedNodeList<BatchEntry> entries = new LinkedNodeList<BatchEntry>();
+    /** tracks how many snapshots are referencing the redo */
+    int references;
+    /** the oldest commit in this redo */
+    public long base=-1;
+    /** the newest commit in this redo */
+    public long head;
+
+    boolean performed;
+    
+    public Batch() {
+    }
+    
+    public boolean isPerformed() {
+        return performed;
+    }
+
+    public Batch(long head) {
+        this.head = head;
+    }
+
+    public String toString() { 
+        return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+references+", entries: "+entries.size()+" }";
+    }
+    
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(head);
+        out.writeLong(base);
+        out.writeInt(previous);
+
+        // Only need to store the commits.
+        ArrayList<Commit> l = new ArrayList<Commit>();
+        for (Commit commit : this) {
+            l.add(commit);
+        }
+        out.writeObject(l);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        head = in.readLong();
+        base = in.readLong();
+        previous = in.readInt();
+        ArrayList<Commit> l = (ArrayList<Commit>) in.readObject();
+        for (Commit commit : l) {
+            entries.addLast(commit);
+        }
+    }        
+
+    public int pageCount() {
+        int rc = 0;
+        for (Commit commit : this) {
+            rc += commit.updates.size();
+        }
+        return rc;
+    }
+    
+    @Override
+    public Iterator<Commit> iterator() {
+        return new Iterator<Commit>() {
+            Commit next = nextCommit(entries.getHead());
+            Commit last;
+            
+            @Override
+            public boolean hasNext() {
+                return next!=null;
+            }
+
+            @Override
+            public Commit next() {
+                if( next==null ) {
+                    throw new NoSuchElementException();
+                }
+                last = next;
+                next = nextCommit(next.getNext());
+                return last;
+            }
+
+            @Override
+            public void remove() {
+                if( last==null ) {
+                    throw new IllegalStateException();
+                }
+                last.unlink();
+            }
+        };
+    }
+
+
+    private Commit nextCommit(BatchEntry entry) {
+        while( entry != null ) {
+            Commit commit = entry.isCommit();
+            if( commit!=null ) {
+                return commit;
+            }
+            entry = entry.getNext();
+        }
+        return null;
+    }
+
+    public void performDefferedUpdates(Paged pageFile) {            
+        for (Commit commit : this) {
+            if( commit.updates != null ) {
+                for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
+                    DeferredUpdate du = entry.getValue().deferredUpdate();
+                    if( du == null ) {
+                        continue;
+                    }
+                    if( du.wasDeferredClear() ) {
+                        List<Integer> freePages = du.marshaller.remove(pageFile, du.page);
+                        for (Integer page : freePages) {
+                            commit.merge(pageFile.allocator(), page, Update.update(page).freed());
+                        }
+                    } else if( du.wasDeferredStore() ) {
+                        List<Integer> allocatedPages = du.store(pageFile);
+                        for (Integer page : allocatedPages) {
+                            // add any allocated pages to the update list so that the free 
+                            // list gets properly adjusted.
+                            commit.merge(pageFile.allocator(), page, Update.update(page).allocated());
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    public void freeRedoSpace(SimpleAllocator allocator) {
+        for (Commit commit : this) {
+            for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
+                int key = entry.getKey();
+                Update value = entry.getValue();
+                if( value.wasFreed() ) {
+                    allocator.free(key, 1);
+                } else if( key != value.page ) {
+                    // need to free the update page..
+                    allocator.free(value.page, 1);
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/BatchEntry.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import org.apache.activemq.util.list.LinkedNode;
+
+/**
+ * A BatchEntry is a linked list node which may be a Commit record or a SnapshotHead.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+abstract class BatchEntry extends LinkedNode<BatchEntry> {
+    
+    Commit isCommit() {
+        return null;
+    }
+    
+    SnapshotHead isSnapshotHead() {
+        return null;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Commit.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hawtdb.api.Allocator;
+import org.apache.hawtdb.api.OptimisticUpdateException;
+
+/**
+ * Tracks the updates that were part of a transaction commit.
+ * Multiple commit objects can be merged into a single commit.
+ * 
+ * A Commit is a BatchEntry and is stored in a Batch object.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final class Commit extends BatchEntry implements Externalizable {
+
+    /** oldest revision in the commit range. */
+    private long base; 
+    /** newest revision in the commit range, will match base if this only tracks one commit */ 
+    private long head;
+    
+    /** all the page updates that are part of the redo */
+    ConcurrentHashMap<Integer, Update> updates;
+
+
+    public Commit() {
+    }
+    
+    public Commit(long version, ConcurrentHashMap<Integer, Update> updates) {
+        this.head = this.base = version;
+        this.updates = updates;
+    }
+    
+    
+    @Override
+    Commit isCommit() {
+        return this;
+    }
+
+    
+    public String toString() { 
+        int updateSize = updates==null ? 0 : updates.size();
+        return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+" }";
+    }
+
+    public long commitCheck(Map<Integer, Update> newUpdate) {
+        for (Integer page : newUpdate.keySet()) {
+            if( updates.containsKey( page ) ) {
+                throw new OptimisticUpdateException();
+            }
+        }
+        return head;
+    }
+
+    public void merge(Allocator allocator, long rev, ConcurrentHashMap<Integer, Update> updates) {
+        assert head+1 == rev;
+        head=rev;
+        // merge all the entries in the update..
+        for (Entry<Integer, Update> entry : updates.entrySet()) {
+            merge(allocator, entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * merges one update..
+     * 
+     * @param page
+     * @param update
+     */
+    void merge(Allocator allocator, int page, Update update) {
+        Update previous = this.updates.put(page, update);
+        if (previous != null) {
+            
+            if( update.wasFreed() ) {
+                // we can undo the previous update
+                if( previous.page != page ) {
+                    allocator.free(previous.page, 1);
+                }
+                if( previous.wasAllocated() ) {
+                    allocator.free(page, 1);
+                }
+                this.updates.remove(page);
+                
+                // No other merging is needed now..
+                return;
+            }
+            
+            // we are undoing the previous update /w this new update.
+            if( previous.page != page ) {
+                allocator.free(previous.page, 1);
+            }
+            
+            // we may be updating a previously allocated page,
+            // if so we need to mark the new page as allocated too.
+            if( previous.wasAllocated() ) {
+                update.allocated();
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        base = in.readLong();
+        head = in.readLong();
+        updates = (ConcurrentHashMap<Integer, Update>) in.readObject();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeLong(base);
+        out.writeLong(head);
+        out.writeObject(updates);
+    }
+    
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/DeferredUpdate.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.ObjectStreamException;
+import java.util.List;
+
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.Paged;
+
+/**
+ * A deferred update is an update which has not yet been performed, but 
+ * which holds onto all the info needed to do the update.
+ * 
+ * Encoding java objects to do page updates can be CPU intensive, and if 
+ * the same pages are getting updated frequently then deferring updates
+ * will save encoding passes since older updates may get discarded due
+ * to a more recent update of the same page.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class DeferredUpdate extends Update {
+    EncoderDecoder<?> marshaller;
+    Object value;
+
+    public DeferredUpdate(Update update) {
+        super(update);
+    }
+    public DeferredUpdate(int page) {
+        super(page);
+    }
+    
+    public static DeferredUpdate deferred(int page) {
+        return new DeferredUpdate(page);
+    }
+    
+    public static DeferredUpdate deferred(Update update) {
+        return new DeferredUpdate(update);
+    }
+
+    public DeferredUpdate deferredUpdate() {
+        return this;
+    }
+    
+    public DeferredUpdate store(Object value, EncoderDecoder<?> marshaller) {
+        this.value = value;
+        this.marshaller = marshaller;
+        flags = (byte) ((flags & ~PAGE_CLEAR) | PAGE_STORE);
+        return this;
+    }
+
+    public DeferredUpdate clear(EncoderDecoder<?> marshaller) {
+        this.marshaller= marshaller;
+        this.value=null;
+        flags = (byte) ((flags & ~PAGE_STORE) | PAGE_CLEAR);
+        return this;
+    }
+    
+    @SuppressWarnings("unchecked")
+    <T> T value() {
+        return (T) value;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public List<Integer> store(Paged paged) {
+        return ((EncoderDecoder)marshaller).store(paged, page, value);
+    }
+
+    public Object writeReplace() throws ObjectStreamException {
+        return new Update(this);
+    }
+    
+}
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java?rev=829298&r1=829297&r2=829298&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java Sat Oct 24 02:14:21 2009
@@ -16,20 +16,12 @@
  */
 package org.apache.hawtdb.internal.page;
 
-import java.io.Externalizable;
 import java.io.IOException;
-import java.io.ObjectInput;
 import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import java.io.ObjectStreamException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.zip.CRC32;
@@ -37,14 +29,9 @@
 import javolution.io.Struct;
 
 import org.apache.activemq.util.LRUCache;
-import org.apache.activemq.util.buffer.Buffer;
-import org.apache.activemq.util.list.LinkedNode;
 import org.apache.activemq.util.list.LinkedNodeList;
-import org.apache.hawtdb.api.Allocator;
 import org.apache.hawtdb.api.EncoderDecoder;
 import org.apache.hawtdb.api.IOPagingException;
-import org.apache.hawtdb.api.OptimisticUpdateException;
-import org.apache.hawtdb.api.Paged;
 import org.apache.hawtdb.api.PagingException;
 import org.apache.hawtdb.api.Transaction;
 import org.apache.hawtdb.api.Paged.SliceType;
@@ -72,10 +59,8 @@
  */
 public final class HawtPageFile {
 
-    private static final String MAGIC = "HawtDB:1.0\n";
-    private static final int FILE_HEADER_SIZE = 1024 * 4;
-
-    public static final int HEADER_SIZE = 1024*4;
+    public static final int FILE_HEADER_SIZE = 1024 * 4;
+    public static final String MAGIC = "HawtDB:1.0\n";
 
     /**
      * The first 4K of the file is used to hold 2 copies of the header.
@@ -115,463 +100,18 @@
         }
     }
 
-    abstract static class RedoEntry extends LinkedNode<RedoEntry> {
-        Commit isCommit() {
-            return null;
-        }
-        SnapshotHead isSnapshotHead() {
-            return null;
-        }
-    }
-    
-    /**
-     * Tracks the page changes that were part of a commit.
-     * 
-     * Commits can be merged, in that sense this then tracks range of commits.
-     * 
-     * @author chirino
-     */
-    final static class Commit extends RedoEntry implements Externalizable {
-
-        /** oldest revision in the commit range. */
-        private long base; 
-        /** newest revision in the commit range, will match base if this only tracks one commit */ 
-        private long head;
-        /** all the page updates that are part of the redo */
-        private ConcurrentHashMap<Integer, Update> updates;
-
-
-        public Commit() {
-        }
-        
-        public Commit(long version, ConcurrentHashMap<Integer, Update> updates) {
-            this.head = this.base = version;
-            this.updates = updates;
-        }
-        
-        
-        @Override
-        Commit isCommit() {
-            return this;
-        }
-
-        
-        public String toString() { 
-            int updateSize = updates==null ? 0 : updates.size();
-            return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+" }";
-        }
-
-        public long commitCheck(Map<Integer, Update> newUpdate) {
-            for (Integer page : newUpdate.keySet()) {
-                if( updates.containsKey( page ) ) {
-                    throw new OptimisticUpdateException();
-                }
-            }
-            return head;
-        }
-
-        public void merge(Allocator allocator, long rev, ConcurrentHashMap<Integer, Update> updates) {
-            assert head+1 == rev;
-            head=rev;
-            // merge all the entries in the update..
-            for (Entry<Integer, Update> entry : updates.entrySet()) {
-                merge(allocator, entry.getKey(), entry.getValue());
-            }
-        }
-
-        /**
-         * merges one update..
-         * 
-         * @param page
-         * @param update
-         */
-        private void merge(Allocator allocator, int page, Update update) {
-            Update previous = this.updates.put(page, update);
-            if (previous != null) {
-                
-                if( update.wasFreed() ) {
-                    // we can undo the previous update
-                    if( previous.page != page ) {
-                        allocator.free(previous.page, 1);
-                    }
-                    if( previous.wasAllocated() ) {
-                        allocator.free(page, 1);
-                    }
-                    this.updates.remove(page);
-                    
-                    // No other merging is needed now..
-                    return;
-                }
-                
-                // we are undoing the previous update /w this new update.
-                if( previous.page != page ) {
-                    allocator.free(previous.page, 1);
-                }
-                
-                // we may be updating a previously allocated page,
-                // if so we need to mark the new page as allocated too.
-                if( previous.wasAllocated() ) {
-                    update.allocated();
-                }
-            }
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            base = in.readLong();
-            head = in.readLong();
-            updates = (ConcurrentHashMap<Integer, Update>) in.readObject();
-        }
-
-        @Override
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(base);
-            out.writeLong(head);
-            out.writeObject(updates);
-        }
-        
-    }
-    
-    final class Snapshot {
-        final SnapshotHead head;
-        final Redo base;
-        
-        public Snapshot(SnapshotHead head, Redo base) {
-            this.head = head;
-            this.base = base;
-        }
-
-        public Snapshot open() {
-            head.open(base);
-            return this;
-        }
-        
-        public void close() {
-            synchronized(TRANSACTION_MUTEX) {
-                head.close(base);
-            }
-        }
-    }
-    
-    /**
-     * Provides a snapshot view of the page file.
-     *  
-     * @author chirino
-     */
-    final class SnapshotHead extends RedoEntry {
-        final Redo parent;
-        
-        public SnapshotHead(Redo parent) {
-            this.parent = parent;
-        }
-
-        /** The number of times this snapshot has been opened. */
-        protected int references;
-        
-        public String toString() { 
-            return "{ references: "+this.references+" }";
-        }
-
-        SnapshotHead isSnapshotHead() {
-            return this;
-        }
-        
-        public void read(int pageId, Buffer buffer) throws IOPagingException {
-            pageId = mapPageId(pageId);
-            pageFile.read(pageId, buffer);
-        }
-
-        public ByteBuffer slice(int pageId, int count) {
-            pageId = mapPageId(pageId);
-            return pageFile.slice(SliceType.READ, pageId, count);
-        }
-        
-        public void open(Redo base) {
-            references++;
-            while( true ) {
-                base.references++;
-                if(base == parent ) {
-                    break;
-                }
-                base = base.getNext();
-            }
-        }
-        
-        public void close(Redo base) {
-            references--;
-            while( true ) {
-                base.references--;
-                if(base == parent ) {
-                    break;
-                }
-                base = base.getNext();
-            }
-
-            if( references==0 ) {
-                unlink();
-                // TODO: trigger merging of adjacent commits. 
-            }
-        }
-
-        public int mapPageId(int page) {
-            // Look for the page in the previous commits..
-            Redo curRedo = parent;
-            RedoEntry curEntry = getPrevious();
-            while( true ) {
-                if( curRedo.isPerformed() ) {
-                    break;
-                }
-                
-                while( curEntry!=null ) {
-                    Commit commit = curEntry.isCommit();
-                    if( commit !=null ) {
-                        Update update = commit.updates.get(page);
-                        if( update!=null ) {
-                            return update.page();
-                        }
-                    }
-                    curEntry = curEntry.getPrevious();
-                }
-                
-                curRedo = curRedo.getPrevious();
-                if( curRedo==null ) {
-                    break;
-                }
-                curEntry = curRedo.entries.getTail();
-            }
-            return page;
-        }
-        
-        
-        public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
-            Redo curRedo = parent;
-            RedoEntry curEntry = getPrevious();
-            while( true ) {
-                if( curRedo.isPerformed() ) {
-                    break;
-                }
-                
-                while( curEntry!=null ) {
-                    Commit commit = curEntry.isCommit();
-                    if( commit !=null ) {
-                        Update update = commit.updates.get(page);
-                        if( update!=null ) {
-                            DeferredUpdate du  = update.deferredUpdate();
-                            if (du!=null) {
-                                return du.<T>value();
-                            }
-                        }
-                    }
-                    curEntry = curEntry.getPrevious();
-                }
-                
-                curRedo = curRedo.getPrevious();
-                if( curRedo==null ) {
-                    break;
-                }
-                curEntry = curRedo.entries.getTail();
-            }
-            return readCache.cacheLoad(marshaller, page);
-        }
-        
-        public long commitCheck(Map<Integer, Update> pageUpdates) {
-            long rc=0;
-            Redo curRedo = parent;
-            RedoEntry curEntry = getNext();
-            while( true ) {
-                while( curEntry!=null ) {
-                    Commit commit = curEntry.isCommit();
-                    if( commit!=null ) {
-                        rc = commit.commitCheck(pageUpdates);
-                    }
-                    curEntry = curEntry.getNext();
-                }
-                
-                curRedo = curRedo.getNext();
-                if( curRedo==null ) {
-                    break;
-                }
-                curEntry = curRedo.entries.getHead();
-            }
-            return rc;
-        }
-                
-    }
-    
-    /**
-     * Aggregates a group of commits so that they can be more efficiently operated against.
-     * 
-     */
-    static class Redo extends LinkedNode<Redo> implements Externalizable, Iterable<Commit> {
-        private static final long serialVersionUID = 1188640492489990493L;
-        
-        /** the pageId that this redo batch is stored at */
-        private  int page=-1;
-        /** points to a previous redo batch page */
-        public int previous=-1;
-        /** was the redo loaded in the {@link recover} method */
-        private boolean recovered;
-        
-        /** the commits and snapshots in the redo */ 
-        private final LinkedNodeList<RedoEntry> entries = new LinkedNodeList<RedoEntry>();
-        /** tracks how many snapshots are referencing the redo */
-        private int references;
-        /** the oldest commit in this redo */
-        public long base=-1;
-        /** the newest commit in this redo */
-        public long head;
-
-        private boolean performed;
-        
-        public Redo() {
-        }
-        
-        public boolean isPerformed() {
-            return performed;
-        }
-
-        public Redo(long head) {
-            this.head = head;
-        }
-
-        public String toString() { 
-            return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+references+", entries: "+entries.size()+" }";
-        }
-        
-        @Override
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeLong(head);
-            out.writeLong(base);
-            out.writeInt(previous);
-
-            // Only need to store the commits.
-            ArrayList<Commit> l = new ArrayList<Commit>();
-            for (Commit commit : this) {
-                l.add(commit);
-            }
-            out.writeObject(l);
-        }
-
-        @SuppressWarnings("unchecked")
-        @Override
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            head = in.readLong();
-            base = in.readLong();
-            previous = in.readInt();
-            ArrayList<Commit> l = (ArrayList<Commit>) in.readObject();
-            for (Commit commit : l) {
-                entries.addLast(commit);
-            }
-        }        
-
-        public int pageCount() {
-            int rc = 0;
-            for (Commit commit : this) {
-                rc += commit.updates.size();
-            }
-            return rc;
-        }
-        
-        @Override
-        public Iterator<Commit> iterator() {
-            return new Iterator<Commit>() {
-                Commit next = nextCommit(entries.getHead());
-                Commit last;
-                
-                @Override
-                public boolean hasNext() {
-                    return next!=null;
-                }
-
-                @Override
-                public Commit next() {
-                    if( next==null ) {
-                        throw new NoSuchElementException();
-                    }
-                    last = next;
-                    next = nextCommit(next.getNext());
-                    return last;
-                }
-
-                @Override
-                public void remove() {
-                    if( last==null ) {
-                        throw new IllegalStateException();
-                    }
-                    last.unlink();
-                }
-            };
-        }
-
-
-        private Commit nextCommit(RedoEntry entry) {
-            while( entry != null ) {
-                Commit commit = entry.isCommit();
-                if( commit!=null ) {
-                    return commit;
-                }
-                entry = entry.getNext();
-            }
-            return null;
-        }
-
-        public void performDefferedUpdates(Paged pageFile) {            
-            for (Commit commit : this) {
-                if( commit.updates != null ) {
-                    for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
-                        DeferredUpdate du = entry.getValue().deferredUpdate();
-                        if( du == null ) {
-                            continue;
-                        }
-                        if( du.wasDeferredClear() ) {
-                            List<Integer> freePages = du.marshaller.remove(pageFile, du.page);
-                            for (Integer page : freePages) {
-                                commit.merge(pageFile.allocator(), page, Update.update(page).freed());
-                            }
-                        } else if( du.wasDeferredStore() ) {
-                            List<Integer> allocatedPages = du.store(pageFile);
-                            for (Integer page : allocatedPages) {
-                                // add any allocated pages to the update list so that the free 
-                                // list gets properly adjusted.
-                                commit.merge(pageFile.allocator(), page, Update.update(page).allocated());
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-        public void freeRedoSpace(SimpleAllocator allocator) {
-            for (Commit commit : this) {
-                for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
-                    int key = entry.getKey();
-                    Update value = entry.getValue();
-                    if( value.wasFreed() ) {
-                        allocator.free(key, 1);
-                    } else if( key != value.page ) {
-                        // need to free the udpate page..
-                        allocator.free(value.page, 1);
-                    }
-                }
-            }
-        }
-
-    }
-
     private final MemoryMappedFile file;
     final SimpleAllocator allocator;
     final PageFile pageFile;
     private static final int updateBatchSize = 1024;
     private final boolean synch;
 
-
     /** The header structure of the file */
     private final Header header = new Header();
     
     int lastRedoPage = -1;
     
-    private final LinkedNodeList<Redo> redos = new LinkedNodeList<Redo>();
+    private final LinkedNodeList<Batch> redos = new LinkedNodeList<Batch>();
     
     //
     // The following Redo objects point to linked nodes in the previous redo list.  
@@ -579,22 +119,22 @@
     //
     
     /** The current redo that is currently being built */
-    Redo buildingRedo;
+    Batch buildingRedo;
     /** The stored redos.  These might be be recoverable. */
-    Redo storedRedos;
+    Batch storedRedos;
     /** The synced redos.  A file sync occurred after these redos were stored. */
-    Redo syncedRedos;
+    Batch syncedRedos;
     /** The performed redos.  Updates are actually performed to the original page file. */
-    Redo performedRedos;
+    Batch performedRedos;
     
     /** Used as read cache */
-    private ReadCache readCache = new ReadCache();
+    ReadCache readCache = new ReadCache();
 
     /** Mutex for data structures which are used during house keeping tasks like redo management. Once acquired, you can also acquire the TRANSACTION_MUTEX */
     private final Object HOUSE_KEEPING_MUTEX = "HOUSE_KEEPING_MUTEX";
 
     /** Mutex for data structures which transaction threads access. Never attempt to acquire the HOUSE_KEEPING_MUTEX once this mutex is acquired.  */
-    private final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX";
+    final Object TRANSACTION_MUTEX = "TRANSACTION_MUTEX";
     
 
     /**
@@ -638,10 +178,10 @@
      * @param to
      * @return string representation of the redo items from the specified redo up to (exclusive) the specified redo.
      */
-    private String toString(Redo from, Redo to) {
+    private String toString(Batch from, Batch to) {
         StringBuilder rc = new StringBuilder();
         rc.append("[ ");
-        Redo t = from;
+        Batch t = from;
         while( t!=null && t!=to ) {
             if( t!=from ) {
                 rc.append(", ");
@@ -679,7 +219,7 @@
                 
                 // Note: every deferred update has an entry in the pageUpdates, so no need to 
                 // check to see if that map also conflicts.
-                rev = snapshot.head.commitCheck(pageUpdates);
+                rev = snapshot.getHead().commitCheck(pageUpdates);
                 snapshot.close();
             } else {
                 rev = buildingRedo.head;
@@ -692,7 +232,7 @@
             buildingRedo.head = rev;
             
             Commit commit=null;
-            RedoEntry last = buildingRedo.entries.getTail();
+            BatchEntry last = buildingRedo.entries.getTail();
             if( last!=null ) {
                 commit = last.isCommit();
             }
@@ -726,7 +266,7 @@
     public void reset() {
         synchronized (HOUSE_KEEPING_MUTEX) {
             redos.clear();
-            performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1);
+            performedRedos = syncedRedos = storedRedos = buildingRedo = new Batch(-1);
             redos.addFirst(buildingRedo);
             
             lastRedoPage = -1;
@@ -756,7 +296,7 @@
         synchronized (HOUSE_KEEPING_MUTEX) {
 
             redos.clear();
-            performedRedos = syncedRedos = storedRedos = buildingRedo = new Redo(-1);
+            performedRedos = syncedRedos = storedRedos = buildingRedo = new Batch(-1);
             redos.addFirst(buildingRedo);
             lastRedoPage = -1;
             readCache.clear();
@@ -797,7 +337,7 @@
                 }
                 
                 
-                Redo redo = loadObject(pageId); 
+                Batch redo = loadObject(pageId); 
                 redo.page = pageId;
                 redo.recovered = true;
                 Extent.unfree(pageFile, pageId);
@@ -851,14 +391,14 @@
      * Attempts to perform a redo state change: building -> stored
      */
     private void storeRedos(boolean force) {
-        Redo redo;
+        Batch redo;
         
         // We synchronized /w the transactions so that they see the state change.
         synchronized (TRANSACTION_MUTEX) {
             // Re-checking since storing the redo may not be needed.
             if( (force && buildingRedo.base!=-1 ) || buildingRedo.pageCount() > updateBatchSize ) {
                 redo = buildingRedo;
-                buildingRedo = new Redo(redo.head);
+                buildingRedo = new Batch(redo.head);
                 redos.addLast(buildingRedo);
             } else {
                 return;
@@ -898,7 +438,7 @@
 
         // Update the base_revision with the last performed revision.
         if (performedRedos!=syncedRedos) {
-            Redo lastPerformedRedo = syncedRedos.getPrevious();
+            Batch lastPerformedRedo = syncedRedos.getPrevious();
             h.base_revision.set(lastPerformedRedo.head);
         }
 
@@ -906,7 +446,7 @@
         if (storedRedos!=buildingRedo) {
             
             // The last stored is actually synced now..
-            Redo lastStoredRedo = buildingRedo.getPrevious();
+            Batch lastStoredRedo = buildingRedo.getPrevious();
             // Let the header know about it..
             h.redo_page.set(lastStoredRedo.page);
             
@@ -964,7 +504,7 @@
               
         // The last performed redo MIGHT still have an open snapshot.
         // we can't transition from synced, until that snapshot closes.
-        Redo lastPerformed = syncedRedos.getPrevious();
+        Batch lastPerformed = syncedRedos.getPrevious();
         if( lastPerformed!=null && lastPerformed.references!=0) {
             return;
         }
@@ -1038,19 +578,19 @@
             SnapshotHead head=null;
 
             // re-use the last entry if it was a snapshot head..
-            RedoEntry entry = buildingRedo.entries.getTail();
+            BatchEntry entry = buildingRedo.entries.getTail();
             if( entry!=null ) {
                 head = entry.isSnapshotHead();
             }
             
             if( head == null ) {
                 // create a new snapshot head entry..
-                head = new SnapshotHead(buildingRedo);
+                head = new SnapshotHead(this, buildingRedo);
                 buildingRedo.entries.addLast(head);
             }
             
             // Open the snapshot off that head position.
-            return new Snapshot(head, syncedRedos).open();
+            return new Snapshot(this, head, syncedRedos).open();
         }
     }
     
@@ -1136,8 +676,7 @@
     final class ReadCache {
         private final Map<Integer, Object> map = Collections.synchronizedMap(new LRUCache<Integer, Object>(1024));
 
-        @SuppressWarnings("unchecked")
-        private <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
+        @SuppressWarnings("unchecked") <T> T cacheLoad(EncoderDecoder<T> marshaller, int pageId) {
             T rc = (T) map.get(pageId);
             if( rc ==null ) {
                 rc = marshaller.load(pageFile, pageId);
@@ -1149,144 +688,5 @@
         public void clear() {
             map.clear();
         }        
-    }
-    
-    public static final byte PAGE_ALLOCATED   = 0x01 << 0;
-    public static final byte PAGE_FREED       = 0x01 << 1;
-    public static final byte PAGE_STORE       = 0x01 << 2;
-    public static final byte PAGE_CLEAR       = 0x01 << 3;
-    
-    static class Update implements Externalizable {
-
-        private static final long serialVersionUID = -1128410792448869134L;
-        
-        byte flags;
-        int page;
-       
-        public Update() {
-        }
-        
-        public Update(Update udpate) {
-            this.page = udpate.page;
-            this.flags = (byte) (udpate.flags & (PAGE_ALLOCATED|PAGE_FREED));
-        }
-
-        public Update(int page) {
-            this.page = page;
-        }
-
-        public static Update update(Update update) {
-            return new Update(update);
-        }
-        public static Update update(int page) {
-            return new Update(page);
-        }
-        
-        public int page() {
-            if( wasFreed() ) {
-                throw new PagingException("You should never try to read or write page that has been freed.");
-            }
-            return page;
-        }
-
-        public DeferredUpdate deferredUpdate() {
-            return null;
-        }
-        
-        public Update allocated() {
-            flags = (byte) ((flags & ~PAGE_FREED) | PAGE_ALLOCATED);
-            return this;
-        }
-        
-        public Update freed() {
-            flags = (byte) ((flags & ~PAGE_ALLOCATED) | PAGE_FREED);
-            return this;
-        }
-
-        public boolean wasFreed() {
-            return (flags & PAGE_FREED)!=0 ;
-        }
-        public boolean wasAllocated() {
-            return (flags & PAGE_ALLOCATED)!=0;
-        }
-        public boolean wasDeferredStore() {
-            return (flags & PAGE_STORE)!=0 ;
-        }
-        public boolean wasDeferredClear() {
-            return (flags & PAGE_CLEAR)!=0;
-        }
-        
-        @Override
-        public String toString() {
-            return "{ page: "+page+", flags: "+flags+" }";
-        }
-
-        @Override
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            page = in.readInt();
-            flags = in.readByte();
-        }
-
-        @Override
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(page);
-            out.writeByte(flags);
-        }
-        
-    }
-    
-    
-    @SuppressWarnings("serial")
-    static class DeferredUpdate extends Update {
-        EncoderDecoder<?> marshaller;
-        Object value;
-
-        public DeferredUpdate(Update update) {
-            super(update);
-        }
-        public DeferredUpdate(int page) {
-            super(page);
-        }
-        
-        public static DeferredUpdate deferred(int page) {
-            return new DeferredUpdate(page);
-        }
-        
-        public static DeferredUpdate deferred(Update update) {
-            return new DeferredUpdate(update);
-        }
-
-        public DeferredUpdate deferredUpdate() {
-            return this;
-        }
-        
-        public DeferredUpdate store(Object value, EncoderDecoder<?> marshaller) {
-            this.value = value;
-            this.marshaller = marshaller;
-            flags = (byte) ((flags & ~PAGE_CLEAR) | PAGE_STORE);
-            return this;
-        }
-
-        public DeferredUpdate clear(EncoderDecoder<?> marshaller) {
-            this.marshaller= marshaller;
-            this.value=null;
-            flags = (byte) ((flags & ~PAGE_STORE) | PAGE_CLEAR);
-            return this;
-        }
-        
-        @SuppressWarnings("unchecked")
-        <T> T value() {
-            return (T) value;
-        }
-        
-        @SuppressWarnings("unchecked")
-        public List<Integer> store(Paged paged) {
-            return ((EncoderDecoder)marshaller).store(paged, page, value);
-        }
-
-        public Object writeReplace() throws ObjectStreamException {
-            return new Update(this);
-        }
-        
     }    
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java?rev=829298&r1=829297&r2=829298&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFileFactory.java Sat Oct 24 02:14:21 2009
@@ -32,7 +32,7 @@
     }
     
     public HawtPageFileFactory() {
-        super.setHeaderSize(HawtPageFile.HEADER_SIZE);
+        super.setHeaderSize(HawtPageFile.FILE_HEADER_SIZE);
     }
     
     @Override

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java?rev=829298&r1=829297&r2=829298&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java Sat Oct 24 02:14:21 2009
@@ -16,9 +16,6 @@
  */
 package org.apache.hawtdb.internal.page;
 
-import static org.apache.hawtdb.internal.page.HawtPageFile.DeferredUpdate.deferred;
-import static org.apache.hawtdb.internal.page.HawtPageFile.Update.update;
-
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -30,9 +27,10 @@
 import org.apache.hawtdb.api.OutOfSpaceException;
 import org.apache.hawtdb.api.PagingException;
 import org.apache.hawtdb.api.Transaction;
-import org.apache.hawtdb.internal.page.HawtPageFile.DeferredUpdate;
-import org.apache.hawtdb.internal.page.HawtPageFile.Snapshot;
-import org.apache.hawtdb.internal.page.HawtPageFile.Update;
+
+import static org.apache.hawtdb.internal.page.Update.*;
+import static org.apache.hawtdb.internal.page.DeferredUpdate.*;
+
 /**
  * Transaction objects are NOT thread safe. Users of this object should
  * guard it from concurrent access.
@@ -110,7 +108,7 @@
         }
         
         // No?  Then ask the snapshot to load the object.
-        return snapshot().head.cacheLoad(marshaller, page);
+        return snapshot().getHead().cacheLoad(marshaller, page);
     }
 
     public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
@@ -166,7 +164,7 @@
             parent.pageFile.read(update.page(), buffer);
         } else {
             // Get the data from the snapshot.
-            snapshot().head.read(pageId, buffer);
+            snapshot().getHead().read(pageId, buffer);
         }
     }
 
@@ -178,7 +176,7 @@
                 return parent.pageFile.slice(type, udpate.page(), count);
             } else {
                 // Get the data from the snapshot.
-                return snapshot().head.slice(page, count);
+                return snapshot().getHead().slice(page, count);
             }
             
         } else {
@@ -186,7 +184,7 @@
             if (update == null) {
                 update = update(parent.allocator.alloc(count)).allocated();
                 if (type==SliceType.READ_WRITE) {
-                    ByteBuffer slice = snapshot().head.slice(page, count);
+                    ByteBuffer slice = snapshot().getHead().slice(page, count);
                     try {
                         parent.pageFile.write(update.page, slice);
                     } finally { 

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Snapshot.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final class Snapshot {
+
+    private final HawtPageFile parent;
+    private final SnapshotHead head;
+    private final Batch base;
+    
+    Snapshot(HawtPageFile hawtPageFile, SnapshotHead head, Batch base) {
+        parent = hawtPageFile;
+        this.head = head;
+        this.base = base;
+    }
+
+    Snapshot open() {
+        head.open(base);
+        return this;
+    }
+    
+    void close() {
+        synchronized(parent.TRANSACTION_MUTEX) {
+            head.close(base);
+        }
+    }
+
+    SnapshotHead getHead() {
+        return head;
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SnapshotHead.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.IOPagingException;
+import org.apache.hawtdb.api.Paged.SliceType;
+
+/**
+ * 
+ * A SnapshotHead is a BatchEntry and is stored in a Batch, in the same
+ * list as the Commit objects.  Its main purpose is to separate
+ * commits so that Commits after a snapshot do not get merged with commits
+ * before the snapshot.
+ * 
+ * A SnapshotHead allows transactions to get a point in time view of the 
+ * page file.
+ *  
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final class SnapshotHead extends BatchEntry {
+    /**
+     * 
+     */
+    private final HawtPageFile hawtPageFile;
+    final Batch parent;
+    
+    public SnapshotHead(HawtPageFile hawtPageFile, Batch parent) {
+        this.hawtPageFile = hawtPageFile;
+        this.parent = parent;
+    }
+
+    /** The number of times this snapshot has been opened. */
+    protected int references;
+    
+    public String toString() { 
+        return "{ references: "+this.references+" }";
+    }
+
+    SnapshotHead isSnapshotHead() {
+        return this;
+    }
+    
+    public void read(int pageId, Buffer buffer) throws IOPagingException {
+        pageId = mapPageId(pageId);
+        this.hawtPageFile.pageFile.read(pageId, buffer);
+    }
+
+    public ByteBuffer slice(int pageId, int count) {
+        pageId = mapPageId(pageId);
+        return this.hawtPageFile.pageFile.slice(SliceType.READ, pageId, count);
+    }
+    
+    public void open(Batch base) {
+        references++;
+        while( true ) {
+            base.references++;
+            if(base == parent ) {
+                break;
+            }
+            base = base.getNext();
+        }
+    }
+    
+    public void close(Batch base) {
+        references--;
+        while( true ) {
+            base.references--;
+            if(base == parent ) {
+                break;
+            }
+            base = base.getNext();
+        }
+
+        if( references==0 ) {
+            unlink();
+            // TODO: trigger merging of adjacent commits. 
+        }
+    }
+
+    public int mapPageId(int page) {
+        // Look for the page in the previous commits..
+        Batch curRedo = parent;
+        BatchEntry curEntry = getPrevious();
+        while( true ) {
+            if( curRedo.isPerformed() ) {
+                break;
+            }
+            
+            while( curEntry!=null ) {
+                Commit commit = curEntry.isCommit();
+                if( commit !=null ) {
+                    Update update = commit.updates.get(page);
+                    if( update!=null ) {
+                        return update.page();
+                    }
+                }
+                curEntry = curEntry.getPrevious();
+            }
+            
+            curRedo = curRedo.getPrevious();
+            if( curRedo==null ) {
+                break;
+            }
+            curEntry = curRedo.entries.getTail();
+        }
+        return page;
+    }
+    
+    
+    public <T> T cacheLoad(EncoderDecoder<T> marshaller, int page) {
+        Batch curRedo = parent;
+        BatchEntry curEntry = getPrevious();
+        while( true ) {
+            if( curRedo.isPerformed() ) {
+                break;
+            }
+            
+            while( curEntry!=null ) {
+                Commit commit = curEntry.isCommit();
+                if( commit !=null ) {
+                    Update update = commit.updates.get(page);
+                    if( update!=null ) {
+                        DeferredUpdate du  = update.deferredUpdate();
+                        if (du!=null) {
+                            return du.<T>value();
+                        }
+                    }
+                }
+                curEntry = curEntry.getPrevious();
+            }
+            
+            curRedo = curRedo.getPrevious();
+            if( curRedo==null ) {
+                break;
+            }
+            curEntry = curRedo.entries.getTail();
+        }
+        return this.hawtPageFile.readCache.cacheLoad(marshaller, page);
+    }
+    
+    public long commitCheck(Map<Integer, Update> pageUpdates) {
+        long rc=0;
+        Batch curRedo = parent;
+        BatchEntry curEntry = getNext();
+        while( true ) {
+            while( curEntry!=null ) {
+                Commit commit = curEntry.isCommit();
+                if( commit!=null ) {
+                    rc = commit.commitCheck(pageUpdates);
+                }
+                curEntry = curEntry.getNext();
+            }
+            
+            curRedo = curRedo.getNext();
+            if( curRedo==null ) {
+                break;
+            }
+            curEntry = curRedo.entries.getHead();
+        }
+        return rc;
+    }
+            
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java?rev=829298&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java (added)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/Update.java Sat Oct 24 02:14:21 2009
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.page;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+
+import org.apache.hawtdb.api.PagingException;
+
+/**
+ * A page number paired with a bit mask of the PAGE_* flags declared below.
+ * Externalized as the int page number followed by the single flags byte.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Update implements Externalizable {
+    private static final long serialVersionUID = -1128410792448869134L;
+
+    public static final byte PAGE_ALLOCATED   = 0x01 << 0;
+    public static final byte PAGE_FREED       = 0x01 << 1;
+    public static final byte PAGE_STORE       = 0x01 << 2;
+    public static final byte PAGE_CLEAR       = 0x01 << 3;
+
+    byte flags;
+    int page;
+
+    /** Public no-arg constructor, as Externalizable requires. */
+    public Update() { }
+
+    public Update(int page) {
+        this.page = page;
+    }
+
+    /** Copies the page number and only the allocated/freed bits of the given update. */
+    public Update(Update source) {
+        this.page = source.page;
+        this.flags = (byte) (source.flags & (PAGE_ALLOCATED|PAGE_FREED));
+    }
+
+    public static Update update(int page) {
+        return new Update(page);
+    }
+    public static Update update(Update update) {
+        return new Update(update);
+    }
+
+    /** Returns the page number, or throws PagingException when the freed bit is set. */
+    public int page() {
+        if( !wasFreed() ) {
+            return page;
+        }
+        throw new PagingException("You should never try to read or write page that has been freed.");
+    }
+
+    /** Always null in this class. */
+    public DeferredUpdate deferredUpdate() {
+        return null;
+    }
+
+    /** Sets the allocated bit and clears the freed bit; returns this. */
+    public Update allocated() {
+        flags = (byte) ((flags | PAGE_ALLOCATED) & ~PAGE_FREED);
+        return this;
+    }
+
+    /** Sets the freed bit and clears the allocated bit; returns this. */
+    public Update freed() {
+        flags = (byte) ((flags | PAGE_FREED) & ~PAGE_ALLOCATED);
+        return this;
+    }
+
+    public boolean wasFreed() { return isSet(PAGE_FREED); }
+    public boolean wasAllocated() { return isSet(PAGE_ALLOCATED); }
+    public boolean wasDeferredStore() { return isSet(PAGE_STORE); }
+    public boolean wasDeferredClear() { return isSet(PAGE_CLEAR); }
+
+    private boolean isSet(byte mask) {
+        return (flags & mask) != 0;
+    }
+
+    @Override
+    public String toString() {
+        return "{ page: "+page+", flags: "+flags+" }";
+    }
+
+    @Override
+    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        page = in.readInt();
+        flags = in.readByte();
+    }
+
+    @Override
+    public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeInt(page);
+        out.writeByte(flags);
+    }
+}
\ No newline at end of file



Mime
View raw message