activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r829190 - in /activemq/sandbox/activemq-apollo: activemq-util/src/main/java/org/apache/activemq/util/ hawtdb/src/main/java/org/apache/hawtdb/api/ hawtdb/src/main/java/org/apache/hawtdb/internal/index/ hawtdb/src/main/java/org/apache/hawtdb/...
Date Fri, 23 Oct 2009 19:34:52 GMT
Author: chirino
Date: Fri Oct 23 19:34:49 2009
New Revision: 829190

URL: http://svn.apache.org/viewvc?rev=829190&view=rev
Log:
Integrated how deferred updates are tracked with the standard update tracking.


Added:
    activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java
Modified:
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/HashIndexFactory.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Index.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Paged.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java
    activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java

Added: activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java?rev=829190&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java (added)
+++ activemq/sandbox/activemq-apollo/activemq-util/src/main/java/org/apache/activemq/util/StringSupport.java Fri Oct 23 19:34:49 2009
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.util;
+
+import java.util.Arrays;
+
+/**
+ * Helper class to hold common text/string manipulation methods.
+ *  
+ * @author chirino
+ */
+public class StringSupport {
+
+    public static String indent(String value, int spaces) {
+        String indent = fillString(spaces, ' ');
+        return value.replaceAll("(\\r?\\n)", "$1"+indent);
+    }
+
+    public static String fillString(int count, char character) {
+        char t[] = new char[count];
+        Arrays.fill(t, character);
+        return new String(t);
+    }
+    
+}

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/HashIndexFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/HashIndexFactory.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/HashIndexFactory.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/HashIndexFactory.java Fri Oct 23 19:34:49 2009
@@ -37,6 +37,7 @@
     private int maximumBucketCapacity = DEFAULT_MAXIMUM_BUCKET_CAPACITY;
     private int minimumBucketCapacity = DEFAULT_MINIMUM_BUCKET_CAPACITY;
     private int loadFactor = DEFAULT_LOAD_FACTOR;
+    private boolean deferredEncoding=true;
 
     public Index<Key, Value> open(Paged paged, int page) {
         return docreate(paged, page).open();
@@ -111,6 +112,15 @@
     public void setFixedCapacity(int value) {
         this.minimumBucketCapacity = this.maximumBucketCapacity = this.initialBucketCapacity = value;
     }
+
+    public boolean isDeferredEncoding() {
+        return deferredEncoding;
+    }
+
+    public void setDeferredEncoding(boolean deferredEncoding) {
+        this.deferredEncoding = deferredEncoding;
+    }
+    
     
     
 }
\ No newline at end of file

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Index.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Index.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Index.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Index.java Fri Oct 23 19:34:49 2009
@@ -74,6 +74,8 @@
     
     int size();
     
+    boolean isEmpty();
+    
     /**
      * @param tx
      * @return

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Paged.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Paged.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Paged.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/api/Paged.java Fri Oct 23 19:34:49 2009
@@ -124,6 +124,6 @@
      * @param page
      * @return
      */
-    <T> void remove(EncoderDecoder<T> encoderDecoder, int page);
+    <T> void clear(EncoderDecoder<T> encoderDecoder, int page);
 
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeIndex.java Fri Oct 23 19:34:49 2009
@@ -89,9 +89,13 @@
     public Value remove(Key key) {
         return root().remove(this, key);
     }
+    
+    public int size() {
+        return root().size(this);
+    }
 
