activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r692288 [1/3] - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/ main/java/org/apache/kahadb/impl/ main/java/org/apache/kahadb/impl/async/ main/java/org/apache/kahadb/impl/container/ main/java/org/apache/kahadb/impl/data/ main/...
Date Thu, 04 Sep 2008 22:46:44 GMT
Author: chirino
Date: Thu Sep  4 15:46:42 2008
New Revision: 692288

URL: http://svn.apache.org/viewvc?rev=692288&view=rev
Log:
Removed all the old unneeded kaha bits.
Moved:
  - the index bits to the index package.
  - the AsyncDataManager to the journal package


Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ControlFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAccessorPool.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java   (with props)
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/NIODataFileAppender.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyDataFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ReadOnlyJournal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/package.html   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/HashIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexBenchmark.java   (with props)
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/index/IndexTestSupport.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/JournalTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/journal/NioJournalTest.java
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ContainerId.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/IndexMBean.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/ListContainer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/MapContainer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/RuntimeStoreException.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/Store.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StoreEntry.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StoreFactory.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/StoreLocation.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/DataManager.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/IndexRootContainer.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/KahaStore.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/StoreLockedExcpetion.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/async/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/container/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/data/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/BadMagicException.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/DiskIndexLinkedList.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/Index.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/IndexItem.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/IndexLinkedList.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/IndexManager.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/RedoStoreIndexItem.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/StoreIndexReader.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/StoreIndexWriter.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/VMIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/VMIndexLinkedList.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/hash/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/robusthash/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/index/tree/
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/impl/package.html
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/BTreeNode.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashBin.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/HashIndex.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/Index.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/ListContainerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/LoadTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/Loader.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/MapContainerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/StoreTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/VolumeTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/async/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/impl/index/
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/BTreeIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexBenchMark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/HashIndexTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexBenchmark.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/page/IndexTestSupport.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/KahaDBPersistenceAdaptor.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeIndex.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.page.Transaction.Closure;
+
+/**
+ * BTreeIndex represents a Variable Magnitude B+Tree in a Page File.
+ * A BTree is a bit flexible in that it can be used for set or
+ * map-based indexing.  Leaf nodes are linked together for faster
+ * iteration of the values. 
+ *
+ * <br>
+ * The Variable Magnitude attribute means that the BTree attempts
+ * to store as many values and pointers on one page as is possible.
+ * 
+ * <br>
+ * The implementation can optionally a be Simple-Prefix B+Tree.
+ * 
+ * <br>
+ * For those who don't know how a Simple-Prefix B+Tree works, the primary
+ * distinction is that instead of promoting actual keys to branch pages,
+ * when leaves are split, a shortest-possible separator is generated at
+ * the pivot.  That separator is what is promoted to the parent branch
+ * (and continuing up the list).  As a result, actual keys and pointers
+ * can only be found at the leaf level.  This also affords the index the
+ * ability to ignore costly merging and redistribution of pages when
+ * deletions occur.  Deletions only affect leaf pages in this
+ * implementation, and so it is entirely possible for a leaf page to be
+ * completely empty after all of its keys have been removed.
+ *
+ * @version $Revision: 541508 $, $Date: 2007-05-24 21:54:12 -0400 (Thu, 24 May 2007) $
+ */
+public class BTreeIndex<Key,Value> implements Index<Key,Value> {
+
+    private static final Log LOG = LogFactory.getLog(BTreeIndex.class);
+
+    /**
+     * Interface used to determine the simple prefix of two keys.
+     *
+     * @version $Revision: 541508 $, $Date: 2007-05-24 21:54:12 -0400 (Thu, 24 May 2007) $
+     */
+    static public interface Prefixer<Key> {
+        
+        /**
+         * This methods should return shortest prefix of value2 where the following still holds:<br/>
+         * value1 <= prefix <= value2.<br/><br/>
+         * 
+         * When this method is called, the following is guaranteed:<br/>
+         * value1 < value2<br/><br/>
+         * 
+         * 
+         * @param value1
+         * @param value2
+         * @return
+         */
+        public Key getSimplePrefix(Key value1, Key value2);
+    }
+    
+    /**
+     * StringPrefixer is a Prefixer implementation that works on strings.
+     */
+    static public class StringPrefixer implements Prefixer<String> {
+        
+        /**
+         * Example:
+         * If value1 is "Hello World"
+         * and value 2 is "Help Me"
+         * then the result will be: "Help"
+         * 
+         * @see  Prefixer#getSimplePrefix
+         */
+        public String getSimplePrefix(String value1, String value2) {
+            char[] c1 = value1.toCharArray();
+            char[] c2 = value2.toCharArray();
+            int n = Math.min(c1.length, c2.length);
+            int i =0;
+            while (i < n) {
+                if (c1[i] != c2[i]) {
+                    return value2.substring(0,i+1);
+                }
+                i++;
+            }
+            
+            if( n == c2.length ) {
+                return value2;
+            }
+            return value2.substring(0,n);
+        }
+    }    
+
+    private final PageFile pageFile;
+    private final long pageId;
+    private AtomicBoolean loaded = new AtomicBoolean();
+    
+    private final BTreeNode.Marshaller<Key, Value> marshaller = new BTreeNode.Marshaller<Key, Value>(this);
+    private Marshaller<Key> keyMarshaller;
+    private Marshaller<Value> valueMarshaller;
+    private Prefixer<Key> prefixer;
+
+    private BTreeNode<Key,Value> root;
+
+    public BTreeIndex(PageFile pageFile, long rootPageId) {
+        this.pageFile = pageFile;
+        this.pageId = rootPageId;
+    }
+
+    public BTreeIndex(PageFile pageFile, Page page) {
+        this(pageFile, page.getPageId());
+    }
+
+    synchronized public void load() throws IOException {
+        if (loaded.compareAndSet(false, true)) {
+            LOG.debug("loading");
+            if( keyMarshaller == null ) {
+                throw new IllegalArgumentException("The keyMarshaller must be set before loading the BTreeIndex");
+            }
+            if( valueMarshaller == null ) {
+                throw new IllegalArgumentException("The valueMarshaller must be set before loading the BTreeIndex");
+            }
+            
+            Transaction tx = pageFile.tx();
+            final Page<BTreeNode<Key,Value>> p = tx.load(pageId, null);
+            if( p.getType() == Page.PAGE_FREE_TYPE ) {
+                 // Need to initialize it..
+                tx.execute(new Transaction.Closure<IOException>(){
+                    public void execute(Transaction tx) throws IOException {
+                        root = createNode(p, null);
+                        storeNode(tx, root, true);
+                    }
+                });
+                pageFile.checkpoint();
+            } else {
+                root = loadNode(tx, pageId, null);    
+            }
+        }
+    }
+    
+    synchronized public void unload() {
+        if (loaded.compareAndSet(true, false)) {
+            root=null;
+        }    
+    }
+    
+    synchronized public boolean containsKey(Transaction tx, Key key) throws IOException {
+        assertLoaded();
+        return root.contains(tx, key);
+    }
+
+    synchronized public Value get(Transaction tx, Key key) throws IOException {
+        assertLoaded();
+        return root.get(tx, key);
+    }
+
+    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
+        assertLoaded();
+        return root.put(tx, key, value);
+    }
+
+    synchronized public Value remove(Transaction tx, Key key) throws IOException {
+        assertLoaded();
+        return root.remove(tx, key);
+    }
+    
+    public boolean isTransient() {
+        return false;
+    }
+
+    synchronized public void clear(Transaction tx) throws IOException {
+        root.clear(tx);
+    }
+
+    synchronized public int getMinLeafDepth(Transaction tx) throws IOException {
+        return root.getMinLeafDepth(tx, 0);
+    }
+
+    synchronized public int getMaxLeafDepth(Transaction tx) throws IOException {
+        return root.getMaxLeafDepth(tx, 0);
+    }
+
+    synchronized public void printStructure(Transaction tx, PrintWriter out) throws IOException {
+        root.printStructure(tx, out, "");
+    }
+    
+    synchronized public void printStructure(Transaction tx, OutputStream out) throws IOException {
+        PrintWriter pw = new PrintWriter(out,false);
+        root.printStructure(tx, pw, "");
+        pw.flush();
+    }
+
+    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
+        return root.iterator(tx);
+    }
+    
+    synchronized public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key initialKey) throws IOException {
+        return root.iterator(tx, initialKey);
+    }
+
+    synchronized Value getFirst(Transaction tx) throws IOException {
+        return root.getFirst(tx);
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal implementation methods
+    ///////////////////////////////////////////////////////////////////
+    
+    private void assertLoaded() throws IllegalStateException {
+        if( !loaded.get() ) {
+            throw new IllegalStateException("The BTreeIndex is not loaded");
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Internal methods made accessible to BTreeNode
+    ///////////////////////////////////////////////////////////////////
+
+    BTreeNode<Key,Value> loadNode(Transaction tx, long pageId, BTreeNode<Key,Value> parent) throws IOException {
+        Page<BTreeNode<Key,Value>> page = tx.load(pageId, marshaller);
+        BTreeNode<Key, Value> node = page.get();
+        node.setPage(page);
+        node.setParent(parent);
+        return node;
+    }
+
+    BTreeNode<Key,Value> createNode(Transaction tx, BTreeNode<Key,Value> parent) throws IOException {
+        Page<BTreeNode<Key,Value>> p = tx.allocate();
+        BTreeNode<Key,Value> node = new BTreeNode<Key,Value>(this);
+        node.setPage(p);
+        node.setParent(parent);
+        node.setEmpty();
+        p.set(node);
+        return node;
+    }
+
+    BTreeNode<Key,Value> createNode(Page<BTreeNode<Key,Value>> p, BTreeNode<Key,Value> parent) throws IOException {
+        BTreeNode<Key,Value> node = new BTreeNode<Key,Value>(this);
+        node.setPage(p);
+        node.setParent(parent);
+        node.setEmpty();
+        p.set(node);
+        return node;
+    }
+    
+    void storeNode(Transaction tx, BTreeNode<Key,Value> node, boolean overflow) throws IOException {
+        tx.store(node.getPage(), marshaller, overflow);
+    }
+        
+   
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors
+    ///////////////////////////////////////////////////////////////////
+
+    public PageFile getPageFile() {
+        return pageFile;
+    }
+    public long getPageId() {
+        return pageId;
+    }
+
+    public Marshaller<Key> getKeyMarshaller() {
+        return keyMarshaller;
+    }
+    public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
+        this.keyMarshaller = keyMarshaller;
+    }
+
+    public Marshaller<Value> getValueMarshaller() {
+        return valueMarshaller;
+    }
+    public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
+        this.valueMarshaller = valueMarshaller;
+    }
+
+    public Prefixer<Key> getPrefixer() {
+        return prefixer;
+    }
+    public void setPrefixer(Prefixer<Key> prefixer) {
+        this.prefixer = prefixer;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/BTreeNode.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,722 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.index.BTreeIndex.Prefixer;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.page.Transaction.PageOverflowIOException;
+
+
+/**
+ * The BTreeNode class represents a node in the BTree object graph.  It is stored in 
+ * one Page of a PageFile.
+ */
+public final class BTreeNode<Key,Value> {
+    private static final Log LOG = LogFactory.getLog(BTreeNode.class);
+
+    // The index that this node is part of.
+    private final BTreeIndex<Key,Value> index;
+    // The parent node or null if this is the root node of the BTree
+    private BTreeNode<Key,Value> parent;
+    // The page associated with this node
+    private Page<BTreeNode<Key,Value>> page;
+    
+    // Order list of keys in the node
+    private Key[] keys;
+    // Values associated with the Keys. Null if this is a branch node.
+    private Value[] values;
+    // nodeId pointers to children BTreeNodes. Null if this is a leaf node.
+    private long[] children;
+    // The next leaf node after this one.  Used for fast iteration of the entries.
+    private long next = -1;
+    
+    /**
+     * Iterates the entries of the BTree in key order by walking the linked
+     * list of leaf nodes via the 'next' pointer.  The iterator is read-only:
+     * remove() and Entry.setValue() are not supported.
+     */
+    private final class BTreeIterator implements Iterator<Map.Entry<Key, Value>> {
+        // Transaction used to load leaf pages as the iteration advances.
+        private final Transaction tx;
+        // Leaf node currently being iterated; null once the end is reached.
+        BTreeNode<Key,Value> current;
+        // Index of the next entry within current.keys/current.values.
+        int nextIndex;
+        // Pre-fetched entry, consumed by next().
+        Map.Entry<Key,Value> nextEntry;
+
+        private BTreeIterator(Transaction tx, BTreeNode<Key,Value> current, int nextIndex) {
+            this.tx = tx;
+            this.current = current;
+            this.nextIndex=nextIndex;
+        }
+
+        /**
+         * Advances to the next entry, following the leaf 'next' pointer when
+         * the current leaf is exhausted.  No-op if an entry is already staged.
+         */
+        synchronized private void findNextPage() {
+            if( nextEntry!=null ) {
+                return;
+            }
+            
+            try {
+                while( current!=null ) {
+                    if( nextIndex >= current.keys.length ) {
+                        // we need to roll to the next leaf..
+                        if( current.next >= 0 ) {
+                            current = index.loadNode(tx, current.next, null);
+                            nextIndex=0;
+                        } else {
+                            break;
+                        }
+                    }  else {
+                        // Snapshot the key/value pair into an immutable entry.
+                        nextEntry = new Map.Entry<Key, Value>() {
+                            private final Key key = current.keys[nextIndex];
+                            private final Value value = current.values[nextIndex];
+                            
+                            public Key getKey() {
+                                return key;
+                            }
+                            public Value getValue() {
+                                return value;
+                            }
+                            public Value setValue(Value value) {
+                                throw new UnsupportedOperationException();
+                            }
+                        };
+                        nextIndex++;
+                        break;
+                    }
+                    
+                }
+            } catch (IOException e) {
+                // Iterator cannot propagate a checked exception; log the
+                // failure instead of silently truncating the iteration.
+                LOG.error("Failed to load the next BTree leaf page during iteration", e);
+            }
+        }
+
+        public boolean hasNext() {
+            findNextPage();
+            return nextEntry !=null;
+        }
+
+        public Entry<Key, Value> next() {
+            findNextPage(); 
+            if( nextEntry !=null ) {
+                Entry<Key, Value> lastEntry = nextEntry;
+                nextEntry=null;
+                return lastEntry;
+            } else {
+                throw new NoSuchElementException();
+            }
+        }
+
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    /**
+     * The Marshaller is used to store and load the data in the BTreeNode into a Page.
+     *
+     * Wire format (writePayload/readPayload must stay in lockstep):
+     *   short keyCount, then keyCount marshalled keys, then a boolean branch
+     *   flag; a branch is followed by keyCount+1 child page ids (long),
+     *   a leaf by keyCount marshalled values and the 'next' leaf page id.
+     *
+     * @param <Key>
+     * @param <Value>
+     */
+    static public class Marshaller<Key,Value> implements org.apache.kahadb.Marshaller<BTreeNode<Key,Value>> {
+        // The index supplies the key/value marshallers used for (de)serialization.
+        private final BTreeIndex<Key,Value> index;
+        
+        public Marshaller(BTreeIndex<Key,Value> index) {
+            this.index = index;
+        }
+
+        // NOTE(review): returns null rather than the marshalled class --
+        // presumably callers never rely on getType() here; verify against
+        // the org.apache.kahadb.Marshaller contract.
+        public Class<BTreeNode<Key,Value>> getType() {
+            return null;
+        }
+        
+        public void writePayload(BTreeNode<Key,Value> node, DataOutput os) throws IOException {
+            // Write the keys
+            short count = (short)node.keys.length; // cast may truncate value...
+            if( count != node.keys.length ) {
+                throw new IOException("Too many keys");
+            }
+            
+            os.writeShort(count);
+            for (int i = 0; i < node.keys.length; i++) {
+                index.getKeyMarshaller().writePayload(node.keys[i], os);
+            }
+            
+            if( node.isBranch() ) {
+                // If this is a branch...
+                // A branch with N keys always has N+1 children.
+                os.writeBoolean(true);
+                for (int i = 0; i < count+1; i++) {
+                    os.writeLong(node.children[i]);
+                }
+                
+            } else {
+                // If this is a leaf
+                os.writeBoolean(false);
+                for (int i = 0; i < count; i++) {
+                    index.getValueMarshaller().writePayload(node.values[i], os);
+                }
+                // Page id of the following leaf; -1 means this is the last leaf.
+                os.writeLong(node.next);
+            }
+        }
+
+        @SuppressWarnings("unchecked")
+        public BTreeNode<Key,Value> readPayload(DataInput is) throws IOException {
+            BTreeNode<Key,Value>  node = new BTreeNode<Key,Value>(index);
+            int count = is.readShort();
+            
+            node.keys = (Key[])new Object[count];
+            for (int i = 0; i < count; i++) {
+                node.keys[i] = index.getKeyMarshaller().readPayload(is);
+            }
+            
+            if( is.readBoolean() ) {
+                // Branch node: read the N+1 child page ids.
+                node.children = new long[count+1];
+                for (int i = 0; i < count+1; i++) {
+                    node.children[i] = is.readLong();
+                }
+            } else {
+                // Leaf node: read the values and the next-leaf pointer.
+                node.values = (Value[])new Object[count];
+                for (int i = 0; i < count; i++) {
+                    node.values[i] = index.getValueMarshaller().readPayload(is);
+                }
+                node.next = is.readLong();
+            }
+            return node;
+        }
+    }
+
+    /**
+     * Creates a node bound to the given index.  The page, parent, and
+     * key/value data must be set separately before the node is usable.
+     */
+    public BTreeNode(BTreeIndex<Key,Value> index) {
+        this.index = index;
+    }
+    
+    /**
+     * Resets this node to an empty leaf (zero keys, zero values).
+     */
+    public void setEmpty() {
+        setLeafData(createKeyArray(0), createValueArray(0));
+    }
+    
+
+    /**
+     * Loads the idx'th child node of this branch.  Internal (to the
+     * BTreeNode) method: callers synchronize, so no locking is done here.
+     *
+     * @return the child node, or null if this node is a leaf or idx is out of range
+     * @throws IOException if the child page cannot be loaded
+     */
+    private BTreeNode<Key,Value> getChild(Transaction tx, int idx) throws IOException {
+        // Guard clauses: only branch nodes have children, and idx must be valid.
+        if (!isBranch() || idx < 0 || idx >= children.length) {
+            return null;
+        }
+        return this.index.loadNode(tx, children[idx], this);
+    }
+   
+    /**
+     * Removes the entry for the given key, recursing down to the leaf that
+     * holds it.  Empty leaves are unlinked from their parent branch and
+     * freed; a root branch that shrinks to one child collapses into it.
+     *
+     * @return the value previously associated with the key, or null if absent
+     * @throws IOException if a page cannot be loaded/stored, or if a cycle
+     *         back to the root page is detected (corruption)
+     */
+    public Value remove(Transaction tx, Key key) throws IOException {
+
+        if(isBranch()) {
+            // binarySearch returns -(insertionPoint+1) when not found; either
+            // way, descend into the child to the right of the matching key.
+            int idx = Arrays.binarySearch(keys, key);
+            idx = idx < 0 ? -(idx + 1) : idx + 1;
+            BTreeNode<Key, Value> child = getChild(tx, idx);
+            if( child.getPageId() == index.getPageId() ) {
+                throw new IOException("BTree corrupted: Cycle detected.");
+            }
+            Value rc = child.remove(tx, key);
+            
+            // child node is now empty.. remove it from the branch node.
+            if( child.keys.length == 0 ) {
+                
+                // If the child node is a branch, promote
+                if( child.isBranch() ) {
+                    // This is cause branches are never really empty.. they just go down to 1 child..
+                    children[idx] = child.children[0];
+                } else {
+                    
+                    // The child was a leaf. Then we need to actually remove it from this branch node..
+
+                    // We need to update the previous child's next pointer to skip over the child being removed....
+                    if( idx > 0 && children.length > 1) {
+                        BTreeNode<Key, Value> previousChild = getChild(tx, idx-1);
+                        previousChild.next = child.next;
+                        index.storeNode(tx, previousChild, true);
+                    }
+                    
+                    if( idx < children.length-1 ) {
+                        // Delete it and key to the right.
+                        setBranchData(arrayDelete(keys, idx), arrayDelete(children, idx));
+                    } else {
+                        // It was the last child.. Then delete it and key to the left
+                        setBranchData(arrayDelete(keys, idx-1), arrayDelete(children, idx));
+                    }
+                    
+                    // If we are the root node, and only have 1 child left.  Then 
+                    // make the root be the leaf node.
+                    if( children.length == 1 && parent==null ) {
+                        child = getChild(tx, 0);
+                        keys = child.keys;
+                        children = child.children;
+                        values = child.values;
+                        // free up the page..
+                        tx.free(child.getPage());
+                    }
+                    
+                }
+                index.storeNode(tx, this, true);
+            }
+            
+            return rc;
+        } else {
+            // Leaf node: a negative binarySearch result means the key is absent.
+            int idx = Arrays.binarySearch(keys, key);
+            if (idx < 0) {
+                return null;
+            } else {
+                Value oldValue = values[idx];
+                setLeafData(arrayDelete(keys, idx), arrayDelete(values, idx));
+                
+                if( keys.length!=0 ) {
+                    index.storeNode(tx, this, true);
+                } else {
+                    // If this leaf is empty and is not the root node..
+                    // (the parent branch will unlink it on the way back up)
+                    if( parent!=null ) {
+                        tx.free(getPage());
+                    }
+                }
+                
+                return oldValue;
+            }
+        }
+    }
+
+    /**
+     * Associates the value with the key, overwriting any existing mapping.
+     * On a branch, delegates to the leaf that owns the key; on a leaf,
+     * inserts or overwrites in place and stores the node, splitting it if
+     * the page overflows.
+     *
+     * @return the previous value for the key, or null if there was none
+     * @throws IllegalArgumentException if key is null
+     * @throws IOException if a page cannot be loaded or stored
+     */
+    public Value put(Transaction tx, Key key, Value value) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        if( isBranch() ) {
+            return getLeafNode(tx, this, key).put(tx, key, value);
+        } else {
+            int idx = Arrays.binarySearch(keys, key);
+            
+            Value oldValue=null;
+            if (idx >= 0) {
+                // Key was found... Overwrite
+                oldValue = values[idx];
+                values[idx] = value;
+                setLeafData(keys, values);
+            } else {
+                // Key was not found, Insert it
+                // (binarySearch returns -(insertionPoint+1) for misses)
+                idx = -(idx + 1);
+                setLeafData(arrayInsert(keys, key, idx), arrayInsert(values, value, idx));
+            }
+            
+            try {
+                index.storeNode(tx, this, allowOverflow());
+            } catch ( Transaction.PageOverflowIOException e ) {
+                // If we get an overflow 
+                // split() redistributes this node's data and stores the result.
+                split(tx);
+            }
+            
+            return oldValue;
+        }
+    }
+
+    /**
+     * Inserts a separator key and the page id of a newly created right sibling
+     * into this branch node; called by a child that has just split.  If this
+     * branch itself overflows as a result, it splits in turn.
+     *
+     * @param tx the transaction in which page writes are performed
+     * @param key the separator key promoted from the split child
+     * @param nodeId the page id of the child's new right sibling
+     * @throws IOException if a page cannot be written
+     */
+    private void promoteValue(Transaction tx, Key key, long nodeId) throws IOException {
+
+        // Child pointer goes one slot to the right of the inserted key.
+        int idx = Arrays.binarySearch(keys, key);
+        idx = idx < 0 ? -(idx + 1) : idx + 1;
+        setBranchData(arrayInsert(keys, key, idx), arrayInsert(children, nodeId, idx + 1));
+
+        try {
+            index.storeNode(tx, this, allowOverflow());
+        } catch ( Transaction.PageOverflowIOException e ) {
+            split(tx);
+        }
+
+    }
+
+    /**
+     * Splits this overflowing node into two halves around a pivot.  For the
+     * root (parent == null) the node is rewritten as a branch over two new
+     * children; otherwise a new right sibling is created and the separator key
+     * is promoted into the parent.  Internal to the BTreeNode.
+     */
+    private void split(Transaction tx) throws IOException {
+        Key[] leftKeys;
+        Key[] rightKeys;
+        Value[] leftValues=null;
+        Value[] rightValues=null;
+        long[] leftChildren=null;
+        long[] rightChildren=null;
+        Key separator;
+
+        int vc = keys.length;
+        int pivot = vc / 2;
+
+        // Split the node into two nodes
+        if( isBranch() ) {
+
+            // For a branch the pivot key itself moves up to the parent, so the
+            // right half excludes it (vc - (pivot + 1) keys).
+            leftKeys = createKeyArray(pivot);
+            leftChildren = new long[leftKeys.length + 1];
+            rightKeys = createKeyArray(vc - (pivot + 1));
+            rightChildren = new long[rightKeys.length + 1];
+
+            System.arraycopy(keys, 0, leftKeys, 0, leftKeys.length);
+            System.arraycopy(children, 0, leftChildren, 0, leftChildren.length);
+            System.arraycopy(keys, leftKeys.length + 1, rightKeys, 0, rightKeys.length);
+            System.arraycopy(children, leftChildren.length, rightChildren, 0, rightChildren.length);
+
+            // Is it a Simple Prefix BTree??
+            Prefixer<Key> prefixer = index.getPrefixer();
+            if(prefixer!=null) {
+                separator = prefixer.getSimplePrefix(leftKeys[leftKeys.length - 1], rightKeys[0]);
+            } else {
+                separator = keys[leftKeys.length];
+            }
+                
+            
+        } else {
+
+            // For a leaf the pivot entry stays in the right half; the separator
+            // is copied (not removed) into the parent.
+            leftKeys = createKeyArray(pivot);
+            leftValues = createValueArray(leftKeys.length);
+            rightKeys = createKeyArray(vc - pivot);
+            rightValues = createValueArray(rightKeys.length);
+
+            System.arraycopy(keys, 0, leftKeys, 0, leftKeys.length);
+            System.arraycopy(values, 0, leftValues, 0, leftValues.length);
+            System.arraycopy(keys, leftKeys.length, rightKeys, 0, rightKeys.length);
+            System.arraycopy(values, leftValues.length, rightValues, 0, rightValues.length);
+
+            // separator = getSeparator(leftVals[leftVals.length - 1],
+            // rightVals[0]);
+            separator = rightKeys[0];
+
+        }
+
+        // Promote the pivot to the parent branch
+        if (parent == null) {
+            
+            // This can only happen if this is the root: keep this page as the
+            // root and move both halves into freshly allocated child pages.
+            BTreeNode<Key,Value> rNode = this.index.createNode(tx, this);
+            BTreeNode<Key,Value> lNode = this.index.createNode(tx, this);
+
+            if( isBranch() ) {
+                rNode.setBranchData(rightKeys, rightChildren);
+                lNode.setBranchData(leftKeys, leftChildren);
+            } else {
+                rNode.setLeafData(rightKeys, rightValues);
+                lNode.setLeafData(leftKeys, leftValues);
+            }
+
+            Key[] v = createKeyArray(1);
+            v[0]=separator;
+            setBranchData(v, new long[] { lNode.getPageId(), rNode.getPageId() });
+
+            index.storeNode(tx, this, true);
+            index.storeNode(tx, rNode, true);
+            index.storeNode(tx, lNode, true);
+            
+        } else {
+            // Non-root: this page keeps the left half, a new sibling takes the
+            // right half, and the parent learns about the separator.
+            BTreeNode<Key,Value> rNode = this.index.createNode(tx, parent);
+            
+            if( isBranch() ) {
+                setBranchData(leftKeys, leftChildren);
+                rNode.setBranchData(rightKeys, rightChildren);
+            } else {
+                // Maintain the leaf linked list used for iteration.
+                rNode.setNext(next);
+                next = rNode.getPageId();
+                setLeafData(leftKeys, leftValues);
+                rNode.setLeafData(rightKeys, rightValues);
+            }
+
+            index.storeNode(tx, this, true);
+            index.storeNode(tx, rNode, true);
+            parent.promoteValue(tx, separator, rNode.getPageId());
+        }
+    }
+
+    /**
+     * Recursively prints the tree structure (page ids and child counts) to the
+     * given writer, for debugging.  Throws IllegalStateException if a child
+     * claims to be the root while being visited below the root (cycle guard).
+     */
+    public void printStructure(Transaction tx, PrintWriter out, String prefix) throws IOException {
+        if( prefix.length()>0 && parent == null ) {
+            throw new IllegalStateException("Cycle back to root node detected.");
+        }
+        
+        if( isBranch() ) {
+            for(int i=0 ; i < children.length; i++) {
+                BTreeNode<Key, Value> child = getChild(tx, i);
+                if( i == children.length-1) {
+                    // Last child gets the closing "\-" connector.
+                    out.println(prefix+"\\- "+child.getPageId()+(child.isBranch()?" ("+child.children.length+")":""));
+                    child.printStructure(tx, out, prefix+"   ");
+                } else {
+                    out.println(prefix+"|- "+child.getPageId()+(child.isBranch()?" ("+child.children.length+")":"")+" : "+keys[i]);
+                    child.printStructure(tx, out, prefix+"   ");
+                }
+            }
+        }
+    }
+    
+    
+    /**
+     * @return the minimum depth (from this node, counted inclusively) at which
+     *         a leaf is found; used to verify the tree is balanced.
+     */
+    public int getMinLeafDepth(Transaction tx, int depth) throws IOException {
+        depth++;
+        if( isBranch() ) {
+            int min = Integer.MAX_VALUE;
+            for(int i=0 ; i < children.length; i++) {
+                min = Math.min(min, getChild(tx, i).getMinLeafDepth(tx, depth));
+            }
+            return min;
+        } else {
+//            print(depth*2, "- "+page.getPageId());
+            return depth;
+        }
+    }
+
+    /**
+     * @return the maximum depth (from this node, counted inclusively) at which
+     *         a leaf is found; used to verify the tree is balanced.
+     */
+    public int getMaxLeafDepth(Transaction tx, int depth) throws IOException {
+        depth++;
+        if( isBranch() ) {
+            int v = 0;
+            for(int i=0 ; i < children.length; i++) {
+                v = Math.max(v, getChild(tx, i).getMaxLeafDepth(tx, depth));
+            }
+            depth = v;
+        } 
+        return depth;
+    }
+
+    /**
+     * Looks up the value stored for the given key in the subtree rooted at
+     * this node.
+     *
+     * @return the value, or null if the key is not present
+     * @throws IllegalArgumentException if key is null
+     */
+    public Value get(Transaction tx, Key key) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+        if( isBranch() ) {
+            return getLeafNode(tx, this, key).get(tx, key);
+        } else {
+            int idx = Arrays.binarySearch(keys, key);
+            if (idx < 0) {
+                return null;
+            } else {
+                return values[idx];
+            }
+        }
+    }
+    
+    /**
+     * @return the value of the smallest key in this subtree, or null if the
+     *         left-most leaf is empty.
+     */
+    public Value getFirst(Transaction tx) throws IOException {
+        BTreeNode<Key, Value> node = this;
+        while( node .isBranch() ) {
+            // Always follow the left-most child pointer.
+            node = node.getChild(tx, 0);
+        }
+        if( node.values.length>0 ) {
+            return node.values[0];
+        } else {
+            return null;
+        }
+    }
+    
+    /**
+     * @return the left-most leaf node of this subtree (may be empty).
+     */
+    public BTreeNode<Key,Value> getFirstLeafNode(Transaction tx) throws IOException {
+        BTreeNode<Key, Value> node = this;
+        while( node .isBranch() ) {
+            node = node.getChild(tx, 0);
+        }
+        return node;
+    }
+    
+    /**
+     * Returns an iterator over the entries of this subtree, positioned at the
+     * first key that is >= startKey.  A null startKey iterates from the start.
+     */
+    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx, Key startKey) throws IOException {
+        if (startKey == null) {
+            return iterator(tx);
+        }
+        if( isBranch() ) {
+            return getLeafNode(tx, this, startKey).iterator(tx, startKey);
+        } else {
+            int idx = Arrays.binarySearch(keys, startKey);
+            if (idx < 0) {
+                // Miss: start at the insertion point (first key greater than startKey).
+                idx = -(idx + 1);
+            }
+            return new BTreeIterator(tx, this, idx);
+        }
+    }
+
+    /** Returns an iterator over all entries of this subtree, in key order. */
+    public Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException {
+        return new BTreeIterator(tx, getFirstLeafNode(tx), 0);
+    }
+    
+    /**
+     * Recursively frees all pages of this subtree.  If this node is the root,
+     * it is reset to an empty leaf and re-stored instead of being freed.
+     */
+    public void clear(Transaction tx) throws IOException {
+        if( isBranch() ) {
+            for (int i = 0; i < children.length; i++) {
+                BTreeNode<Key, Value> node = index.loadNode(tx, children[i], this);
+                node.clear(tx);
+                tx.free(node.getPage());
+            }
+        }
+        // Reset the root node to be a leaf.
+        if( parent == null ) {
+            setLeafData(createKeyArray(0), createValueArray(0));
+            next=-1;
+            index.storeNode(tx, this, true);
+        }
+    }
+
+
+    /**
+     * Walks from the given node down to the leaf that is responsible for the
+     * given key.
+     *
+     * @param tx the transaction used to load child pages
+     * @param node the node to start descending from
+     * @param key the key being located
+     * @return the leaf node that would contain the key
+     * @throws IOException if a page cannot be loaded, or if the descent loops
+     *         back to the starting node (index corruption)
+     */
+    private static <Key,Value> BTreeNode<Key, Value> getLeafNode(Transaction tx, final BTreeNode<Key, Value> node, Key key) throws IOException {
+        BTreeNode<Key, Value> current = node;
+        while( true ) {
+            if( current.isBranch() ) {
+                // On a hit descend right of the separator (idx + 1); on a miss
+                // descend at the insertion point.
+                int idx = Arrays.binarySearch(current.keys, key);
+                idx = idx < 0 ? -(idx + 1) : idx + 1;
+                BTreeNode<Key, Value> child = current.getChild(tx, idx);        
+
+                // A little cycle detection for sanity's sake
+                if( child == node ) {
+                    throw new IOException("BTree corrupted: Cycle detected.");
+                }
+                
+                current = child;
+            } else {
+                break;
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Tests whether the given key exists in the subtree rooted at this node.
+     *
+     * @return true if the key is present
+     * @throws IllegalArgumentException if key is null
+     */
+    public boolean contains(Transaction tx, Key key) throws IOException {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        if( isBranch() ) {
+            return getLeafNode(tx, this, key).contains(tx, key);
+        } else {
+            // A non-negative binary-search result means the key was found.
+            return Arrays.binarySearch(keys, key) >= 0;
+        }
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Implementation methods
+    ///////////////////////////////////////////////////////////////////
+ 
+
+    private boolean allowOverflow() {
+        // Only allow page overflow if there are <= 3 keys in the node.  Otherwise a split will occur on overflow
+        return this.keys.length<=3;
+    }
+
+
+    /** Turns this node into a leaf holding the given keys/values (children cleared). */
+    private void setLeafData(Key[] keys, Value[] values) {
+        this.keys = keys;
+        this.values = values;
+        this.children = null;
+    }
+    
+    /** Turns this node into a branch holding the given keys/child page ids (values cleared). */
+    private void setBranchData(Key[] keys, long[] nodeIds) {
+        this.keys = keys;
+        this.children = nodeIds;
+        this.values = null;
+    }
+
+    // Erasure-safe key-array allocation; only used internally so the
+    // Object[]->Key[] cast never escapes to typed-array callers.
+    @SuppressWarnings("unchecked")
+    private Key[] createKeyArray(int size) {
+        return (Key[])new Object[size];
+    }
+
+    // Erasure-safe value-array allocation (see createKeyArray).
+    @SuppressWarnings("unchecked")
+    private Value[] createValueArray(int size) {
+        return (Value[])new Object[size];
+    }
+    
+    /** Returns a copy of vals with the element at idx removed. */
+    @SuppressWarnings("unchecked")
+    static private <T> T[] arrayDelete(T[] vals, int idx) {
+        T[] newVals = (T[])new Object[vals.length - 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        if (idx < newVals.length) {
+            System.arraycopy(vals, idx + 1, newVals, idx, newVals.length - idx);
+        }
+        return newVals;
+    }
+    
+    /** Returns a copy of vals with the element at idx removed (primitive overload). */
+    static private long[] arrayDelete(long[] vals, int idx) {
+        long[] newVals = new long[vals.length - 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        if (idx < newVals.length) {
+            System.arraycopy(vals, idx + 1, newVals, idx, newVals.length - idx);
+        }
+        return newVals;
+    }
+
+    /** Returns a copy of vals with val inserted at idx. */
+    @SuppressWarnings("unchecked")
+    static private <T> T[] arrayInsert(T[] vals, T val, int idx) {
+        T[] newVals = (T[])new Object[vals.length + 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        newVals[idx] = val;
+        if (idx < vals.length) {
+            System.arraycopy(vals, idx, newVals, idx + 1, vals.length - idx);
+        }
+        return newVals;
+    }
+
+
+    /** Returns a copy of vals with val inserted at idx (primitive overload). */
+    static private long[] arrayInsert(long[] vals, long val, int idx) {
+        
+        long[] newVals = new long[vals.length + 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        newVals[idx] = val;
+        if (idx < vals.length) {
+            System.arraycopy(vals, idx, newVals, idx + 1, vals.length - idx);
+        }
+        return newVals;
+    }
+
+    ///////////////////////////////////////////////////////////////////
+    // Property Accessors
+    ///////////////////////////////////////////////////////////////////
+    // A node is a branch iff it has a child-pointer array; leaves have values instead.
+    private boolean isBranch() {
+        return children!=null;
+    }
+
+    /** @return the id of the page this node is stored on. */
+    public long getPageId() {
+        return page.getPageId();
+    }
+
+    /** @return the parent node, or null if this is the root. */
+    public BTreeNode<Key, Value> getParent() {
+        return parent;
+    }
+
+    /** Sets the parent node reference (null for the root). */
+    public void setParent(BTreeNode<Key, Value> parent) {
+        this.parent = parent;
+    }
+
+    /** @return the backing page for this node. */
+    public Page<BTreeNode<Key, Value>> getPage() {
+        return page;
+    }
+
+    /** Sets the backing page for this node. */
+    public void setPage(Page<BTreeNode<Key, Value>> page) {
+        this.page = page;
+    }
+
+    /** @return the page id of the next leaf in the leaf linked list (-1 if none). */
+    public long getNext() {
+        return next;
+    }
+
+    /** Sets the page id of the next leaf in the leaf linked list. */
+    public void setNext(long next) {
+        this.next = next;
+    }
+
+}
+
+

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashBin.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashBin.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashBin.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashBin.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.Transaction;
+
+/**
+ * Bin in a HashIndex
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+class HashBin<Key, Value> {
+    
+    
+    /**
+     * Serializes a HashBin as an entry count followed by key/value pairs,
+     * using the owning HashIndex's key and value marshallers.
+     */
+    static public class Marshaller<Key, Value> implements org.apache.kahadb.Marshaller<HashBin<Key, Value>> {
+        private final HashIndex<Key, Value> hashIndex;
+
+        public Marshaller(HashIndex<Key, Value> index) {
+            this.hashIndex = index;
+        }
+        
+        public Class<HashBin<Key, Value>> getType() {
+            // NOTE(review): returns null rather than a class literal — confirm
+            // callers of getType() tolerate null.
+            return null;
+        }
+
+        public HashBin<Key, Value> readPayload(DataInput is) throws IOException {
+            HashBin<Key, Value> bin = new HashBin<Key, Value>();
+            int size = is.readInt();
+            for(int i=0; i < size; i++) {
+                Key key = hashIndex.getKeyMarshaller().readPayload(is);
+                Value value = hashIndex.getValueMarshaller().readPayload(is);
+                bin.data.put(key, value);
+            }
+            return bin;
+        }
+
+        public void writePayload(HashBin<Key, Value> bin, DataOutput os) throws IOException {
+            os.writeInt(bin.data.size());
+            for (Map.Entry<Key, Value> entry : bin.data.entrySet()) {
+                hashIndex.getKeyMarshaller().writePayload(entry.getKey(), os);
+                hashIndex.getValueMarshaller().writePayload(entry.getValue(), os);
+            }
+        }
+        
+    }
+    
+    // The page this bin lives on, and its in-memory sorted contents.
+    private Page<HashBin<Key, Value>> page;
+    private TreeMap<Key, Value> data = new TreeMap<Key, Value>();
+    
+    /** @return the number of entries in this bin. */
+    public int size() {
+        return data.size();
+    }
+
+    /** Adds/replaces an entry; returns the previous value or null. */
+    public Value put(Key key, Value value) throws IOException {
+        return data.put(key, value);
+    }
+
+    /** @return the value for the key, or null if absent. */
+    public Value get(Key key) throws IOException {
+        return data.get(key);
+    }
+    
+    /** @return true if the key is present in this bin. */
+    public boolean containsKey(Key key) throws IOException {
+        return data.containsKey(key);
+    }
+    
+    /**
+     * @return all entries of this bin.  NOTE(review): the tx parameter is
+     *         unused and the internal map is returned directly — mutations by
+     *         the caller affect the bin.
+     */
+    public Map<Key, Value> getAll(Transaction tx) throws IOException {
+        return data;
+    }
+    
+    /** Removes an entry; returns the removed value or null. */
+    public Value remove(Key key) throws IOException {
+        return data.remove(key);
+    }
+
+    /** @return the backing page of this bin. */
+    public Page<HashBin<Key, Value>> getPage() {
+        return page;
+    }
+
+    /** Attaches this bin to a page and registers it as the page's content. */
+    public void setPage(Page<HashBin<Key, Value>> page) {
+        this.page = page;
+        this.page.set(this);
+    }
+
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.page.Page;
+import org.apache.kahadb.page.PageFile;
+import org.apache.kahadb.page.Transaction;
+import org.apache.kahadb.page.Transaction.Closure;
+
+/**
+ * BTree implementation
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class HashIndex<Key,Value> implements Index<Key,Value> {
+
+    public static final int CLOSED_STATE = 1;
+    public static final int OPEN_STATE = 2;
+    public static final int INITIALIZING_STATE = 3;
+
+    public static final int RESIZING_PHASE1_STATE = 4;
+    public static final int RESIZING_PHASE2_STATE = 5;
+
+    private static final Log LOG = LogFactory.getLog(HashIndex.class);
+
+    public static final int DEFAULT_BIN_CAPACITY;
+    public static final int DEFAULT_MAXIMUM_BIN_CAPACITY;
+    public static final int DEFAULT_MINIMUM_BIN_CAPACITY;
+    public static final int DEFAULT_LOAD_FACTOR;
+
+    static {
+        DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
+        DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("maximumCapacity", "16384"));
+        DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty("minimumCapacity", "16"));
+        DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty("defaultLoadFactor", "75"));
+    }
+
+    private AtomicBoolean loaded = new AtomicBoolean();
+
+
+    private int increaseThreshold;
+    private int decreaseThreshold;
+
+    // Where the bin page array starts at.
+    private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
+    private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
+
+
+
+    // Once binsActive/binCapacity reaches the loadFactor, then we need to
+    // increase the capacity
+    private int loadFactor = DEFAULT_LOAD_FACTOR;
+
+    private PageFile pageFile;
+    // This page holds the index metadata.
+    private long pageId;
+
+    /**
+     * Persistent metadata of the hash index: its lifecycle state, the page
+     * range holding the bins, entry counts, and — during a resize — the
+     * capacity and page range of the replacement bins.
+     */
+    static class Metadata {
+        
+        private Page<Metadata> page;
+        
+        // When the index is initializing or resizing.. state changes so that
+        // on failure it can be properly recovered.
+        private int state;
+        // First page of the contiguous bin array; bin i is at binPageId + i.
+        private long binPageId;
+        private int binCapacity = DEFAULT_BIN_CAPACITY;
+        private int binsActive;
+        private int size;
+        // While resizing, the following contains the new resize data.
+        private int resizeCapacity;
+        private long resizePageId;
+
+        
+        public void read(DataInput is) throws IOException {
+            state = is.readInt();
+            binPageId = is.readLong();
+            binCapacity = is.readInt();
+            size = is.readInt();
+            binsActive = is.readInt();
+            resizePageId = is.readLong();
+            resizeCapacity = is.readInt();
+        }
+        public void write(DataOutput os) throws IOException {
+            // Field order must mirror read() exactly.
+            os.writeInt(state);
+            os.writeLong(binPageId);
+            os.writeInt(binCapacity);
+            os.writeInt(size);
+            os.writeInt(binsActive);
+            os.writeLong(resizePageId);
+            os.writeInt(resizeCapacity);
+        }
+        
+        /** Marshals Metadata to/from its page representation. */
+        static class Marshaller implements org.apache.kahadb.Marshaller<Metadata> {
+            public Class<Metadata> getType() {
+                return Metadata.class;
+            }
+
+            public Metadata readPayload(DataInput dataIn) throws IOException {
+                Metadata rc = new Metadata();
+                rc.read(dataIn);
+                return rc;
+            }
+
+            public void writePayload(Metadata object, DataOutput dataOut) throws IOException {
+                object.write(dataOut);
+            }
+        }
+    }
+    
+    private Metadata metadata = new Metadata();
+    
+    private Metadata.Marshaller metadataMarshaller = new Metadata.Marshaller();
+    private HashBin.Marshaller<Key,Value> hashBinMarshaller = new HashBin.Marshaller<Key,Value>(this);
+    private Marshaller<Key> keyMarshaller;
+    private Marshaller<Value> valueMarshaller;
+
+    
+    /**
+     * Creates a HashIndex whose metadata lives on the given page of the
+     * supplied page file.  The index must be loaded via load() before use.
+     *
+     * @param pageFile the page file backing this index
+     * @param pageId the page id that holds (or will hold) the index metadata
+     * @throws IOException declared for implementations; this constructor only
+     *         stores the references
+     */
+    public HashIndex(PageFile pageFile, long pageId) throws IOException {
+        this.pageFile = pageFile;
+        this.pageId = pageId;
+    }
+
+    public synchronized void load() {
+        Transaction tx = pageFile.tx();
+        try {
+            
+            if (loaded.compareAndSet(false, true)) {
+                try {
+                    final Page<Metadata> metadataPage = tx.load(pageId, metadataMarshaller);
+                    // Is this a brand new index?
+                    if (metadataPage.getType() == Page.PAGE_FREE_TYPE) {
+                        // We need to create the pages for the bins
+                        tx.execute(new Transaction.Closure<IOException>(){
+                            public void execute(Transaction tx) throws IOException {
+                                Page binPage = tx.allocate(metadata.binCapacity);
+                                metadata.binPageId = binPage.getPageId();
+                                metadata.state = INITIALIZING_STATE;
+                                metadata.page = metadataPage;
+                                metadataPage.set(metadata);
+                                tx.store(metadataPage, metadataMarshaller, true);
+                            }
+                        });
+                        pageFile.checkpoint();
+    
+                        // If failure happens now we can continue initializing the
+                        // the hash bins...
+                    } else {
+
+                        metadata = metadataPage.get();
+                        metadata.page = metadataPage;
+                        
+                        // If we did not have a clean shutdown...
+                        if (metadata.state == OPEN_STATE || metadata.state == RESIZING_PHASE1_STATE) {
+                            // Figure out the size and the # of bins that are
+                            // active. Yeah This loads the first page of every bin. :(
+                            // We might want to put this in the metadata page, but
+                            // then that page would be getting updated on every write.
+                            metadata.size = 0;
+                            for (int i = 0; i < metadata.binCapacity; i++) {
+                                int t = sizeOfBin(metadata.binPageId);
+                                if (t > 0) {
+                                    metadata.binsActive++;
+                                }
+                                metadata.size += t;
+                            }
+                        }
+                    }
+    
+                    if (metadata.state == INITIALIZING_STATE) {
+                        // TODO:
+                        // If a failure occurs mid way through us initializing the
+                        // bins.. will the page file still think we have the rest
+                        // of them previously allocated to us?
+    
+                        tx.execute(new Closure<IOException>(){
+                            public void execute(Transaction tx) throws IOException {
+                                clear(tx);
+                            }
+                        });
+                    }
+                    
+                    if (metadata.state == RESIZING_PHASE1_STATE) {
+                        // continue resize phase 1
+                        resizePhase1();
+                    }                
+                    if (metadata.state == RESIZING_PHASE2_STATE) {
+                        // continue resize phase 1
+                        resizePhase2();
+                    }                
+    
+                    calcThresholds();
+    
+                    metadata.state = OPEN_STATE;
+                    tx.execute(new Closure<IOException>(){
+                        public void execute(Transaction tx) throws IOException {
+                            tx.store(metadataPage, metadataMarshaller, true);
+                        }
+                    });
+                    pageFile.checkpoint();
+                    
+                    LOG.debug("HashIndex loaded. Using "+metadata.binCapacity+" bins starting at page "+metadata.binPageId);
+    
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+            
+        } finally {
+            // All pending updates should have been committed by now.
+            assert tx.isReadOnly();
+        }
+    }
+
+    // Stub: should return the entry count of the bin stored at binPageId.
+    // Currently always reports 0, so recovery in load() under-counts.
+    private int sizeOfBin(long binPageId) {
+        // TODO Auto-generated method stub
+        return 0;
+    }
+
+    /**
+     * Marks the index CLOSED and persists the metadata.  Idempotent: only the
+     * first call after a successful load() does any work.
+     */
+    public synchronized void unload() throws IOException {
+        if (loaded.compareAndSet(true, false)) {
+            metadata.state = CLOSED_STATE;
+            pageFile.tx().execute(new Closure<IOException>(){
+                public void execute(Transaction tx) throws IOException {
+                    tx.store(metadata.page, metadataMarshaller, true);
+                }
+            });
+        }
+    }
+
+    /** @return the value for the key from its hash bin, or null if absent. */
+    public synchronized Value get(Transaction tx, Key key) throws IOException {
+        load();
+        return getBin(tx, key).get(key);
+    }
+    
+    /** @return true if the key exists in its hash bin. */
+    public synchronized boolean containsKey(Transaction tx, Key key) throws IOException {
+        // TODO: multiple loads is smelly..
+        load();
+        return getBin(tx, key).containsKey(key);
+    }
+
+    /**
+     * Adds/replaces an entry, stores the modified bin, and — if the fraction
+     * of active bins crosses the grow threshold — doubles the bin capacity.
+     *
+     * @return the previous value for the key, or null if none
+     */
+    synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
+        // TODO: multiple loads is smelly..
+        load();
+        HashBin<Key,Value> bin = getBin(tx, key);
+
+        int originalSize = bin.size();
+        Value result = bin.put(key,value);
+        store(tx, bin);
+
+        int newSize = bin.size();
+
+        // A size change on put can only be an insert (+1 entry).
+        if (newSize != originalSize) {
+            metadata.size++;
+            if (newSize == 1) {
+                metadata.binsActive++;
+            }
+        }
+
+        if (metadata.binsActive >= this.increaseThreshold) {
+            // Note: newSize is reused here to hold the candidate bin capacity.
+            newSize = Math.min(maximumBinCapacity, metadata.binCapacity*2);
+            if(metadata.binCapacity!=newSize) {
+                resize(newSize);
+            }
+        }
+        return result;
+    }
+    
+    /**
+     * Removes an entry, storing the bin only when it actually changed, and —
+     * if active bins fall below the shrink threshold — halves the capacity.
+     *
+     * @return the removed value, or null if the key was absent
+     */
+    synchronized public Value remove(Transaction tx, Key key) throws IOException {
+        // TODO: multiple loads is smelly..
+        load();
+
+        HashBin<Key,Value> bin = getBin(tx, key);
+        int originalSize = bin.size();
+        Value result = bin.remove(key);
+        int newSize = bin.size();
+        
+        if (newSize != originalSize) {
+            store(tx, bin);
+
+            metadata.size--;
+            if (newSize == 0) {
+                metadata.binsActive--;
+            }
+        }
+
+        if (metadata.binsActive <= this.decreaseThreshold) {
+            // Note: newSize is reused here to hold the candidate bin capacity.
+            newSize = Math.max(minimumBinCapacity, metadata.binCapacity/2);
+            if(metadata.binCapacity!=newSize) {
+                resize(newSize);
+            }
+        }
+        return result;
+    }
+    
+
+    /** Empties every bin (bin i is at binPageId + i) and resets the counters. */
+    public synchronized void clear(Transaction tx) throws IOException {
+        // TODO: multiple loads is smelly..
+        load();
+        for (int i = 0; i < metadata.binCapacity; i++) {
+            long pageId = metadata.binPageId + i;
+            clearBinAtPage(tx, pageId);
+        }
+        metadata.size = 0;
+        metadata.binsActive = 0;
+    }
+    
+    /** Iteration is not supported by the hash index. */
+    public Iterator<Entry<Key, Value>> iterator(Transaction tx) throws IOException, UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
+
+
+    /**
+     * Overwrites the page at pageId with a fresh, empty bin.
+     *
+     * @param tx the transaction to store the empty bin in
+     * @param pageId the page the bin lives on
+     * @throws IOException if the page cannot be loaded or stored
+     */
+    private void clearBinAtPage(Transaction tx, long pageId) throws IOException {
+        // Loaded with a null marshaller: existing content is not deserialized.
+        Page<HashBin<Key,Value>> page = tx.load(pageId, null);
+        HashBin<Key, Value> bin = new HashBin<Key,Value>();
+        bin.setPage(page);
+        page.set(bin);
+        store(tx, bin);
+    }
+
+    /** Debug identity string: "HashIndex" + identity hash + backing page file. */
+    public String toString() {
+        String str = "HashIndex" + System.identityHashCode(this) + ": " + pageFile;
+        return str;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Implementation Methods
+    // /////////////////////////////////////////////////////////////////
+
+    /** Persists a bin's current contents to its backing page. */
+    public synchronized void store(Transaction tx, HashBin<Key,Value> bin) throws IOException {
+        tx.store(bin.getPage(), hashBinMarshaller, true);
+    }
+
+    
+    /**
+     * Starts a crash-safe two-phase resize: allocates the new bin pages and
+     * records RESIZING_PHASE1_STATE in the metadata before any data is copied,
+     * so an interrupted resize can be resumed by load().
+     */
+    private void resize(final int newSize) throws IOException {
+        LOG.debug("Resizing to: "+newSize);
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                metadata.state = RESIZING_PHASE1_STATE;
+                metadata.resizeCapacity = newSize;
+                metadata.resizePageId = tx.allocate(metadata.resizeCapacity).getPageId();
+                tx.store(metadata.page, metadataMarshaller, true);
+            }
+        });
+        pageFile.checkpoint();
+        
+        resizePhase1();
+        resizePhase2();        
+    }
+
+    /**
+     * Phase 1 of a resize: initialize the newly allocated bin pages, copy
+     * every entry from the old bins into them, then advance the persisted
+     * state to RESIZING_PHASE2_STATE and checkpoint.
+     */
+    private void resizePhase1() throws IOException {
+        // In Phase 1 we copy the data to the new bins..
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                
+                // Initialize the bins..
+                for (int i = 0; i < metadata.resizeCapacity; i++) {
+                    long pageId = metadata.resizePageId + i;
+                    clearBinAtPage(tx, pageId);
+                }
+
+                metadata.binsActive = 0;
+                // Copy the data from the old bins to the new bins.
+                for (int i = 0; i < metadata.binCapacity; i++) {
+                    
+                    HashBin<Key,Value> bin = getBin(tx, i);
+                    for (Map.Entry<Key, Value> entry : bin.getAll(tx).entrySet()) {
+                        HashBin<Key,Value> resizeBin = getResizeBin(tx, entry.getKey());
+                        resizeBin.put(entry.getKey(), entry.getValue());
+                        store(tx, resizeBin);
+                        // A bin becomes "active" when it receives its first
+                        // entry, hence the size()==1 check.
+                        if( resizeBin.size() == 1) {
+                            metadata.binsActive++;
+                        }
+                    }
+                }
+                
+                // Now we can release the old data.
+                metadata.state = RESIZING_PHASE2_STATE;
+                tx.store(metadata.page, metadataMarshaller, true);
+            }
+        });
+        pageFile.checkpoint();
+    }
+
+    /**
+     * Phase 2 of a resize: clear and free the old bin pages, switch the
+     * metadata over to the new bins, return the index to OPEN_STATE, and
+     * recompute the grow/shrink thresholds for the new capacity.
+     */
+    private void resizePhase2() throws IOException {
+        // In phase 2 we free the old bins and switch to the new bins.
+        pageFile.tx().execute(new Closure<IOException>(){
+            public void execute(Transaction tx) throws IOException {
+                for (int i = 0; i < metadata.binCapacity; i++) {
+                    long pageId = metadata.binPageId + i;
+                    clearBinAtPage(tx, pageId);
+                }
+                tx.free(metadata.binPageId, metadata.binCapacity);
+                
+                // The resize bins become the live bins.
+                metadata.binCapacity = metadata.resizeCapacity;
+                metadata.binPageId = metadata.resizePageId;
+                metadata.resizeCapacity=0;
+                metadata.resizePageId=0;
+                metadata.state = OPEN_STATE;
+                tx.store(metadata.page, metadataMarshaller, true);
+            }
+        });
+
+        pageFile.checkpoint();
+        calcThresholds();
+        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);
+    }
+
+    /**
+     * Recomputes the grow/shrink thresholds from the current bin capacity
+     * and load factor (a percentage).  The intermediate products are
+     * computed in long arithmetic: binCapacity * loadFactor * loadFactor
+     * could overflow int for large capacities, silently breaking the
+     * shrink threshold.
+     */
+    private void calcThresholds() {
+        // Grow once more than loadFactor% of the bins are active.
+        increaseThreshold = (int) (((long) metadata.binCapacity * loadFactor) / 100);
+        // Shrink once active bins drop well below the grow threshold
+        // (capacity * loadFactor^2 / 10000, halved).
+        decreaseThreshold = (int) (((long) metadata.binCapacity * loadFactor * loadFactor) / 20000);
+    }
+
+    /** Looks up the resize-area bin that the given key hashes to. */
+    private HashBin<Key,Value> getResizeBin(Transaction tx, Key key) throws IOException {
+        return getResizeBin(tx, indexFor(key, metadata.resizeCapacity));
+    }
+
+    /** Loads the i-th bin of the resize area and wires it to its page. */
+    private HashBin<Key,Value> getResizeBin(Transaction tx, int i) throws IOException {
+        Page<HashBin<Key, Value>> binPage = tx.load(metadata.resizePageId + i, hashBinMarshaller);
+        HashBin<Key, Value> bin = binPage.get();
+        bin.setPage(binPage);
+        return bin;
+    }
+
+    /** Looks up the live bin that the given key hashes to. */
+    private HashBin<Key,Value> getBin(Transaction tx, Key key) throws IOException {
+        return getBin(tx, indexFor(key, metadata.binCapacity));
+    }
+
+    /** Loads the i-th live bin and wires it to its page. */
+    private HashBin<Key,Value> getBin(Transaction tx, int i) throws IOException {
+        Page<HashBin<Key, Value>> binPage = tx.load(metadata.binPageId + i, hashBinMarshaller);
+        HashBin<Key, Value> bin = binPage.get();
+        bin.setPage(binPage);
+        return bin;
+    }
+
+    /**
+     * Maps a key to a bin slot in [0, length).  Math.abs is safe here
+     * because |hashCode % length| is strictly less than length for any
+     * positive length, so the Integer.MIN_VALUE abs edge case cannot occur.
+     * NOTE(review): this mapping determines where persisted entries live;
+     * it must never change for existing index files.
+     */
+    int indexFor(Key x, int length) {
+        return Math.abs(x.hashCode()%length);
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Property Accessors
+    // /////////////////////////////////////////////////////////////////
+
+    /**
+     * @return the marshaller used to serialize/deserialize key objects
+     */
+    public Marshaller<Key> getKeyMarshaller() {
+        return keyMarshaller;
+    }
+
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller the marshaller used to serialize/deserialize keys
+     */
+    public synchronized void setKeyMarshaller(Marshaller<Key> marshaller) {
+        this.keyMarshaller = marshaller;
+    }
+
+    /**
+     * @return the marshaller used to serialize/deserialize value objects
+     */
+    public Marshaller<Value> getValueMarshaller() {
+        return valueMarshaller;
+    }
+    /**
+     * Set the marshaller for value objects
+     * 
+     * @param valueMarshaller the marshaller used to serialize/deserialize values
+     */
+    public synchronized void setValueMarshaller(Marshaller<Value> valueMarshaller) {
+        // synchronized for consistency with setKeyMarshaller, which guards
+        // the same kind of mutable configuration state.
+        this.valueMarshaller = valueMarshaller;
+    }
+    
+    /**
+     * @return the current number of hash bins in the index
+     */
+    public int getBinCapacity() {
+        return metadata.binCapacity;
+    }
+
+    /**
+     * Sets the initial number of hash bins.  May only be used before the
+     * index is loaded; once pages are loaded the capacity is managed by the
+     * automatic resize logic.
+     *
+     * @param binCapacity the initial bin count
+     * @throws IllegalStateException if the index is already loaded with a
+     *         different capacity
+     */
+    public void setBinCapacity(int binCapacity) {
+        if (loaded.get() && binCapacity != metadata.binCapacity) {
+            // IllegalStateException (a RuntimeException subclass) keeps any
+            // existing catch blocks working while naming the actual failure.
+            throw new IllegalStateException("Pages already loaded - can't reset bin capacity");
+        }
+        metadata.binCapacity = binCapacity;
+    }
+
+    /**
+     * @return false; this index is always persisted through its page file
+     */
+    public boolean isTransient() {
+        return false;
+    }
+
+    /**
+     * @return the loadFactor, a percentage used to derive the grow/shrink
+     *         thresholds in calcThresholds()
+     */
+    public int getLoadFactor() {
+        return loadFactor;
+    }
+
+    /**
+     * @param loadFactor the loadFactor to set, a percentage used to derive
+     *        the grow/shrink thresholds
+     */
+    public void setLoadFactor(int loadFactor) {
+        this.loadFactor = loadFactor;
+    }
+
+    /**
+     * @return the maximum number of bins the index is allowed to grow to
+     */
+    public int getMaximumBinCapacity() {
+        return maximumBinCapacity;
+    }
+
+    /**
+     * @return the maximum number of bins the index is allowed to grow to
+     * @deprecated this accessor was misnamed with a "set" prefix; use
+     *             {@link #getMaximumBinCapacity()} instead.  Kept so
+     *             existing callers keep compiling.
+     */
+    @Deprecated
+    public int setMaximumBinCapacity() {
+        return getMaximumBinCapacity();
+    }
+
+    /**
+     * @param maximumCapacity the maximum number of bins the index may grow to
+     */
+    public void setMaximumBinCapacity(int maximumCapacity) {
+        this.maximumBinCapacity = maximumCapacity;
+    }
+
+    /**
+     * @param tx not used by this implementation
+     * @return the number of entries stored in the index
+     */
+    public synchronized int size(Transaction tx) {
+        return metadata.size;
+    }
+
+    /**
+     * @return the number of bins that currently hold at least one entry
+     */
+    public synchronized int getActiveBins() {
+        return metadata.binsActive;
+    }
+
+    /**
+     * @return the page id of the first live bin page
+     */
+    public long getBinPageId() {
+        return metadata.binPageId;
+    }
+
+    /**
+     * @return the page file backing this index
+     */
+    public PageFile getPageFile() {
+        return pageFile;
+    }
+
+    /**
+     * @return the number of bins that currently hold at least one entry.
+     *         Synchronized for a consistent read of the metadata, matching
+     *         {@link #getActiveBins()} (of which this is a duplicate).
+     */
+    public synchronized int getBinsActive() {
+        return metadata.binsActive;
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/index/Index.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.index;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.kahadb.Marshaller;
+import org.apache.kahadb.page.Transaction;
+
+/**
+ * A persistent key/value index; simpler than a Map, with every operation
+ * scoped to a page-file Transaction.
+ * 
+ * @version $Revision: 1.2 $
+ */
+public interface Index<Key,Value> {
+    
+    /**
+     * Set the marshaller for key objects
+     * 
+     * @param marshaller
+     */
+    void setKeyMarshaller(Marshaller<Key> marshaller);
+    
+    /**
+     * Set the marshaller for value objects
+     * 
+     * @param marshaller
+     */
+    void setValueMarshaller(Marshaller<Value> marshaller);
+
+    /**
+     * load indexes
+     */
+    void load() throws IOException;
+
+    /**
+     * unload indexes
+     * 
+     * @throws IOException
+     */
+    void unload() throws IOException;
+
+    /**
+     * clear the index
+     * 
+     * @throws IOException
+     * 
+     */
+    void clear(Transaction tx) throws IOException;
+
+    /**
+     * @param key
+     * @return true if it contains the key
+     * @throws IOException
+     */
+    boolean containsKey(Transaction tx, Key key) throws IOException;
+
+    /**
+     * remove the index key
+     * 
+     * @param key
+     * @return the value previously associated with the key, if any
+     * @throws IOException
+     */
+    Value remove(Transaction tx, Key key) throws IOException;
+
+    /**
+     * store the key, item
+     * 
+     * @param key
+     * @param entry
+     * @throws IOException
+     */
+    Value put(Transaction tx, Key key, Value entry) throws IOException;
+
+    /**
+     * @param key
+     * @return the entry
+     * @throws IOException
+     */
+    Value get(Transaction tx, Key key) throws IOException;
+
+    /**
+     * @return true if the index is transient
+     */
+    boolean isTransient();
+    
+    /**
+     * @param tx
+     * @return an iterator over the index entries
+     * @throws IOException
+     * @throws UnsupportedOperationException 
+     *         if the index does not support fast iteration of the elements.
+     */
+    Iterator<Map.Entry<Key,Value>> iterator(final Transaction tx) throws IOException, UnsupportedOperationException;
+    
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ControlFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ControlFile.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ControlFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ControlFile.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.apache.kahadb.util.ByteSequence;
+import org.apache.kahadb.util.IOExceptionSupport;
+
+/**
+ * Used to reliably store fixed sized state data.  The state is stored in a
+ * record that is versioned and repeated twice in the file so that a failure
+ * in the middle of the write of the first or second record does not result
+ * in an unknown state.
+ * 
+ * @version $Revision: 1.1 $
+ */
+public final class ControlFile {
+
+    private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+    private final File file;
+
+    /** The File that holds the control data. */
+    private final RandomAccessFile randomAccessFile;
+    private final int maxRecordSize;
+    private final int firstRecordStart;
+    private final int secondRecordStart;
+    private final int firstRecordEnd;
+    private final int secondRecordEnd;
+
+    private long version;
+    private FileLock lock;
+    private boolean disposed;
+
+    public ControlFile(File file, int recordSize) throws IOException {
+        this.file = file;
+        // 4 extra bytes hold the record's length prefix.
+        this.maxRecordSize = recordSize + 4;
+        
+        // Calculate where the records start and end.  Each record is
+        // preceded by an 8 byte version and followed by an 8 byte
+        // version check.
+        this.firstRecordStart = 8;
+        this.secondRecordStart = 8 + maxRecordSize + 8 + 8;
+        this.firstRecordEnd = firstRecordStart+maxRecordSize;
+        this.secondRecordEnd = secondRecordStart+maxRecordSize;
+
+        randomAccessFile = new RandomAccessFile(file, "rw");
+    }
+
+    /**
+     * Locks the control file.
+     * 
+     * @throws IOException if the lock cannot be obtained
+     */
+    public void lock() throws IOException {
+        if (DISABLE_FILE_LOCK) {
+            return;
+        }
+
+        if (lock == null) {
+            try {
+                lock = randomAccessFile.getChannel().tryLock();
+            } catch (OverlappingFileLockException e) {
+                throw IOExceptionSupport.create("Control file '" + file + "' could not be locked.",e);
+            }
+            if (lock == null) {
+                throw new IOException("Control file '" + file + "' could not be locked.");
+            }
+        }
+    }
+
+    /**
+     * Unlocks the control file.
+     * 
+     * @throws IOException on lock release failure
+     */
+    public void unlock() throws IOException {
+        if (DISABLE_FILE_LOCK) {
+            return;
+        }
+
+        if (lock != null) {
+            lock.release();
+            lock = null;
+        }
+    }
+
+    /**
+     * Releases the lock and closes the underlying file.  Safe to call more
+     * than once; close failures are intentionally ignored (best effort).
+     */
+    public void dispose() {
+        if (disposed) {
+            return;
+        }
+        disposed = true;
+        try {
+            unlock();
+        } catch (IOException ignore) {
+        }
+        try {
+            randomAccessFile.close();
+        } catch (IOException ignore) {
+        }
+    }
+
+    /**
+     * Loads the most recent valid record.  Prefers the second copy (written
+     * last) and falls back to the first if the second's version check fails.
+     *
+     * @return the record data, or null if the file holds no record yet
+     * @throws IOException if both copies fail their version checks
+     */
+    public synchronized ByteSequence load() throws IOException {
+        long l = randomAccessFile.length();
+        if (l < maxRecordSize) {
+            return null;
+        }
+
+        randomAccessFile.seek(firstRecordStart-8);
+        long v1 = randomAccessFile.readLong();
+        randomAccessFile.seek(firstRecordEnd);
+        long v1check = randomAccessFile.readLong();
+
+        randomAccessFile.seek(secondRecordStart - 8);
+        long v2 = randomAccessFile.readLong();
+        randomAccessFile.seek(secondRecordEnd);
+        long v2check = randomAccessFile.readLong();
+
+        byte[] data = null;
+        if (v2 == v2check) {
+            version = v2;
+            randomAccessFile.seek(secondRecordStart);
+            int size = randomAccessFile.readInt();
+            data = new byte[size];
+            randomAccessFile.readFully(data);
+        } else if (v1 == v1check) {
+            version = v1;
+            randomAccessFile.seek(firstRecordStart);
+            int size = randomAccessFile.readInt();
+            data = new byte[size];
+            randomAccessFile.readFully(data);
+        } else {
+            // Bummer.. Both checks are screwed. we don't know
+            // if any of the two buffers are ok. This should
+            // only happen if data got corrupted.
+            throw new IOException("Control data corrupted.");
+        }
+        return new ByteSequence(data, 0, data.length);
+    }
+
+    /**
+     * Stores the record twice (versioned) so a torn write of one copy can
+     * always be recovered from the other.  Synchronized to match load():
+     * version and the file position are shared mutable state.
+     *
+     * @param data the record payload; its offset/length window is honored
+     * @param sync true to force the data to disk before returning
+     * @throws IOException if the payload exceeds the fixed record size
+     */
+    public synchronized void store(ByteSequence data, boolean sync) throws IOException {
+
+        // Refuse payloads that would overflow the record slot; an oversized
+        // write would silently clobber the version check that follows it.
+        if (data.getLength() > maxRecordSize - 4) {
+            throw new IOException("Control record too large: " + data.getLength() + " bytes, limit is " + (maxRecordSize - 4));
+        }
+
+        version++;
+        randomAccessFile.setLength((maxRecordSize * 2) + 32);
+        randomAccessFile.seek(0);
+
+        // Write the first copy of the control data.  Use the sequence's
+        // offset/length window: writing the whole backing array would store
+        // the wrong bytes whenever offset != 0 or the array is larger than
+        // the logical length.
+        randomAccessFile.writeLong(version);
+        randomAccessFile.writeInt(data.getLength());
+        randomAccessFile.write(data.getData(), data.getOffset(), data.getLength());
+        randomAccessFile.seek(firstRecordEnd);
+        randomAccessFile.writeLong(version);
+
+        // Write the second copy of the control data.
+        randomAccessFile.writeLong(version);
+        randomAccessFile.writeInt(data.getLength());
+        randomAccessFile.write(data.getData(), data.getOffset(), data.getLength());
+        randomAccessFile.seek(secondRecordEnd);
+        randomAccessFile.writeLong(version);
+
+        if (sync) {
+            randomAccessFile.getFD().sync();
+        }
+    }
+
+}

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java?rev=692288&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/DataFile.java Thu Sep  4 15:46:42 2008
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.kahadb.util.IOHelper;
+import org.apache.kahadb.util.LinkedNode;
+
+/**
+ * DataFile - a single journal data file on disk, tracked as a node in the
+ * journal's linked list of files.
+ * 
+ * @version $Revision: 1.1.1.1 $
+ */
+public class DataFile extends LinkedNode implements Comparable<DataFile> {
+
+    protected final File file;
+    protected final Integer dataFileId;
+    protected final int preferedSize;
+
+    // Logical length of the data written so far (may be less than the
+    // physical file size while the file is open for appending).
+    protected int length;
+    protected int referenceCount;
+
+    DataFile(File file, int number, int preferedSize) {
+        this.file = file;
+        this.preferedSize = preferedSize;
+        this.dataFileId = Integer.valueOf(number);
+        // NOTE(review): the cast truncates for files over 2GB - presumably
+        // journal files are always smaller; confirm against the appender.
+        length = (int)(file.exists() ? file.length() : 0);
+    }
+    
+    File getFile() {
+        return file;
+    }
+
+    public Integer getDataFileId() {
+        return dataFileId;
+    }
+
+    public synchronized int getLength() {
+        return length;
+    }
+
+    /** Sets the logical length; synchronized for consistency with getLength(). */
+    public synchronized void setLength(int length) {
+        this.length = length;
+    }
+
+    public synchronized void incrementLength(int size) {
+        length += size;
+    }
+
+    /** @return the reference count after incrementing */
+    public synchronized int increment() {
+        return ++referenceCount;
+    }
+
+    /** @return the reference count after decrementing */
+    public synchronized int decrement() {
+        return --referenceCount;
+    }
+    
+    public synchronized int getReferenceCount(){
+    	return referenceCount;
+    }
+
+    public synchronized boolean isUnused() {
+        return referenceCount <= 0;
+    }
+
+    public synchronized String toString() {
+        String result = file.getName() + " number = " + dataFileId + " , length = " + length + " refCount = " + referenceCount;
+        return result;
+    }
+
+    public synchronized RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
+        RandomAccessFile rc = new RandomAccessFile(file, "rw");
+        // When we start to write files size them up so that the OS has a chance
+        // to allocate the file contiguously.
+        if (appender) {
+            if (length < preferedSize) {
+                rc.setLength(preferedSize);
+            }
+        }
+        return rc;
+    }
+
+    public synchronized void closeRandomAccessFile(RandomAccessFile raf) throws IOException {
+        // On close, trim the physical file back to the real data length.
+        // (Parameter renamed from "file", which shadowed the File field.)
+        if (length != raf.length()) {
+            raf.setLength(getLength());
+        }
+        raf.close();
+    }
+
+    public synchronized boolean delete() throws IOException {
+        return file.delete();
+    }
+    
+    public synchronized void move(File targetDirectory) throws IOException{
+        IOHelper.moveFile(file,targetDirectory);
+    }
+
+    public int compareTo(DataFile df) {
+        // Integer.compareTo avoids the overflow possible with int subtraction.
+        return dataFileId.compareTo(df.dataFileId);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof DataFile) {
+            result = compareTo((DataFile)o) == 0;
+        }
+        return result;
+    }
+
+    @Override
+    public int hashCode() {
+        return dataFileId;
+    }
+}



Mime
View raw message