-    public boolean isTransient() {
-        return false;
+    public boolean isEmpty() {
+        return root().isEmpty(this);
     }
 
     public void clear() {
@@ -238,7 +242,7 @@
     
     void free( BTreeNode<Key, Value> node ) {
         if( deferredEncoding ) {
-            paged.remove(DATA_ENCODER_DECODER, node.page);
+            paged.clear(DATA_ENCODER_DECODER, node.page);
         } else {
             if ( node.storedInExtent ) {
                 DATA_ENCODER_DECODER.remove(paged, node.page);
@@ -275,8 +279,4 @@
         // TODO Auto-generated method stub
     }
 
-    public int size() {
-        return root().size(this);
-    }
-
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java Fri Oct 23 19:34:49 2009
@@ -589,6 +589,10 @@
             }
         }
         return rc;
+    }
+    
+    public boolean isEmpty(BTreeIndex<Key, Value> index) {
+        return data.keys.length==0;
     }    
 
     public int getMaxLeafDepth(BTreeIndex<Key, Value> index, int depth) {

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java Fri Oct 23 19:34:49 2009
@@ -17,8 +17,10 @@
 package org.apache.hawtdb.internal.index;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -28,6 +30,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hawtdb.api.BTreeIndexFactory;
+import org.apache.hawtdb.api.EncoderDecoder;
 import org.apache.hawtdb.api.HashIndexFactory;
 import org.apache.hawtdb.api.IOPagingException;
 import org.apache.hawtdb.api.Index;
@@ -43,127 +46,6 @@
 public class HashIndex<Key,Value> implements Index<Key,Value> {
     
     private static final Log LOG = LogFactory.getLog(HashIndex.class);
-
-    /** 
-     * This is the data stored in the index header.  It knows where
-     * the hash buckets are stored at an keeps usage statistics about
-     * those buckets. 
-     */
-    static private class Buckets<Key,Value> {
-        
-        public static final int HEADER_SIZE = 16;
-        public static final Buffer MAGIC = new Buffer(new byte[] {'h', 'a', 's', 'h'});
-
-        final HashIndex<Key,Value> index;
-        int bucketsPage=-1;
-        int active;
-        int capacity;
-        int size;
-        
-        int increaseThreshold;
-        int decreaseThreshold;
-
-        public Buckets(HashIndex<Key, Value> index) {
-            this.index = index;
-        }
-
-        private void calcThresholds() {
-            increaseThreshold = (capacity * index.loadFactor)/100;
-            decreaseThreshold = (capacity * index.loadFactor * index.loadFactor ) / 20000;
-        }
-
-        void create(int capacity) {
-            this.size = 0;
-            this.active = 0;
-            this.capacity = capacity;
-            this.bucketsPage = index.paged.allocator().alloc(capacity);
-            for (int i = 0; i < capacity; i++) {
-                index.BIN_FACTORY.create(index.paged, (bucketsPage + i));
-            }
-            calcThresholds();
-            store();
-        }
-        
-        public void destroy() {
-            clear();
-            index.paged.allocator().free(bucketsPage, capacity);
-        }
-        
-        public void clear() {
-            for (int i = 0; i < index.buckets.capacity; i++) {
-                index.buckets.bucket(i).clear();
-            }
-            index.buckets.size = 0;
-            index.buckets.active = 0;
-            index.buckets.calcThresholds();
-        }
-
-        void store() {
-            DataByteArrayOutputStream os = new DataByteArrayOutputStream(HEADER_SIZE);
-            writeExternal(os);
-            Buffer buffer = os.toBuffer();
-            index.paged.write(index.page, buffer);
-        }
-
-        void load() {
-            Buffer buffer = new Buffer(HEADER_SIZE);
-            index.paged.read(index.page, buffer);
-            DataByteArrayInputStream is = new DataByteArrayInputStream(buffer);
-            readExternal(is);
-        }
-        
-        private void writeExternal(DataByteArrayOutputStream os) {
-            try {
-                Buffer magic2 = MAGIC;
-                os.write(magic2.data, MAGIC.offset, MAGIC.length);
-                os.writeInt(this.bucketsPage);
-                os.writeInt(this.capacity);
-                os.writeInt(this.size);
-                os.writeInt(this.active);
-            } catch (IOException e) {
-                throw new IOPagingException(e);
-            }
-        }
-        
-        private void readExternal(DataByteArrayInputStream is) {
-            Buffer magic = new Buffer(MAGIC.length);
-            is.readFully(magic.data, magic.offset, magic.length);
-            if (!magic.equals(MAGIC)) {
-                throw new IndexException("Not a hash page");
-            }
-            this.bucketsPage = is.readInt();
-            this.capacity = is.readInt();
-            this.size = is.readInt();
-            this.active = is.readInt();
-        }        
-
-        
-        Index<Key,Value> bucket(int bucket) {
-            return index.BIN_FACTORY.open(index.paged, bucketsPage+bucket);
-        }
-
-        Index<Key,Value> bucket(Key key) {
-            int i = index(key);
-            return index.BIN_FACTORY.open(index.paged, bucketsPage+i);
-        }
-
-        int index(Key x) {
-            try {
-                return Math.abs(x.hashCode()%capacity);
-            } catch (ArithmeticException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-                throw e;
-            }
-        }
-        
-        @Override
-        public String toString() {
-            return "{ page:"+bucketsPage+", size: "+size+", capacity: "+capacity+", active: "+active+", increase threshold: "+increaseThreshold+", decrease threshold: "+decreaseThreshold+" }";
-        }
-
-    }
-    
     private final BTreeIndexFactory<Key, Value> BIN_FACTORY = new BTreeIndexFactory<Key, Value>();
     
     private final Paged paged;
@@ -172,6 +54,7 @@
     private final int minimumBucketCapacity;
     private final int loadFactor;
     private final int initialBucketCapacity;
+    private final boolean deferredEncoding;
 
     private Buckets<Key,Value> buckets;
 
@@ -181,24 +64,26 @@
         this.maximumBucketCapacity = factory.getMaximumBucketCapacity();
         this.minimumBucketCapacity = factory.getMinimumBucketCapacity();
         this.loadFactor = factory.getLoadFactor();
+        this.deferredEncoding = factory.isDeferredEncoding();
         this.initialBucketCapacity = factory.getBucketCapacity();
         this.BIN_FACTORY.setKeyMarshaller(factory.getKeyMarshaller());
         this.BIN_FACTORY.setValueMarshaller(factory.getValueMarshaller());
-        this.BIN_FACTORY.setDeferredEncoding(false);
+        this.BIN_FACTORY.setDeferredEncoding(this.deferredEncoding);
     }
 
     public HashIndex<Key, Value> create() {
         buckets = new Buckets<Key, Value>(this);
         buckets.create(initialBucketCapacity);
+        storeBuckets();
         return this;
     }
 
     public HashIndex<Key, Value> open() {
-        buckets = new Buckets<Key, Value>(this);
-        buckets.load();
+        loadBuckets();
         return this;
     }
 
+
     public Value get(Key key) {
         return buckets.bucket(key).get(key);
     }
@@ -210,22 +95,18 @@
     public Value put(Key key, Value value) {
         Index<Key, Value> bucket = buckets.bucket(key);
 
-        int originalSize = bucket.size();
+        boolean wasEmpty = bucket.isEmpty();
         Value put = bucket.put(key,value);
-        int newSize = bucket.size();
 
-        if (newSize != originalSize) {
-            buckets.size++;
-            if (newSize == 1) {
-                buckets.active++;
-            }
-            buckets.store();
+        if (wasEmpty) {
+            buckets.active++;
+            storeBuckets();
         }
         
         if (buckets.active >= buckets.increaseThreshold) {
-            newSize = Math.min(this.maximumBucketCapacity, buckets.capacity*4);
-            if(buckets.capacity!=newSize) {
-                this.changeCapacity(newSize);
+            int capacity = Math.min(this.maximumBucketCapacity, buckets.capacity * 4);
+            if (buckets.capacity != capacity) {
+                this.changeCapacity(capacity);
             }
         }
         return put;
@@ -233,22 +114,19 @@
     
     public Value remove(Key key) {
         Index<Key, Value> bucket = buckets.bucket(key);
-        int originalSize = bucket.size();
+
         Value rc = bucket.remove(key);
-        int newSize = bucket.size();
+        boolean isEmpty = bucket.isEmpty();
         
-        if (newSize != originalSize) {
-            buckets.size--;
-            if (newSize == 0) {
-                buckets.active--;
-            }
-            buckets.store();
+        if (isEmpty) {
+            buckets.active--;
+            storeBuckets();
         }
 
         if (buckets.active <= buckets.decreaseThreshold) {
-            newSize = Math.max(minimumBucketCapacity, buckets.capacity/2);
-            if(buckets.capacity!=newSize) {
-                changeCapacity(newSize);
+            int capacity = Math.max(minimumBucketCapacity, buckets.capacity / 2);
+            if (buckets.capacity != capacity) {
+                changeCapacity(capacity);
             }
         }
         return rc;
@@ -266,9 +144,17 @@
     }
     
     public int size() {
-        return buckets.size;
+        int rc=0;
+        for (int i = 0; i < buckets.capacity; i++) {
+            rc += buckets.bucket(i).size();
+        }
+        return rc;
     }
     
+    public boolean isEmpty() {
+        return buckets.active==0;
+    }    
+    
     public void destroy() {
         buckets.destroy();
         buckets = null;
@@ -300,14 +186,150 @@
                 }
             }
         }
-        next.size = buckets.size;
         
         buckets.destroy();
         buckets = next;
+        storeBuckets();
+        
         LOG.debug("Resizing done.  New bins start at: "+buckets.bucketsPage);        
     }
 
     public String toString() {
         return "{ page: "+page+", buckets: "+buckets+" }";
     }
+    
+    private void storeBuckets() {
+        if( deferredEncoding ) {
+            paged.put(BUCKET_ENCODER_DECODER, page, buckets);
+        } else {
+            BUCKET_ENCODER_DECODER.store(paged, page, buckets);
+        }
+    }
+    
+    private void loadBuckets() {
+        if( deferredEncoding ) {
+            buckets = paged.get(BUCKET_ENCODER_DECODER, page);
+        } else {
+            buckets = BUCKET_ENCODER_DECODER.load(paged, page);
+        }
+    }
+    
+    // /////////////////////////////////////////////////////////////////
+    // Helper classes
+    // /////////////////////////////////////////////////////////////////
+
+    /** 
+     * This is the data stored in the index header.  It knows where
+     * the hash buckets are stored and keeps usage statistics about
+     * those buckets. 
+     */
+    static private class Buckets<Key,Value> {
+
+        final HashIndex<Key,Value> index;
+        int bucketsPage=-1;
+        int active;
+        int capacity;
+        
+        int increaseThreshold;
+        int decreaseThreshold;
+
+        public Buckets(HashIndex<Key, Value> index) {
+            this.index = index;
+        }
+
+        private void calcThresholds() {
+            increaseThreshold = (capacity * index.loadFactor)/100;
+            decreaseThreshold = (capacity * index.loadFactor * index.loadFactor ) / 20000;
+        }
+
+        void create(int capacity) {
+            this.active = 0;
+            this.capacity = capacity;
+            this.bucketsPage = index.paged.allocator().alloc(capacity);
+            for (int i = 0; i < capacity; i++) {
+                index.BIN_FACTORY.create(index.paged, (bucketsPage + i));
+            }
+            calcThresholds();
+        }
+        
+        public void destroy() {
+            clear();
+            index.paged.allocator().free(bucketsPage, capacity);
+        }
+        
+        public void clear() {
+            for (int i = 0; i < index.buckets.capacity; i++) {
+                index.buckets.bucket(i).clear();
+            }
+            index.buckets.active = 0;
+            index.buckets.calcThresholds();
+        }
+        
+        Index<Key,Value> bucket(int bucket) {
+            return index.BIN_FACTORY.open(index.paged, bucketsPage+bucket);
+        }
+
+        Index<Key,Value> bucket(Key key) {
+            int i = index(key);
+            return index.BIN_FACTORY.open(index.paged, bucketsPage+i);
+        }
+
+        int index(Key x) {
+            return Math.abs(x.hashCode()%capacity);
+        }
+        
+        @Override
+        public String toString() {
+            return "{ page:"+bucketsPage+", capacity: "+capacity+", active: "+active+", increase threshold: "+increaseThreshold+", decrease threshold: "+decreaseThreshold+" }";
+        }
+        
+    }
+
+    public static final Buffer MAGIC = new Buffer(new byte[] {'h', 'a', 's', 'h'});
+    public static final int HEADER_SIZE = MAGIC.length+ 12; // bucketsPage, capacity, active
+
+    private final EncoderDecoder<Buckets<Key, Value>> BUCKET_ENCODER_DECODER = new EncoderDecoder<Buckets<Key, Value>>() {
+        @Override
+        public List<Integer> store(Paged paged, int page, Buckets<Key, Value> buckets) {
+            DataByteArrayOutputStream os = new DataByteArrayOutputStream(HEADER_SIZE);
+            try {
+                Buffer magic2 = MAGIC;
+                os.write(magic2.data, MAGIC.offset, MAGIC.length);
+                os.writeInt(buckets.bucketsPage);
+                os.writeInt(buckets.capacity);
+                os.writeInt(buckets.active);
+            } catch (IOException e) {
+                throw new IOPagingException(e);
+            }
+            
+            Buffer buffer = os.toBuffer();
+            paged.write(page, buffer);
+            return Collections.emptyList();
+        }
+        @Override
+        public Buckets<Key, Value> load(Paged paged, int page) {
+            Buckets<Key, Value> buckets = new Buckets<Key, Value>(HashIndex.this);
+            Buffer buffer = new Buffer(HEADER_SIZE);
+            paged.read(page, buffer);
+            DataByteArrayInputStream is = new DataByteArrayInputStream(buffer);
+            
+            Buffer magic = new Buffer(MAGIC.length);
+            is.readFully(magic.data, magic.offset, magic.length);
+            if (!magic.equals(MAGIC)) {
+                throw new IndexException("Not a hash page");
+            }
+            buckets.bucketsPage = is.readInt();
+            buckets.capacity = is.readInt();
+            buckets.active = is.readInt();
+            
+            return buckets;
+        }
+        
+        @Override
+        public List<Integer> remove(Paged paged, int page) {
+            return Collections.emptyList();
+        }
+        
+    };
+
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtPageFile.java Fri Oct 23 19:34:49 2009
@@ -22,7 +22,7 @@
 import java.io.ObjectInputStream;
 import java.io.ObjectOutput;
 import java.io.ObjectOutputStream;
-import java.io.Serializable;
+import java.io.ObjectStreamException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -139,17 +139,14 @@
         private long head;
         /** all the page updates that are part of the redo */
         private ConcurrentHashMap<Integer, Update> updates;
-        /** the deferred updates that need to be done in this redo */
-        private ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates;
 
 
         public Commit() {
         }
         
-        public Commit(long version, ConcurrentHashMap<Integer, Update> updates, ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates) {
+        public Commit(long version, ConcurrentHashMap<Integer, Update> updates) {
             this.head = this.base = version;
             this.updates = updates;
-            this.deferredUpdates = deferredUpdates;
         }
         
         
@@ -161,8 +158,7 @@
         
         public String toString() { 
             int updateSize = updates==null ? 0 : updates.size();
-            int cacheSize = deferredUpdates==null ? 0 : deferredUpdates.size();
-            return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+", cache: "+cacheSize+" }";
+            return "{ base: "+this.base+", head: "+this.head+", updates: "+updateSize+" }";
         }
 
         public long commitCheck(Map<Integer, Update> newUpdate) {
@@ -174,34 +170,9 @@
             return head;
         }
 
-        public void merge(Allocator allocator, long rev, ConcurrentHashMap<Integer, Update> updates, ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates) {
+        public void merge(Allocator allocator, long rev, ConcurrentHashMap<Integer, Update> updates) {
             assert head+1 == rev;
             head=rev;
-            if (deferredUpdates != null) {
-                if (this.deferredUpdates == null) {
-                    this.deferredUpdates = deferredUpdates;
-                } else {
-                    for (Entry<Integer, DeferredUpdate> entry : deferredUpdates.entrySet()) {
-                        Integer page = entry.getKey();
-                        DeferredUpdate du = entry.getValue();
-                        if (du.value == null) {
-                            this.deferredUpdates.remove(page);
-                        } else {
-                            DeferredUpdate previous = this.deferredUpdates.put(page, du);
-                            // TODO: There was a previous deferred update in the redo...  we can just use it's 
-                            // redo allocation and release the new allocation.
-                            if (previous != null) {
-                                Update allocated = updates.remove(page);
-                                assert allocated.update_location == du.page; // these should match...
-                                allocator.free(du.page, 1);
-                                // since we replaced the previous entry,  
-                                du.page = previous.page;
-                            }
-                        }
-                    }
-                }
-            }
-            
             // merge all the entries in the update..
             for (Entry<Integer, Update> entry : updates.entrySet()) {
                 merge(allocator, entry.getKey(), entry.getValue());
@@ -217,25 +188,30 @@
         private void merge(Allocator allocator, int page, Update update) {
             Update previous = this.updates.put(page, update);
             if (previous != null) {
+                
                 if( update.wasFreed() ) {
                     // we can undo the previous update
-                    if( previous.update_location != page ) {
-                        allocator.free(previous.update_location, 1);
+                    if( previous.page != page ) {
+                        allocator.free(previous.page, 1);
                     }
                     if( previous.wasAllocated() ) {
                         allocator.free(page, 1);
                     }
                     this.updates.remove(page);
-                } else {
-                    // we are undoing the previous update /w this new update.
-                    if( previous.update_location != page ) {
-                        allocator.free(previous.update_location, 1);
-                    }
-                    // we may be updating a previously allocated page,
-                    // if so we need to mark the new page as allocated too.
-                    if( previous.wasAllocated() ) {
-                        update.flags = PAGE_ALLOCATED;
-                    }                    
+                    
+                    // No other merging is needed now..
+                    return;
+                }
+                
+                // we are undoing the previous update with this new update.
+                if( previous.page != page ) {
+                    allocator.free(previous.page, 1);
+                }
+                
+                // we may be updating a previously allocated page,
+                // if so we need to mark the new page as allocated too.
+                if( previous.wasAllocated() ) {
+                    update.allocated();
                 }
             }
         }
@@ -379,9 +355,12 @@
                 while( curEntry!=null ) {
                     Commit commit = curEntry.isCommit();
                     if( commit !=null ) {
-                        DeferredUpdate du  = commit.deferredUpdates.get(page);
-                        if (du!=null) {
-                            return du.<T>value();
+                        Update update = commit.updates.get(page);
+                        if( update!=null ) {
+                            DeferredUpdate du  = update.deferredUpdate();
+                            if (du!=null) {
+                                return du.<T>value();
+                            }
                         }
                     }
                     curEntry = curEntry.getPrevious();
@@ -428,17 +407,16 @@
         private static final long serialVersionUID = 1188640492489990493L;
         
         /** the pageId that this redo batch is stored at */
-        private transient int page=-1;
+        private  int page=-1;
         /** points to a previous redo batch page */
         public int previous=-1;
         /** was the redo loaded in the {@link recover} method */
-        private transient boolean recovered;
+        private boolean recovered;
         
         /** the commits and snapshots in the redo */ 
-        private transient LinkedNodeList<RedoEntry> entries = new LinkedNodeList<RedoEntry>();
+        private final LinkedNodeList<RedoEntry> entries = new LinkedNodeList<RedoEntry>();
         /** tracks how many snapshots are referencing the redo */
         private int references;
-
         /** the oldest commit in this redo */
         public long base=-1;
         /** the newest commit in this redo */
@@ -458,7 +436,7 @@
         }
 
         public String toString() { 
-            return "{ page: "+this.page+", previous: "+previous+" }";
+            return "{ page: "+this.page+", base: "+base+", head: "+head+", references: "+references+", entries: "+entries.size()+" }";
         }
         
         @Override
@@ -540,20 +518,23 @@
 
         public void performDefferedUpdates(Paged pageFile) {            
             for (Commit commit : this) {
-                if( commit.deferredUpdates != null ) {
-                    for (Entry<Integer, DeferredUpdate> entry : commit.deferredUpdates.entrySet()) {
-                        DeferredUpdate cu = entry.getValue();
-                        if( cu.value == null ) {
-                            List<Integer> freePages = cu.marshaller.remove(pageFile, cu.page);
+                if( commit.updates != null ) {
+                    for (Entry<Integer, Update> entry : commit.updates.entrySet()) {
+                        DeferredUpdate du = entry.getValue().deferredUpdate();
+                        if( du == null ) {
+                            continue;
+                        }
+                        if( du.wasDeferredClear() ) {
+                            List<Integer> freePages = du.marshaller.remove(pageFile, du.page);
                             for (Integer page : freePages) {
-                                commit.merge(pageFile.allocator(), page, Update.freed(page));
+                                commit.merge(pageFile.allocator(), page, Update.update(page).freed());
                             }
-                        } else {
-                            List<Integer> allocatedPages = cu.store(pageFile);
+                        } else if( du.wasDeferredStore() ) {
+                            List<Integer> allocatedPages = du.store(pageFile);
                             for (Integer page : allocatedPages) {
                                 // add any allocated pages to the update list so that the free 
                                 // list gets properly adjusted.
-                                commit.merge(pageFile.allocator(), page, Update.allocated(page));
+                                commit.merge(pageFile.allocator(), page, Update.update(page).allocated());
                             }
                         }
                     }
@@ -568,9 +549,9 @@
                     Update value = entry.getValue();
                     if( value.wasFreed() ) {
                         allocator.free(key, 1);
-                    } else if( key != value.update_location ) {
+                    } else if( key != value.page ) {
                         // need to free the udpate page..
-                        allocator.free(value.update_location, 1);
+                        allocator.free(value.page, 1);
                     }
                 }
             }
@@ -638,17 +619,18 @@
     
     @Override
     public String toString() {
-        return "{ allocator: "+allocator
-        +", synch: "+synch
-        +", read cache size: "+readCache.map.size()
-        +", base revision free pages: "+baseRevisionFreePages + ",\n"
-        + "  redos: {\n" 
-        + "    performed: "+toString(performedRedos, syncedRedos) + ",\n" 
-        + "    synced: "+toString(syncedRedos, storedRedos) + ",\n" 
-        + "    stored: "+toString(storedRedos, buildingRedo)+ ",\n" 
-        + "    building: "+toString(buildingRedo, null)+ ",\n"
-        + "  }"        
-        + "}";
+        return "{\n" +
+        		"  allocator: "+allocator+ ",\n"+
+        		"  synch: "+synch+ ",\n"+
+        		"  read cache size: "+readCache.map.size()+ ",\n"+
+        		"  base revision free pages: "+baseRevisionFreePages + ",\n"+
+        		"  redos: {\n"+ 
+        		"    performed: "+toString(performedRedos, syncedRedos) + ",\n"+ 
+        		"    synced: "+toString(syncedRedos, storedRedos) + ",\n"+
+        		"    stored: "+toString(storedRedos, buildingRedo)+ ",\n"+
+        		"    building: "+toString(buildingRedo, null)+ ",\n"+
+        		"  }"+ "\n"+
+        		"}";
     }
 
     /** 
@@ -682,7 +664,7 @@
      * @param pageUpdates
      * @param deferredUpdates
      */
-    void commit(Snapshot snapshot, ConcurrentHashMap<Integer, Update> pageUpdates, ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates) {
+    void commit(Snapshot snapshot, ConcurrentHashMap<Integer, Update> pageUpdates) {
         
         boolean fullRedo=false;
         synchronized (TRANSACTION_MUTEX) {
@@ -717,9 +699,9 @@
             
             if( commit!=null ) {
                 // TODO: figure out how to do the merge outside the TRANSACTION_MUTEX
-                commit.merge(pageFile.allocator(), rev, pageUpdates, deferredUpdates);
+                commit.merge(pageFile.allocator(), rev, pageUpdates);
             } else {
-                buildingRedo.entries.addLast(new Commit(rev, pageUpdates, deferredUpdates) );
+                buildingRedo.entries.addLast(new Commit(rev, pageUpdates) );
             }
             
             if( buildingRedo.pageCount() > updateBatchSize ) {
@@ -758,7 +740,7 @@
             Header h = header();
             h.setByteBufferPosition(0);
             h.magic.set(MAGIC);
-            h.base_revision.set(0);
+            h.base_revision.set(-1);
             h.free_list_page.set(-1);
             h.page_size.set(pageFile.getPageSize());
             h.reserved.set("");
@@ -797,9 +779,24 @@
                 baseRevisionFreePages.add(0, allocator.getLimit());
             }
             
-            // Load the redo batches.
+            boolean consistencyCheckNeeded=true;
+            int last_synced_redo = h.redo_page.get();
             pageId = h.unsynced_redo_page.get();
-            while( pageId >= 0 ) {
+            if( pageId<0 ) {
+                pageId = last_synced_redo;
+                consistencyCheckNeeded = false;
+            }
+            while( true ) {
+                if( pageId < 0 ) {
+                    break;
+                }
+
+                if( consistencyCheckNeeded ) {
+                    // TODO: when consistencyCheckNeeded==true, we need to check the
+                    // consistency of the redo, as it may have been partially written to disk.
+                }
+                
+                
                 Redo redo = loadObject(pageId); 
                 redo.page = pageId;
                 redo.recovered = true;
@@ -809,17 +806,17 @@
                     buildingRedo.head = redo.head;
                 }
     
-                pageId=-1;
                 if( baseRevision < redo.head ) {
-                    
                     // add first since we are loading redo objects oldest to youngest
                     // but want to put them in the list youngest to oldest.
                     redos.addFirst(redo);
-                    syncedRedos = redo;
-                    
-                    if( baseRevision < redo.base ) {
-                        pageId=redo.previous;
+                    performedRedos = syncedRedos = redo;
+                    pageId=redo.previous;
+                    if( pageId==last_synced_redo ) {
+                        consistencyCheckNeeded = false;
                     }
+                } else {
+                    break;
                 }
             }
             
@@ -980,7 +977,7 @@
                     int page = entry.getKey();
                     Update update = entry.getValue();
                     
-                    if( page != update.update_location ) {
+                    if( page != update.page ) {
                         
                         if( syncedRedos.recovered ) {
                             // If we are recovering, the allocator MIGHT not have this 
@@ -991,7 +988,7 @@
                         
                         // Perform the update by copying the updated page the original
                         // page location.
-                        ByteBuffer slice = pageFile.slice(SliceType.READ, update.update_location, 1);
+                        ByteBuffer slice = pageFile.slice(SliceType.READ, update.page, 1);
                         try {
                             pageFile.write(page, slice);
                         } finally { 
@@ -1154,85 +1151,142 @@
         }        
     }
     
-    final static class DeferredUpdate {
+    public static final byte PAGE_ALLOCATED   = 0x01 << 0;
+    public static final byte PAGE_FREED       = 0x01 << 1;
+    public static final byte PAGE_STORE       = 0x01 << 2;
+    public static final byte PAGE_CLEAR       = 0x01 << 3;
+    
+    static class Update implements Externalizable {
+
+        private static final long serialVersionUID = -1128410792448869134L;
+        
+        byte flags;
         int page;
-        Object value;
-        EncoderDecoder<?> marshaller;
+       
+        public Update() {
+        }
+        
+        public Update(Update udpate) {
+            this.page = udpate.page;
+            this.flags = (byte) (udpate.flags & (PAGE_ALLOCATED|PAGE_FREED));
+        }
 
-        public DeferredUpdate(int page, Object value, EncoderDecoder<?> marshaller) {
+        public Update(int page) {
             this.page = page;
-            this.value = value;
-            this.marshaller = marshaller;
+        }
+
+        public static Update update(Update update) {
+            return new Update(update);
+        }
+        public static Update update(int page) {
+            return new Update(page);
+        }
+        
+        public int page() {
+            if( wasFreed() ) {
+                throw new PagingException("You should never try to read or write page that has been freed.");
+            }
+            return page;
+        }
+
+        public DeferredUpdate deferredUpdate() {
+            return null;
+        }
+        
+        public Update allocated() {
+            flags = (byte) ((flags & ~PAGE_FREED) | PAGE_ALLOCATED);
+            return this;
+        }
+        
+        public Update freed() {
+            flags = (byte) ((flags & ~PAGE_ALLOCATED) | PAGE_FREED);
+            return this;
+        }
+
+        public boolean wasFreed() {
+            return (flags & PAGE_FREED)!=0 ;
+        }
+        public boolean wasAllocated() {
+            return (flags & PAGE_ALLOCATED)!=0;
+        }
+        public boolean wasDeferredStore() {
+            return (flags & PAGE_STORE)!=0 ;
+        }
+        public boolean wasDeferredClear() {
+            return (flags & PAGE_CLEAR)!=0;
         }
         
         @Override
         public String toString() {
-            return "{ page: "+page+", removed: "+(value==null)+" }";
+            return "{ page: "+page+", flags: "+flags+" }";
         }
 
-        public void reset(Object value, EncoderDecoder<?> marshaller) {
-            this.value = value;
-            this.marshaller = marshaller;
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            page = in.readInt();
+            flags = in.readByte();
         }
 
-        @SuppressWarnings("unchecked")
-        <T> T value() {
-            return (T) value;
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.writeInt(page);
+            out.writeByte(flags);
         }
         
-        @SuppressWarnings("unchecked")
-        public List<Integer> store(Paged paged) {
-            return ((EncoderDecoder)marshaller).store(paged, page, value);
-        }
     }
     
-    public static final byte PAGE_ALLOCATED = 1;
-    public static final byte PAGE_FREED = 2;
     
-    final static class Update implements Serializable {
+    @SuppressWarnings("serial")
+    static class DeferredUpdate extends Update {
+        EncoderDecoder<?> marshaller;
+        Object value;
 
-        private static final long serialVersionUID = -1128410792448869134L;
+        public DeferredUpdate(Update update) {
+            super(update);
+        }
+        public DeferredUpdate(int page) {
+            super(page);
+        }
         
-        byte flags;
-        final int update_location;
-       
-        public Update(int updateLocation, byte flags) {
-            this.update_location = updateLocation;
-            this.flags = flags;
+        public static DeferredUpdate deferred(int page) {
+            return new DeferredUpdate(page);
         }
-
-        public static Update updated(int page) {
-            return new Update(page, (byte) 0);
+        
+        public static DeferredUpdate deferred(Update update) {
+            return new DeferredUpdate(update);
         }
 
-        public static Update allocated(int page) {
-            return new Update(page, PAGE_ALLOCATED);
+        public DeferredUpdate deferredUpdate() {
+            return this;
         }
-
-        public static Update freed(int page) {
-            return new Update(page, PAGE_FREED);
+        
+        public DeferredUpdate store(Object value, EncoderDecoder<?> marshaller) {
+            this.value = value;
+            this.marshaller = marshaller;
+            flags = (byte) ((flags & ~PAGE_CLEAR) | PAGE_STORE);
+            return this;
         }
 
-        public boolean wasFreed() {
-            return flags == PAGE_FREED;
+        public DeferredUpdate clear(EncoderDecoder<?> marshaller) {
+            this.marshaller= marshaller;
+            this.value=null;
+            flags = (byte) ((flags & ~PAGE_STORE) | PAGE_CLEAR);
+            return this;
         }
         
-        public boolean wasAllocated() {
-            return flags == PAGE_ALLOCATED;
+        @SuppressWarnings("unchecked")
+        <T> T value() {
+            return (T) value;
         }
         
-        public int page() {
-            if( wasFreed() ) {
-                throw new PagingException("You should never try to read or write page that has been freed.");
-            }
-            return update_location;
+        @SuppressWarnings("unchecked")
+        public List<Integer> store(Paged paged) {
+            return ((EncoderDecoder)marshaller).store(paged, page, value);
         }
 
-        @Override
-        public String toString() {
-            return "{ page: "+update_location+", flags: "+flags+" }";
+        public Object writeReplace() throws ObjectStreamException {
+            return new Update(this);
         }
-
-    }
-    
+        
+    }    
 }

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/HawtTransaction.java Fri Oct 23 19:34:49 2009
@@ -16,9 +16,13 @@
  */
 package org.apache.hawtdb.internal.page;
 
+import static org.apache.hawtdb.internal.page.HawtPageFile.DeferredUpdate.deferred;
+import static org.apache.hawtdb.internal.page.HawtPageFile.Update.update;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.activemq.util.StringSupport;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.hawtdb.api.Allocator;
 import org.apache.hawtdb.api.EncoderDecoder;
@@ -29,7 +33,6 @@
 import org.apache.hawtdb.internal.page.HawtPageFile.DeferredUpdate;
 import org.apache.hawtdb.internal.page.HawtPageFile.Snapshot;
 import org.apache.hawtdb.internal.page.HawtPageFile.Update;
-
 /**
  * Transaction objects are NOT thread safe. Users of this object should
  * guard it from concurrent access.
@@ -49,7 +52,6 @@
         parent = concurrentPageFile;
     }
 
-    private ConcurrentHashMap<Integer, DeferredUpdate> deferredUpdates;
     private ConcurrentHashMap<Integer, Update> updates;
     private Snapshot snapshot;
     
@@ -59,7 +61,7 @@
             // TODO: this is not a very efficient way to handle allocation ranges.
             int end = pageId+count;
             for (int key = pageId; key < end; key++) {
-                Update previous = getUpdates().put(key, Update.freed(key));
+                Update previous = getUpdates().put(key, update(key).freed() );
                 if( previous!=null && previous.wasAllocated() ) {
                     getUpdates().remove(key);
                     HawtTransaction.this.parent.allocator.free(key, 1);
@@ -72,7 +74,7 @@
             // TODO: this is not a very efficient way to handle allocation ranges.
             int end = pageId+count;
             for (int key = pageId; key < end; key++) {
-                getUpdates().put(key, Update.allocated(key));
+                getUpdates().put(key, update(key).allocated() );
             }
             return pageId;
         }
@@ -97,9 +99,14 @@
 
     public <T> T get(EncoderDecoder<T> marshaller, int page) {
         // Perhaps the page was updated in the current transaction...
-        DeferredUpdate rc = deferredUpdates == null ? null : deferredUpdates.get(page);
-        if( rc != null ) {
-            return rc.<T>value();
+        Update update = updates == null ? null : updates.get(page);
+        if( update != null ) {
+            DeferredUpdate deferred = update.deferredUpdate();
+            if( deferred != null ) {
+                return deferred.<T>value();
+            } else {
+                throw new PagingException("That page was updated with the 'put' method.");
+            }
         }
         
         // No?  Then ask the snapshot to load the object.
@@ -107,40 +114,44 @@
     }
 
     public <T> void put(EncoderDecoder<T> marshaller, int page, T value) {
-        Update update = getUpdates().get(page);
+        ConcurrentHashMap<Integer, Update> updates = getUpdates();
+        Update update = updates.get(page);
         if (update == null) {
             // This is the first time this transaction updates the page...
             snapshot();
-            update = Update.allocated(parent.allocator.alloc(1));
-            getUpdates().put(page, update);
-            getCacheUpdates().put(page, new HawtPageFile.DeferredUpdate(update.update_location, value, marshaller));
+            updates.put(page, deferred(parent.allocator.alloc(1)).store(value, marshaller).allocated() );
         } else {
             // We have updated it before...
             if( update.wasFreed() ) {
-                throw new PagingException("You should never try to write a page that has been freed.");
+                throw new PagingException("You should never try to update a page that has been freed.");
             }
-            if( update.wasAllocated() ) {
-                getCacheUpdates().put(page, new HawtPageFile.DeferredUpdate(page, value, marshaller));
-            } else {
-                DeferredUpdate cu = getCacheUpdates().get(page);
-                if( cu == null ) {
-                    throw new PagingException("You should never try to store mix using the cached objects with normal page updates.");
-                }
-                cu.reset(value, marshaller);
+            
+            DeferredUpdate deferred = update.deferredUpdate();
+            if( deferred==null ) {
+                deferred = deferred(update);
+                updates.put(page, deferred);
             }
+            deferred.store(value, marshaller);
         }
     }
 
-    public <T> void remove(EncoderDecoder<T> marshaller, int page) {
-        DeferredUpdate deferredUpdate = getCacheUpdates().remove(page);
-        if( deferredUpdate==null ) {
-            // add a deferred update to remove the value.
-            getCacheUpdates().put(page, new DeferredUpdate(page, null, marshaller));
+    public <T> void clear(EncoderDecoder<T> marshaller, int page) {
+        ConcurrentHashMap<Integer, Update> updates = getUpdates();
+        Update update = updates.get(page);
+        
+        if( update == null ) {
+            updates.put(page, deferred(page).clear(marshaller) );
         } else {
-            if( deferredUpdate.value == null ) {
-                // undo.. user error.
-                getCacheUpdates().put(deferredUpdate.page, deferredUpdate);
-                throw new PagingException("You should never try to remove a page that has been removed.");
+            if( update.wasDeferredStore() ) {
+                if( update.wasAllocated() ) {
+                    // this transaction created it.. so it should remove it..
+                    updates.put(page, update(update));
+                } else { 
+                    // was an update of a previous location.... 
+                    updates.put(page, deferred(page).clear(marshaller));
+                }
+            } else {
+                throw new PagingException("You should never try to clear a page that was not put.");
             }
         }
     }
@@ -173,11 +184,11 @@
         } else {
             Update update = getUpdates().get(page);
             if (update == null) {
-                update = Update.allocated(parent.allocator.alloc(count));
+                update = update(parent.allocator.alloc(count)).allocated();
                 if (type==SliceType.READ_WRITE) {
                     ByteBuffer slice = snapshot().head.slice(page, count);
                     try {
-                        parent.pageFile.write(update.update_location, slice);
+                        parent.pageFile.write(update.page, slice);
                     } finally { 
                         parent.pageFile.unslice(slice);
                     }
@@ -185,11 +196,11 @@
                 
                 int end = page+count;
                 for (int i = page; i < end; i++) {
-                    getUpdates().put(i, Update.allocated(i));
+                    getUpdates().put(i, update(i).allocated());
                 }
                 getUpdates().put(page, update);
                 
-                return parent.pageFile.slice(type, update.update_location, count);
+                return parent.pageFile.slice(type, update.page, count);
             }
             return parent.pageFile.slice(type, update.page(), count);
         }
@@ -205,7 +216,7 @@
         if (update == null) {
             // We are updating an existing page in the snapshot...
             snapshot();
-            update = Update.allocated(parent.allocator.alloc(1));
+            update = update(parent.allocator.alloc(1)).allocated();
             getUpdates().put(page, update);
         }
         parent.pageFile.write(update.page(), buffer);
@@ -217,7 +228,7 @@
         try {
             if (updates!=null) {
                 // If the commit is successful it will release our snapshot..
-                parent.commit(snapshot, updates, deferredUpdates);
+                parent.commit(snapshot, updates);
             }
             failed = false;
         } finally {
@@ -227,7 +238,6 @@
                 rollback();
             }
             updates = null;
-            deferredUpdates = null;
             snapshot = null;
         }
     }
@@ -237,7 +247,7 @@
             if (updates!=null) {
                 for (Update update : updates.values()) {
                     if( !update.wasFreed() ) {
-                        parent.allocator.free(update.update_location, 1);
+                        parent.allocator.free(update.page, 1);
                     }
                 }
             }
@@ -247,7 +257,6 @@
                 snapshot = null;
             }
             updates = null;
-            deferredUpdates = null;
         }
     }
 
@@ -262,13 +271,6 @@
         return updates == null;
     }
 
-    public ConcurrentHashMap<Integer, DeferredUpdate> getCacheUpdates() {
-        if( deferredUpdates==null ) {
-            deferredUpdates = new ConcurrentHashMap<Integer, DeferredUpdate>();
-        }
-        return deferredUpdates;
-    }
-
     private ConcurrentHashMap<Integer, Update> getUpdates() {
         if (updates == null) {
             updates = new ConcurrentHashMap<Integer, Update>();
@@ -282,7 +284,11 @@
 
     public String toString() { 
         int updatesSize = updates==null ? 0 : updates.size();
-        return "{ snapshot: "+this.snapshot+", updates: "+updatesSize+", parent: "+parent+" }";
+        return "{ \n" +
+        	   "  snapshot: "+this.snapshot+", \n"+
+        	   "  updates: "+updatesSize+", \n" +
+        	   "  parent: "+StringSupport.indent(parent.toString(), 2)+"\n" +
+        	   "}";
     }
 
     public int pages(int length) {

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/PageFile.java Fri Oct 23 19:34:49 2009
@@ -109,7 +109,7 @@
         encoderDecoder.store(this, page, value);
     }
 
-    public <T> void remove(EncoderDecoder<T> encoderDecoder, int page) {
+    public <T> void clear(EncoderDecoder<T> encoderDecoder, int page) {
         encoderDecoder.remove(this, page);
     }
 

Modified: activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java?rev=829190&r1=829189&r2=829190&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java (original)
+++ activemq/sandbox/activemq-apollo/hawtdb/src/main/java/org/apache/hawtdb/internal/page/SimpleAllocator.java Fri Oct 23 19:34:49 2009
@@ -56,7 +56,7 @@
 
     
     /**
-     * @see org.apache.hawtdb.api.hiramchirino.hawtdb.Allocator#remove(int, int)
+     * @see org.apache.hawtdb.api.hiramchirino.hawtdb.Allocator#free(int, int)
      */
     synchronized public void free(int pageId, int count) {
         freeRanges.add(pageId, count);



Mime
View raw message