activemq-commits mailing list archives

From chir...@apache.org
Subject svn commit: r825564 [2/5] - in /activemq/sandbox/activemq-flow: ./ activemq-util/src/main/java/org/apache/activemq/util/buffer/ hawtdb/ hawtdb/src/ hawtdb/src/main/ hawtdb/src/main/java/ hawtdb/src/main/java/org/ hawtdb/src/main/java/org/apache/ hawtdb...
Date Thu, 15 Oct 2009 17:04:15 GMT
Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/BTreeNode.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hawtdb.api.IndexVisitor;
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.IndexException;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.internal.page.Extent;
+import org.apache.hawtdb.internal.page.ExtentInputStream;
+import org.apache.hawtdb.internal.page.ExtentOutputStream;
+import org.apache.hawtdb.internal.util.Ranges;
+
+
+/**
+ * The BTreeNode class represents a node in the BTree object graph. It is
+ * stored in an extent rooted at a single page of a PageFile; a large node
+ * may spill over into additional pages.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public final class BTreeNode<Key, Value> {
+
+    private static final Object[] EMPTY_ARRAY = new Object[]{};
+    
+    // The page associated with this node.
+    private transient int page;
+
+    // The number of pages this node occupies on disk, if known; -1 if not
+    // yet known.
+    transient int pageCount = -1;
+
+    Data<Key, Value> data;
+
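+    // Data is treated as an immutable snapshot of the node's contents: the
+    // helper methods defined on it (values(), children(), branch(), leaf(),
+    // ...) return updated copies rather than mutating fields in place.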
+    static class Data<Key, Value> {
+
+        @SuppressWarnings("unchecked")
+        public Data() {
+            this(-1, (Key[])EMPTY_ARRAY, null, (Value[])EMPTY_ARRAY, -1);
+        }
+        
+        public Data(int parent, Key[] keys, int[] children, Value[] values, int next) {
+            this.parent = parent;
+            this.keys = keys;
+            this.values = values;
+            this.children = children;
+            this.next = next;
+        }
+
+        // The parent node or -1 if this is the root node of the BTree
+        int parent;
+
+        // Ordered list of keys in the node.
+        Key[] keys;
+
+        // Values associated with the Keys. Null if this is a branch node.
+        Value[] values;
+
+        // Page ids of the child BTreeNodes. Null if this is a leaf node.
+        int[] children;
+
+        // The next leaf node after this one. Used for fast iteration of the
+        // entries. -1 if this is the last node.
+        int next;
+
+        public Data<Key, Value> values(Value[] values) {
+            return new Data<Key, Value>(parent, keys, children, values, next);
+        }
+
+        public Data<Key, Value> children(int[] children) {
+            return new Data<Key, Value>(parent, keys, children, values, next);
+        }
+
+        public Data<Key, Value> next(int next) {
+            return new Data<Key, Value>(parent, keys, children, values, next);
+        }
+        
+        public Data<Key, Value> change(Key[] keys, int[] children, Value[] values) {
+            return new Data<Key, Value>(parent, keys, children, values, next);
+        }
+        
+        public Data<Key, Value> branch(Key[] keys, int[] children) {
+            return new Data<Key, Value>(parent, keys, children, null, next);
+        }
+
+        public Data<Key, Value> branch(int parent, Key[] keys, int[] children) {
+            return new Data<Key, Value>(parent, keys, children, null, next);
+        }
+        
+        public Data<Key, Value> leaf(Key[] keys, Value[] values) {
+            return new Data<Key, Value>(parent, keys, null, values, next);
+        }
+
+        public Data<Key, Value> leaf(int parent, Key[] keys, Value[] values) {
+            return new Data<Key, Value>(parent, keys, null, values, next);
+        }
+        
+        public Data<Key, Value> leaf(Key[] keys, Value[] values, int next) {
+            return new Data<Key, Value>(parent, keys, null, values, next);
+        }
+
+        public Data<Key, Value> leaf(int parent, Key[] keys, Value[] values, int next) {
+            return new Data<Key, Value>(parent, keys, null, values, next);
+        }
+
+    }
+
+    public static class BTreeNodeEncoderDecoder<Key, Value> implements EncoderDecoder<BTreeNode<Key, Value>> {
+
+        private final BTreeIndex<Key, Value> index;
+
+        public BTreeNodeEncoderDecoder(BTreeIndex<Key, Value> index) {
+            this.index = index;
+        }
+
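+        // On-disk layout written by store(): parent page id (int), key
+        // count (short), the keys, a branch flag (boolean), then either
+        // count + 1 child page ids (branch) or count values followed by the
+        // next-leaf pointer (leaf).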
+        public List<Integer> store(Paged paged, int page, BTreeNode<Key, Value> node) {
+
+            // The cast to short may truncate the value; fail fast if it does.
+            short count = (short) node.data.keys.length;
+            if (count != node.data.keys.length) {
+                throw new IndexException("Too many keys");
+            }
+
+            // The node will be stored in an extent. This allows us to easily
+            // support huge nodes.
+            // The first extent is only 1 page long, extents linked off
+            // the first page will be up to 128 pages long.
+            ExtentOutputStream eos = new ExtentOutputStream(paged, page, (short) 1, (short) 128);
+            DataOutputStream os = new DataOutputStream(eos);
+            try {
+                os.writeInt(node.data.parent);
+                os.writeShort(count);
+                for (int i = 0; i < node.data.keys.length; i++) {
+                    index.getKeyMarshaller().writePayload(node.data.keys[i], os);
+                }
+
+                if (node.isBranch()) {
+                    // If this is a branch...
+                    os.writeBoolean(true);
+                    for (int i = 0; i < count + 1; i++) {
+                        os.writeInt(node.data.children[i]);
+                    }
+
+                } else {
+                    // If this is a leaf
+                    os.writeBoolean(false);
+                    for (int i = 0; i < count; i++) {
+                        index.getValueMarshaller().writePayload(node.data.values[i], os);
+                    }
+                    os.writeInt(node.data.next);
+                }
+                os.close();
+
+            } catch (IOException e) {
+                throw new IndexException(e);
+            }
+
+            Ranges pages = eos.getPages();
+            pages.remove(page);
+            if (pages.isEmpty()) {
+                node.pageCount = 1;
+                return Collections.emptyList();
+            }
+
+            List<Integer> rc = pages.values();
+            node.pageCount = rc.size() + 1;
+            return rc;
+        }
+
+        @SuppressWarnings("unchecked")
+        public BTreeNode<Key, Value> load(Paged paged, int page) {
+            ExtentInputStream eis = new ExtentInputStream(paged, page);
+            DataInputStream is = new DataInputStream(eis);
+
+            try {
+                int parent = is.readInt();
+                int count = is.readShort();
+                Key[] keys = (Key[]) new Object[count];
+                int[] children = null;
+                Value[] values = null;
+                int next = -1;
+
+                for (int i = 0; i < count; i++) {
+                    keys[i] = index.getKeyMarshaller().readPayload(is);
+                }
+
+                if (is.readBoolean()) {
+                    children = new int[count + 1];
+                    for (int i = 0; i < count + 1; i++) {
+                        children[i] = is.readInt();
+                    }
+                } else {
+                    values = (Value[]) new Object[count];
+                    for (int i = 0; i < count; i++) {
+                        values[i] = index.getValueMarshaller().readPayload(is);
+                    }
+                    next = is.readInt();
+                }
+                is.close();
+
+                BTreeNode<Key, Value> node = new BTreeNode<Key, Value>(new Data<Key, Value>(parent, keys, children, values, next));
+                node.pageCount = eis.getPages().size();
+                return node;
+
+            } catch (IOException e) {
+                throw new IndexException(e);
+            }
+
+        }
+
+        public void remove(Paged paged, int page) {
+            Extent.freeLinked(paged, page);
+        }
+
+    }
+
+    public BTreeNode(int page) {
+        this(page, new Data<Key, Value>());
+    }
+
+    public BTreeNode(Data<Key, Value> data) {
+        this.data = data;
+    }
+
+    public BTreeNode(int page, Data<Key, Value> data) {
+        this.page = page;
+        this.data = data;
+    }
+
+
+    /**
+     * Internal (to the BTreeNode) method. Because this method is called only
+     * by BTreeNode itself, no synchronization is done inside it.
+     */
+    private BTreeNode<Key, Value> getChild(BTreeIndex<Key, Value> index, int idx) {
+        if (isBranch() && idx >= 0 && idx < data.children.length) {
+            BTreeNode<Key, Value> result = index.loadNode(data.children[idx]);
+            return result;
+        } else {
+            return null;
+        }
+    }
+
+    public Value remove(BTreeIndex<Key, Value> index, Key key) {
+
+        if (isBranch()) {
+            int idx = Arrays.binarySearch(data.keys, key);
+            idx = idx < 0 ? -(idx + 1) : idx + 1;
+            BTreeNode<Key, Value> child = getChild(index, idx);
+            if (child.getPage() == index.getPage()) {
+                throw new IndexException("BTree corrupted: Cycle detected.");
+            }
+            Value rc = child.remove(index, key);
+
+            // child node is now empty.. remove it from the branch node.
+            if (child.data.keys.length == 0) {
+
+                // If the child node is a branch, promote its single child.
+                if (child.isBranch()) {
+                    // Branches are never truly empty; they just go down to
+                    // one child.
+                    data = data.children(arrayUpdate(data.children, idx, child.data.children[0]));
+                    
+                } else {
+
+                    // The child was a leaf, so it actually has to be removed
+                    // from this branch node.
+
+                    // Update the previous child's next pointer to skip over
+                    // the child being removed.
+                    int prevIdx = idx - 1;
+                    if (idx > 0 && data.children.length > 1) {
+                        getChild(index, prevIdx).setNext(index, child.data.next);
+                    }
+
+                    if (idx < data.children.length - 1) {
+                        // Delete the child and the key to its right.
+                        data = data.branch(arrayDelete(data.keys, idx), arrayDelete(data.children, idx));
+                    } else {
+                        // It was the last child; delete it and the key to
+                        // its left.
+                        data = data.branch(arrayDelete(data.keys, prevIdx), arrayDelete(data.children, idx));
+                    }
+
+                    // If this is the root node and only one child is left,
+                    // collapse the remaining child into the root.
+                    if (data.children.length == 1 && data.parent == -1) {
+                        child = getChild(index, 0);
+                        data = data.change(child.data.keys, child.data.children, child.data.values);
+                        // Free the now-unused child page.
+                        index.free(child.getPage());
+                    }
+
+                }
+                index.storeNode(this);
+            }
+
+            return rc;
+        } else {
+            int idx = Arrays.binarySearch(data.keys, key);
+            if (idx < 0) {
+                return null;
+            } else {
+                Value oldValue = data.values[idx];
+                data = data.leaf(arrayDelete(data.keys, idx), arrayDelete(data.values, idx));
+
+                if (data.keys.length == 0 && data.parent != -1) {
+                    index.free(getPage());
+                } else {
+                    index.storeNode(this);
+                }
+
+                return oldValue;
+            }
+        }
+    }
+
+    private void setNext(BTreeIndex<Key, Value> index, int next) {
+        data = data.next(next);
+        index.storeNode(this);
+    }
+
+    public Value put(BTreeIndex<Key, Value> index, Key key, Value value) {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        if (isBranch()) {
+            return getLeafNode(index, this, key).put(index, key, value);
+        } else {
+            int idx = Arrays.binarySearch(data.keys, key);
+
+            Value oldValue = null;
+            if (idx >= 0) {
+                // Key was found; overwrite the existing value.
+                oldValue = data.values[idx];
+                data = data.leaf(data.keys, arrayUpdate(data.values, idx, value));
+            } else {
+                // Key was not found; insert it.
+                idx = -(idx + 1);
+                data = data.leaf(arrayInsert(data.keys, key, idx), arrayInsert(data.values, value, idx));
+            }
+
+            if (splitNeeded()) {
+                split(index);
+            } else {
+                index.storeNode(this);
+            }
+
+            return oldValue;
+        }
+    }
+
+    private void promoteValue(BTreeIndex<Key, Value> index, Key key, int nodeId) {
+
+        int idx = Arrays.binarySearch(data.keys, key);
+        idx = idx < 0 ? -(idx + 1) : idx + 1;
+        data = data.branch(arrayInsert(data.keys, key, idx), arrayInsert(data.children, nodeId, idx + 1));
+
+        if (splitNeeded()) {
+            split(index);
+        } else {
+            index.storeNode(this);
+        }
+
+    }
+
+    /**
+     * Internal BTreeNode method: splits this node in two and promotes the
+     * separator key into the parent node.
+     */
+    private void split(BTreeIndex<Key, Value> index) {
+        Key[] leftKeys;
+        Key[] rightKeys;
+        Value[] leftValues = null;
+        Value[] rightValues = null;
+        int[] leftChildren = null;
+        int[] rightChildren = null;
+        Key separator;
+
+        int vc = data.keys.length;
+        int pivot = vc / 2;
+
+        // Split the node into two nodes
+        if (isBranch()) {
+
+            leftKeys = createKeyArray(pivot);
+            leftChildren = new int[leftKeys.length + 1];
+            rightKeys = createKeyArray(vc - (pivot + 1));
+            rightChildren = new int[rightKeys.length + 1];
+
+            System.arraycopy(data.keys, 0, leftKeys, 0, leftKeys.length);
+            System.arraycopy(data.children, 0, leftChildren, 0, leftChildren.length);
+            System.arraycopy(data.keys, leftKeys.length + 1, rightKeys, 0, rightKeys.length);
+            System.arraycopy(data.children, leftChildren.length, rightChildren, 0, rightChildren.length);
+
+            // If this is a simple-prefix BTree, compute a shortened separator.
+            Prefixer<Key> prefixer = index.getPrefixer();
+            if (prefixer != null) {
+                separator = prefixer.getSimplePrefix(leftKeys[leftKeys.length - 1], rightKeys[0]);
+            } else {
+                separator = data.keys[leftKeys.length];
+            }
+
+        } else {
+
+            leftKeys = createKeyArray(pivot);
+            leftValues = createValueArray(leftKeys.length);
+            rightKeys = createKeyArray(vc - pivot);
+            rightValues = createValueArray(rightKeys.length);
+
+            System.arraycopy(data.keys, 0, leftKeys, 0, leftKeys.length);
+            System.arraycopy(data.values, 0, leftValues, 0, leftValues.length);
+            System.arraycopy(data.keys, leftKeys.length, rightKeys, 0, rightKeys.length);
+            System.arraycopy(data.values, leftValues.length, rightValues, 0, rightValues.length);
+
+            separator = rightKeys[0];
+
+        }
+
+        // Promote the pivot to the parent branch
+        if (data.parent == -1) {
+
+            // This can only happen at the root. The root keeps its page (so
+            // the index's root pointer stays valid) and both halves move
+            // into newly created child nodes.
+            BTreeNode<Key, Value> rNode;
+            BTreeNode<Key, Value> lNode;
+
+            if (isBranch()) {
+                rNode = index.createNode(data.branch(page, rightKeys, rightChildren));
+                lNode = index.createNode(data.branch(page, leftKeys, leftChildren));
+            } else {
+                rNode = index.createNode(data.leaf(page, rightKeys, rightValues));
+                lNode = index.createNode(data.leaf(page, leftKeys, leftValues, rNode.getPage()));
+            }
+
+            Key[] v = createKeyArray(1);
+            v[0] = separator;
+            data = data.branch(v, new int[] { lNode.getPage(), rNode.getPage() });
+
+            index.storeNode(this);
+            index.storeNode(rNode);
+            index.storeNode(lNode);
+
+        } else {
+            BTreeNode<Key, Value> rNode;
+
+            if (isBranch()) {
+                rNode = index.createNode(data.branch(data.parent, rightKeys, rightChildren));
+                data = data.branch(leftKeys, leftChildren);
+            } else {
+                rNode = index.createNode(data.leaf(data.parent, rightKeys, rightValues, data.next));
+                data = data.leaf(leftKeys, leftValues, rNode.getPage());
+            }
+
+            index.storeNode(this);
+            index.storeNode(rNode);
+            index.loadNode(data.parent).promoteValue(index, separator, rNode.getPage());
+        }
+    }
+
+
+    public void printStructure(BTreeIndex<Key, Value> index, PrintWriter out, String prefix) {
+        if (prefix.length() > 0 && data.parent == -1) {
+            throw new IllegalStateException("Cycle back to root node detected.");
+        }
+
+        if (isBranch()) {
+            for (int i = 0; i < data.children.length; i++) {
+                BTreeNode<Key, Value> child = getChild(index, i);
+                if (i == data.children.length - 1) {
+                    out.println(prefix + "\\- " + child.getPage() + (child.isBranch() ? " (" + child.data.children.length + ")" : ""));
+                    child.printStructure(index, out, prefix + "   ");
+                } else {
+                    out.println(prefix + "|- " + child.getPage() + (child.isBranch() ? " (" + child.data.children.length + ")" : "") + " : " + data.keys[i]);
+                    child.printStructure(index, out, prefix + "   ");
+                }
+            }
+        }
+    }
+
+    public int getMinLeafDepth(BTreeIndex<Key, Value> index, int depth) {
+        depth++;
+        if (isBranch()) {
+            int min = Integer.MAX_VALUE;
+            for (int i = 0; i < data.children.length; i++) {
+                min = Math.min(min, getChild(index, i).getMinLeafDepth(index, depth));
+            }
+            return min;
+        } else {
+            // print(depth*2, "- "+page.getPageId());
+            return depth;
+        }
+    }
+    
+    public int size(BTreeIndex<Key, Value> index) {
+        int rc = 0;
+
+        BTreeNode<Key, Value> node = this;
+        while (node.isBranch()) {
+            node = node.getChild(index, 0);
+        }
+        while (node != null) {
+            rc += node.data.values.length;
+            if (node.data.next != -1) {
+                node = index.loadNode(node.data.next);
+            } else {
+                node = null;
+            }
+        }
+        return rc;
+    }
+
+    public int getMaxLeafDepth(BTreeIndex<Key, Value> index, int depth) {
+        depth++;
+        if (isBranch()) {
+            int v = 0;
+            for (int i = 0; i < data.children.length; i++) {
+                v = Math.max(v, getChild(index, i).getMaxLeafDepth(index, depth));
+            }
+            depth = v;
+        }
+        return depth;
+    }
+
+    public Value get(BTreeIndex<Key, Value> index, Key key) {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+        if (isBranch()) {
+            return getLeafNode(index, this, key).get(index, key);
+        } else {
+            int idx = Arrays.binarySearch(data.keys, key);
+            if (idx < 0) {
+                return null;
+            } else {
+                return data.values[idx];
+            }
+        }
+    }
+
+    public void visit(BTreeIndex<Key, Value> index, IndexVisitor<Key, Value> visitor) {
+        if (visitor == null) {
+            throw new IllegalArgumentException("Visitor cannot be null");
+        }
+
+        if (visitor.isSatiated()) {
+            return;
+        }
+
+        if (isBranch()) {
+            for (int i = 0; i < this.data.children.length; i++) {
+                Key key1 = null;
+                if (i != 0) {
+                    key1 = data.keys[i - 1];
+                }
+                Key key2 = null;
+                if (i != this.data.children.length - 1) {
+                    key2 = data.keys[i];
+                }
+                if (visitor.isInterestedInKeysBetween(key1, key2)) {
+                    BTreeNode<Key, Value> child = getChild(index, i);
+                    child.visit(index, visitor);
+                }
+            }
+        } else {
+            visitor.visit(Arrays.asList(data.keys), Arrays.asList(data.values));
+        }
+    }
+
+    public Map.Entry<Key, Value> getFirst(BTreeIndex<Key, Value> index) {
+        BTreeNode<Key, Value> node = this;
+        while (node.isBranch()) {
+            node = node.getChild(index, 0);
+        }
+        if (node.data.values.length > 0) {
+            return new KeyValueEntry<Key, Value>(node.data.keys[0], node.data.values[0]);
+        } else {
+            return null;
+        }
+    }
+
+    public Map.Entry<Key, Value> getLast(BTreeIndex<Key, Value> index) {
+        BTreeNode<Key, Value> node = this;
+        while (node.isBranch()) {
+            node = node.getChild(index, node.data.children.length - 1);
+        }
+        if (node.data.values.length > 0) {
+            int idx = node.data.values.length - 1;
+            return new KeyValueEntry<Key, Value>(node.data.keys[idx], node.data.values[idx]);
+        } else {
+            return null;
+        }
+    }
+
+    public BTreeNode<Key, Value> getFirstLeafNode(BTreeIndex<Key, Value> index) {
+        BTreeNode<Key, Value> node = this;
+        while (node.isBranch()) {
+            node = node.getChild(index, 0);
+        }
+        return node;
+    }
+
+    public Iterator<Map.Entry<Key, Value>> iterator(BTreeIndex<Key, Value> index, final Key startKey) {
+        if (startKey == null) {
+            return iterator(index);
+        }
+        if (isBranch()) {
+            return getLeafNode(index, this, startKey).iterator(index, startKey);
+        } else {
+            int idx = Arrays.binarySearch(data.keys, startKey);
+            if (idx < 0) {
+                idx = -(idx + 1);
+            }
+            return new BTreeIterator<Key, Value>(index, this, idx);
+        }
+    }
+
+    public Iterator<Map.Entry<Key, Value>> iterator(final BTreeIndex<Key, Value> index) {
+        return new BTreeIterator<Key, Value>(index, getFirstLeafNode(index), 0);
+    }
+
+    @SuppressWarnings("unchecked")
+    public void clear(BTreeIndex<Key, Value> index) {
+        if (isBranch()) {
+            for (int i = 0; i < data.children.length; i++) {
+                BTreeNode<Key, Value> node = index.loadNode(data.children[i]);
+                node.clear(index);
+                index.free(node.getPage());
+            }
+        }
+        // Reset the root node to be a leaf.
+        if (data.parent == -1) {
+            data = data.leaf((Key[])EMPTY_ARRAY, (Value[])EMPTY_ARRAY, -1);
+            index.storeNode(this);
+        }
+    }
+
+    private static <Key, Value> BTreeNode<Key, Value> getLeafNode(BTreeIndex<Key, Value> index, final BTreeNode<Key, Value> node, Key key) {
+        BTreeNode<Key, Value> current = node;
+        while (true) {
+            if (current.isBranch()) {
+                int idx = Arrays.binarySearch(current.data.keys, key);
+                idx = idx < 0 ? -(idx + 1) : idx + 1;
+                BTreeNode<Key, Value> child = current.getChild(index, idx);
+
+                // A little cycle detection for sanity's sake
+                if (child == node) {
+                    throw new IndexException("BTree corrupted: Cycle detected.");
+                }
+
+                current = child;
+            } else {
+                break;
+            }
+        }
+        return current;
+    }
+
+    public boolean contains(BTreeIndex<Key, Value> index, Key key) {
+        if (key == null) {
+            throw new IllegalArgumentException("Key cannot be null");
+        }
+
+        if (isBranch()) {
+            return getLeafNode(index, this, key).contains(index, key);
+        } else {
+            int idx = Arrays.binarySearch(data.keys, key);
+            return idx >= 0;
+        }
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Implementation methods
+    // /////////////////////////////////////////////////////////////////
+
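+    // Split heuristic: only consider splitting once the node has spilled
+    // past a single page and holds more than one key; it is then split when
+    // it either spans more than 128 pages or has more than 4 keys.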
+    private boolean splitNeeded() {
+        if (pageCount > 1 && data.keys.length > 1) {
+            if (pageCount > 128 || data.keys.length > 4) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Key[] createKeyArray(int size) {
+        return (Key[]) new Object[size];
+    }
+
+    @SuppressWarnings("unchecked")
+    private Value[] createValueArray(int size) {
+        return (Value[]) new Object[size];
+    }
+
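+    // The array helpers below never mutate their input: each returns a new
+    // array, so Data snapshots that still reference the old arrays remain
+    // valid.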
+    @SuppressWarnings("unchecked")
+    private static <T> T[] arrayUpdate(T[] vals, int idx, T value) {
+        T[] newVals = (T[]) new Object[vals.length];
+        System.arraycopy(vals, 0, newVals, 0, vals.length);
+        newVals[idx] = value;
+        return newVals;
+    }
+
+    private static int[] arrayUpdate(int[] vals, int idx, int value) {
+        int[] newVals = new int[vals.length];
+        System.arraycopy(vals, 0, newVals, 0, vals.length);
+        newVals[idx] = value;
+        return newVals;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T[] arrayDelete(T[] vals, int idx) {
+        T[] newVals = (T[]) new Object[vals.length - 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        if (idx < newVals.length) {
+            System.arraycopy(vals, idx + 1, newVals, idx, newVals.length - idx);
+        }
+        return newVals;
+    }
+
+    private static int[] arrayDelete(int[] vals, int idx) {
+        int[] newVals = new int[vals.length - 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        if (idx < newVals.length) {
+            System.arraycopy(vals, idx + 1, newVals, idx, newVals.length - idx);
+        }
+        return newVals;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> T[] arrayInsert(T[] vals, T val, int idx) {
+        T[] newVals = (T[]) new Object[vals.length + 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        newVals[idx] = val;
+        if (idx < vals.length) {
+            System.arraycopy(vals, idx, newVals, idx + 1, vals.length - idx);
+        }
+        return newVals;
+    }
+    
+    private static int[] arrayInsert(int[] vals, int val, int idx) {
+
+        int[] newVals = new int[vals.length + 1];
+        if (idx > 0) {
+            System.arraycopy(vals, 0, newVals, 0, idx);
+        }
+        newVals[idx] = val;
+        if (idx < vals.length) {
+            System.arraycopy(vals, idx, newVals, idx + 1, vals.length - idx);
+        }
+        return newVals;
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Property Accessors
+    // /////////////////////////////////////////////////////////////////
+    private boolean isBranch() {
+        return data.children != null;
+    }
+
+    public int getParent() {
+        return data.parent;
+    }
+
+    public int getPage() {
+        return page;
+    }
+
+    public void setPage(int page) {
+        this.page = page;
+    }
+
+    public int getNext() {
+        return data.next;
+    }
+
+    @Override
+    public String toString() {
+        return "[BTreeNode " + (isBranch() ? "branch" : "leaf") + ": " + Arrays.asList(data.keys) + "]";
+    }
+
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashBins.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hawtdb.api.EncoderDecoder;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.api.Paged;
+import org.apache.hawtdb.api.Paged.SliceType;
+
+import javolution.io.Struct;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class HashBins {
+    
+    public static final byte[] MAGIC = createMagic();
+    
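+    // Maps the fixed header stored at the start of the index page: the
+    // "HASH" magic, the page of the first bin, the bin capacity, the total
+    // entry count, and the number of non-empty (active) bins.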
+    private static class Header extends Struct {
+        public final UTF8String magic = new UTF8String(4);
+        public final Signed32 page = new Signed32();
+        public final Signed32 capacity = new Signed32();
+        public final Signed32 size = new Signed32();
+        public final Signed32 active = new Signed32();
+        
+        static Header create(ByteBuffer buffer) {
+            Header header = new Header();
+            header.setByteBuffer(buffer, buffer.position());
+            return header;
+        }
+    }
+
+    public static final EncoderDecoder<HashBins> ENCODER_DECODER = new EncoderDecoder<HashBins>() {
+        
+        public HashBins load(Paged paged, int page) {
+            ByteBuffer slice = paged.slice(SliceType.READ, page, 1);
+            try {
+                Header header = Header.create(slice);
+                HashBins rc = new HashBins();
+                rc.page = header.page.get();
+                rc.capacity = header.capacity.get();
+                rc.size = header.size.get();
+                rc.active = header.active.get();
+                return rc;
+            } finally {
+                paged.unslice(slice);
+            }
+        }
+
+        public List<Integer> store(Paged paged, int page, HashBins object) {
+            ByteBuffer slice = paged.slice(SliceType.WRITE, page, 1);
+            try {
+                Header header = Header.create(slice);
+                header.magic.set("HASH");
+                header.page.set(object.page);
+                header.capacity.set(object.capacity);
+                header.size.set(object.size);
+                header.active.set(object.active);
+                return Collections.emptyList();
+            } finally {
+                paged.unslice(slice);
+            }
+        }
+        
+        public void remove(Paged paged, int page) {
+        }
+
+    };
+    
+    
+    int page = -1;
+    int active;
+    int capacity;
+    int size;
+    
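+    // Allocates 'capacity' contiguous pages and creates one BTree bin per
+    // page; bin i lives at page + i.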
+    <Key,Value> void create(HashIndex<Key,Value> index, int capacity) {
+        this.size = 0;
+        this.active = 0;
+        this.capacity = capacity;
+        this.page = index.paged.allocator().alloc(capacity);
+        for (int i = 0; i < capacity; i++) {
+            index.BIN_FACTORY.create(index.paged, (page + i));
+        }
+        index.paged.put(ENCODER_DECODER, index.page, this);
+    }
+    
+    <Key,Value> Value put(HashIndex<Key,Value> index, Key key, Value value) {
+        Index<Key, Value> bin = bin(index, key);
+
+        int originalSize = bin.size();
+        Value result = bin.put(key,value);
+        int newSize = bin.size();
+
+        if (newSize != originalSize) {
+            size++;
+            if (newSize == 1) {
+                active++;
+            }
+            index.paged.put(ENCODER_DECODER, index.page, this);
+        }
+        return result;
+    }
+    
+    <Key,Value> Value remove(HashIndex<Key,Value> index, Key key) {
+        Index<Key, Value> bin = bin(index, key);
+        int originalSize = bin.size();
+        Value result = bin.remove(key);
+        int newSize = bin.size();
+        
+        if (newSize != originalSize) {
+            size--;
+            if (newSize == 0) {
+                active--;
+            }
+            index.paged.put(ENCODER_DECODER, index.page, this);
+        }
+        return result;
+    }
+    
+    <Key,Value> void clear(HashIndex<Key,Value> index) {
+        for (int i = 0; i < capacity; i++) {
+            bin(index, i).clear();
+        }
+        size = 0;
+        active = 0;
+    }
+    
+    <Key,Value> void destroy(HashIndex<Key,Value> index) {
+        for (int i = 0; i < capacity; i++) {
+            bin(index, i).clear();
+        }
+        index.paged.allocator().free(page, capacity);
+        size = 0;
+        active = 0;
+        capacity = 0;
+        page = -1;
+    }
+
+    <Key,Value> Index<Key,Value> bin(HashIndex<Key,Value> index, int bin) {
+        return index.BIN_FACTORY.open(index.paged, page + bin);
+    }
+
+    <Key,Value> Index<Key,Value> bin(HashIndex<Key,Value> index, Key key) {
+        int i = index(key);
+        return index.BIN_FACTORY.open(index.paged, page + i);
+    }
+
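+    // Maps a key to a bin ordinal in [0, capacity). Math.abs is safe here
+    // since |hashCode() % capacity| is always strictly less than capacity.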
+    <Key,Value> int index(Key x) {
+        return Math.abs(x.hashCode() % capacity);
+    }
+    
+    @Override
+    public String toString() {
+        return "{ page:"+page+", capacity: "+capacity+", active: "+active+", size: "+size+" }";
+    }
+    
+    private static byte[] createMagic() {
+        try {
+            return "HASH".getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/HashIndex.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.activemq.util.marshaller.Marshaller;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawtdb.api.Index;
+import org.apache.hawtdb.api.Paged;
+
+
+/**
+ * Hash Index implementation. Each hash bucket is stored as a BTree.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class HashIndex<Key,Value> implements Index<Key,Value> {
+    
+    private static final Log LOG = LogFactory.getLog(HashIndex.class);
+        
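+    // The defaults below may be overridden via system properties, e.g.:
+    //   -Dorg.apache.hawtdb.internal.index.HashIndex.DEFAULT_BIN_CAPACITY=2048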
+    public static final String PROPERTY_PREFIX = HashIndex.class.getName()+".";
+    public static final int DEFAULT_BIN_CAPACITY = Integer.parseInt(System.getProperty(PROPERTY_PREFIX+"DEFAULT_BIN_CAPACITY", "1024"));
+    public static final int DEFAULT_MAXIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty(PROPERTY_PREFIX+"DEFAULT_MAXIMUM_BIN_CAPACITY", "16384"));
+    public static final int DEFAULT_MINIMUM_BIN_CAPACITY = Integer.parseInt(System.getProperty(PROPERTY_PREFIX+"DEFAULT_MINIMUM_BIN_CAPACITY", "16"));
+    public static final int DEFAULT_LOAD_FACTOR = Integer.parseInt(System.getProperty(PROPERTY_PREFIX+"DEFAULT_LOAD_FACTOR", "75"));
+
+    static class Factory<Key, Value> {
+        private Marshaller<Key> keyMarshaller;
+        private Marshaller<Value> valueMarshaller;
+        private int maximumBinCapacity = DEFAULT_MAXIMUM_BIN_CAPACITY;
+        private int minimumBinCapacity = DEFAULT_MINIMUM_BIN_CAPACITY;
+        private int loadFactor = DEFAULT_LOAD_FACTOR;
+
+        public HashIndex<Key, Value> open(Paged paged, int page) {
+            return doCreate(paged, page).open();
+        }
+
+        public HashIndex<Key, Value> create(Paged paged, int page) {
+            return doCreate(paged, page).create();
+        }
+
+        private HashIndex<Key, Value> doCreate(Paged paged, int page) {
+            assertFieldsSet();
+            return new HashIndex<Key, Value>(paged, page, keyMarshaller, valueMarshaller, maximumBinCapacity, minimumBinCapacity, loadFactor);
+        }
+
+        private void assertFieldsSet() {
+            if (keyMarshaller == null) {
+                throw new IllegalArgumentException("The key marshaller must be set before calling open");
+            }
+            if (valueMarshaller == null) {
+                throw new IllegalArgumentException("The value marshaller must be set before calling open");
+            }
+        }
+
+        public Marshaller<Key> getKeyMarshaller() {
+            return keyMarshaller;
+        }
+
+        public void setKeyMarshaller(Marshaller<Key> keyMarshaller) {
+            this.keyMarshaller = keyMarshaller;
+        }
+
+        public Marshaller<Value> getValueMarshaller() {
+            return valueMarshaller;
+        }
+
+        public void setValueMarshaller(Marshaller<Value> valueMarshaller) {
+            this.valueMarshaller = valueMarshaller;
+        }
+
+        public int getMaximumBinCapacity() {
+            return maximumBinCapacity;
+        }
+
+        public void setMaximumBinCapacity(int maximumBinCapacity) {
+            this.maximumBinCapacity = maximumBinCapacity;
+        }
+
+        public int getMinimumBinCapacity() {
+            return minimumBinCapacity;
+        }
+
+        public void setMinimumBinCapacity(int minimumBinCapacity) {
+            this.minimumBinCapacity = minimumBinCapacity;
+        }
+
+        public int getLoadFactor() {
+            return loadFactor;
+        }
+
+        public void setLoadFactor(int loadFactor) {
+            this.loadFactor = loadFactor;
+        }
+        
+    }
+    
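+    // Usage sketch (illustrative only; assumes a Paged implementation and
+    // suitable Marshaller instances are available):
+    //
+    //   HashIndex.Factory<String, Long> factory = new HashIndex.Factory<String, Long>();
+    //   factory.setKeyMarshaller(stringMarshaller);
+    //   factory.setValueMarshaller(longMarshaller);
+    //   HashIndex<String, Long> index = factory.create(paged, rootPage);
+    //   index.put("key", 1L);
+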
+    final BTreeIndex.Factory<Key, Value> BIN_FACTORY = new BTreeIndex.Factory<Key, Value>();
+
+    final Paged paged;
+    final int page;
+    final int maximumBinCapacity;
+    final int minimumBinCapacity;
+    private final int loadFactor;
+
+    private HashBins bins;
+    int increaseThreshold;
+    int decreaseThreshold;
+
+    public HashIndex(Paged paged, int page, Marshaller<Key> keyMarshaller, Marshaller<Value> valueMarshaller, int maximumBinCapacity, int minimumBinCapacity, int loadFactor) {
+        this.paged = paged;
+        this.page = page;
+        this.maximumBinCapacity = maximumBinCapacity;
+        this.minimumBinCapacity = minimumBinCapacity;
+        this.loadFactor = loadFactor;
+        this.BIN_FACTORY.setKeyMarshaller(keyMarshaller);
+        this.BIN_FACTORY.setValueMarshaller(valueMarshaller);
+    }
+
+    public HashIndex<Key, Value> create() {
+        this.bins = new HashBins();
+        this.bins.create(this, DEFAULT_BIN_CAPACITY);
+        paged.put(HashBins.ENCODER_DECODER, page, bins);
+        calcThresholds();
+        return this;
+    }
+
+    public HashIndex<Key, Value> open() {
+        this.bins = paged.get(HashBins.ENCODER_DECODER, page);
+        calcThresholds();
+        return this;
+    }
+
+    public Value get(Key key) {
+        return bins.bin(this, key).get(key);
+    }
+    
+    public boolean containsKey(Key key) {
+        return bins.bin(this, key).containsKey(key);
+    }
+
+    public Value put(Key key, Value value) {
+        Value put = bins.put(this, key, value);
+        if (bins.active >= this.increaseThreshold) {
+            int newSize = Math.min(this.maximumBinCapacity, bins.capacity * 2);
+            if (bins.capacity != newSize) {
+                this.resize(newSize);
+            }
+        }
+        return put;
+    }
+    
+    public Value remove(Key key) {
+        Value rc = bins.remove(this, key);
+        if (bins.active <= this.decreaseThreshold) {
+            int newSize = Math.max(minimumBinCapacity, bins.capacity / 2);
+            if (bins.capacity != newSize) {
+                resize(newSize);
+            }
+        }
+        return rc;
+    }
+
+    public void clear() {
+        bins.clear(this);
+        if (bins.active <= this.decreaseThreshold) {
+            int newSize = Math.max(minimumBinCapacity, bins.capacity / 2);
+            if (bins.capacity != newSize) {
+                resize(newSize);
+            }
+        }
+    }
+    
+    public Iterator<Entry<Key, Value>> iterator() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+    }
+    
+    public int size() {
+        return bins.size;
+    }
+    
+    public void destroy() {
+        bins.destroy(this);
+        bins = null;
+    }
+
+    @Override
+    public String toString() {
+        return "{ page: "+page+", bins: "+bins+" }";
+    }
+
+    // /////////////////////////////////////////////////////////////////
+    // Implementation Methods
+    // /////////////////////////////////////////////////////////////////
+    void resize(final int capacity) {
+        LOG.debug("Resizing to: "+capacity);
+        
+        HashBins newBins = new HashBins();
+        newBins.create(this, capacity);
+
+        // Copy the data from the old bins to the new bins.
+        for (int i = 0; i < bins.capacity; i++) {
+            Index<Key, Value> bin = bins.bin(this, i);
+            for (Map.Entry<Key, Value> entry : bin) {
+                newBins.put(this, entry.getKey(), entry.getValue());
+            }
+        }
+        
+        bins.destroy(this);
+        bins = newBins;
+        calcThresholds();
+        LOG.debug("Resizing done.  New bins start at: "+bins.page);        
+    }
+
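+    // With the defaults (capacity 1024, loadFactor 75) these formulas give
+    // increaseThreshold = 1024 * 75 / 100 = 768 active bins and
+    // decreaseThreshold = 1024 * 75 * 75 / 20000 = 288 active bins.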
+    private void calcThresholds() {
+        increaseThreshold = (bins.capacity * loadFactor)/100;
+        decreaseThreshold = (bins.capacity * loadFactor * loadFactor ) / 20000;
+    }
+
+    public int getPage() {
+        return page;
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/KeyValueEntry.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/KeyValueEntry.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/KeyValueEntry.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/KeyValueEntry.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+import java.util.Map;
+
+/**
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final class KeyValueEntry<Key, Value> implements Map.Entry<Key, Value> {
+    private final Key key;
+    private final Value value;
+
+    public KeyValueEntry(Key key, Value value) {
+        this.key = key;
+        this.value = value;
+    }
+
+    public Key getKey() {
+        return key;
+    }
+
+    public Value getValue() {
+        return value;
+    }
+
+    public Value setValue(Value value) {
+        throw new UnsupportedOperationException();
+    }
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/Prefixer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/Prefixer.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/Prefixer.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/Prefixer.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+/**
+ * Interface used to determine the simple prefix of two keys.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public interface Prefixer<Key> {
+
+    /**
+     * This method should return the shortest prefix of value2 such that the
+     * following still holds:<br/>
+     * value1 <= prefix <= value2<br/>
+     * <br/>
+     * When this method is called, it is guaranteed that:<br/>
+     * value1 < value2<br/>
+     * 
+     * @param value1 the smaller key
+     * @param value2 the larger key
+     * @return the shortest prefix of value2 satisfying the above
+     */
+    public Key getSimplePrefix(Key value1, Key value2);
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/StringPrefixer.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/StringPrefixer.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/StringPrefixer.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/StringPrefixer.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.index;
+
+/**
+ * StringPrefixer is a {@link Prefixer} implementation that works on strings.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class StringPrefixer implements Prefixer<String> {
+
+    /**
+     * Example: if value1 is "Hello World" and value2 is "Help Me", then the
+     * result will be "Help".
+     * 
+     * @see Prefixer#getSimplePrefix
+     */
+    public String getSimplePrefix(String value1, String value2) {
+        char[] c1 = value1.toCharArray();
+        char[] c2 = value2.toCharArray();
+        int n = Math.min(c1.length, c2.length);
+        int i = 0;
+        while (i < n) {
+            if (c1[i] != c2[i]) {
+                return value2.substring(0, i + 1);
+            }
+            i++;
+        }
+
+        if (n == c2.length) {
+            return value2;
+        }
+        return value2.substring(0, n);
+    }
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,32 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	Index implementations.
+</p>
+
+<p>
+  Indexes provide fast key/value storage.  They use the Paging API to store
+  their data.
+</p>
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/index/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.io;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.HashSet;
+
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.api.IOPagingException;
+
+/**
+ * Provides memory-mapped access to a file. It manages pooling of the direct
+ * buffers that are mapped onto the file. Multiple direct buffers are used
+ * to work around OS and Java mapping restrictions.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final public class MemoryMappedFile {
+	
+	private static final ByteBufferReleaser BYTE_BUFFER_RELEASER = createByteBufferReleaser();
+
+	private final int bufferSize;
+	private final ArrayList<MappedByteBuffer> buffers = new ArrayList<MappedByteBuffer>(10);
+	private final FileChannel channel;
+	private final FileDescriptor fd;
+    private final HashSet<ByteBuffer> boundaryBuffers = new HashSet<ByteBuffer>(10);
+
+
+	public MemoryMappedFile(File file, int bufferSize) throws IOException {
+		this.bufferSize = bufferSize;
+		RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+		this.fd = randomAccessFile.getFD();
+		this.channel = randomAccessFile.getChannel();
+	}
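+
+	// A minimal usage sketch (hedged: the file name and the 64 MB segment size
+	// below are illustrative assumptions, not requirements of this class):
+	//
+	//   MemoryMappedFile mmf = new MemoryMappedFile(new File("data.db"), 64 * 1024 * 1024);
+	//   mmf.write(0, new byte[] { 1, 2, 3 });
+	//   byte[] back = new byte[3];
+	//   mmf.read(0, back);
+	//   mmf.sync();
+	//   mmf.close();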
+
+	public void read(long position, byte[] data) throws IOPagingException {
+		this.read(position, data, 0, data.length);
+	}
+
+	public void read(long position, Buffer data) throws IOPagingException {
+		this.read(position, data.data, data.offset, data.length);
+	}
+	
+	public void read(long position, byte[] data, int offset, int length) throws IOPagingException {
+		int bufferIndex = (int) (position / bufferSize);
+		int bufferOffset = (int) (position % bufferSize);
+		ByteBuffer buffer = loadBuffer(bufferIndex);
+		buffer = position(buffer, bufferOffset);
+		int remaining = buffer.remaining();
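+		// The requested range may span several fixed-size mapped segments:
+		// drain what remains of the current segment, then continue from the
+		// start of the next segment until the remainder fits in one segment.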
+		while (length > remaining) {
+			buffer.get(data, offset, remaining);
+			offset += remaining;
+			length -= remaining;
+			bufferIndex++;
+			buffer = loadBuffer(bufferIndex).duplicate();
+			remaining = buffer.remaining();
+		}
+		buffer.get(data, offset, length);
+	}
+
+	public ByteBuffer read(long position, int length) throws IOPagingException {
+		int bufferIndex = (int) (position / bufferSize);
+		int bufferOffset = (int) (position % bufferSize);
+		ByteBuffer buffer = loadBuffer(bufferIndex);
+		buffer = position(buffer, bufferOffset);
+		int remaining = buffer.remaining();
+		if (length > remaining) {
+			// If the range cannot be read contiguously from a single mapped
+			// segment, fall back to copying it into a heap buffer.
+			byte[] data = new byte[length];
+			read(position, data);
+			return ByteBuffer.wrap(data);
+		} else {
+			return (ByteBuffer) buffer.limit(buffer.position()+length);
+		}
+	}
+	
+    public ByteBuffer slice(boolean readOnly, long position, int length) {
+        int bufferIndex = (int) (position / bufferSize);
+        int bufferOffset = (int) (position % bufferSize);
+        ByteBuffer buffer = loadBuffer(bufferIndex);
+        buffer = position(buffer, bufferOffset);
+        int remaining = buffer.remaining();
+        if (length > remaining) {
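+            // The requested range crosses a segment boundary: map a dedicated
+            // buffer over exactly this range and track it so unslice() can
+            // release the mapping once the caller is done with it.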
+            try {
+                buffer = channel.map( readOnly ? MapMode.READ_ONLY : MapMode.READ_WRITE, position, length);
+                boundaryBuffers.add(buffer);
+                return buffer;
+            } catch (IOException e) {
+                throw new IOPagingException(e);
+            }
+        }
+        return (ByteBuffer) buffer.limit(buffer.position()+length);
+    }
+    
+    public void unslice(ByteBuffer buffer) {
+        if( boundaryBuffers.remove(buffer) ) {
+            BYTE_BUFFER_RELEASER.release(buffer);
+        }
+    }
+
+	static public class ChannelTransfer {
+		private final FileChannel channel;
+		private final long position;
+		private final long length;
+
+		public ChannelTransfer(FileChannel channel, long position, long length) {
+			this.channel = channel;
+			this.position = position;
+			this.length = length;
+		}
+
+		/**
+		 * Writes the transfer to the destination's current file position.
+		 * 
+		 * @param destination
+		 * @throws IOException
+		 */
+		public void writeTo(FileChannel destination) throws IOException {
+			channel.transferTo(position, length, destination);
+		}
+	}
+
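+	// A ChannelTransfer captures a (channel, position, length) triple so a
+	// region of this file can later be copied to another channel via
+	// FileChannel.transferTo(), which can avoid copying through user space.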
+	public ChannelTransfer readChannelTransfer(int position, int length) throws IOPagingException {
+		return new ChannelTransfer(channel, position, length);
+	}
+	
+	public void writeChannelTransfer(long position, ChannelTransfer transfer) throws IOPagingException {
+		try {
+            channel.position(position);
+            transfer.writeTo(channel);
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+	}
+	
+	public void write(long position, byte[] data) throws IOPagingException {
+		this.write(position, data, 0, data.length);
+	}
+
+	public void write(long position, Buffer data) throws IOPagingException {
+		this.write(position, data.data, data.offset, data.length);
+	}
+
+	public void write(long position, ByteBuffer data) throws IOPagingException {
+		int bufferIndex = (int) (position / bufferSize);
+		int bufferOffset = (int) (position % bufferSize);
+		ByteBuffer buffer = loadBuffer(bufferIndex);
+		buffer = position(buffer, bufferOffset);
+		int remaining = buffer.remaining();
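+		// A write may likewise straddle segment boundaries: temporarily clamp
+		// the source buffer's limit so each put() fills exactly the space left
+		// in the current segment.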
+		while (data.remaining() > remaining) {
+			int l = data.limit();
+			data.limit(data.position() + remaining);
+			buffer.put(data);
+			data.limit(l);
+			bufferIndex++;
+			buffer = loadBuffer(bufferIndex).duplicate();
+			remaining = buffer.remaining();
+		}
+		buffer.put(data);	
+	}
+
+	public void write(long position, byte[] data, int offset, int length)
+			throws IOPagingException {
+		int bufferIndex = (int) (position / bufferSize);
+		int bufferOffset = (int) (position % bufferSize);
+		ByteBuffer buffer = loadBuffer(bufferIndex);
+		buffer = position(buffer, bufferOffset);
+		int remaining = buffer.remaining();
+		while (length > remaining) {
+			buffer.put(data, offset, remaining);
+			offset += remaining;
+			length -= remaining;
+			bufferIndex++;
+			buffer = loadBuffer(bufferIndex).duplicate();
+			remaining = buffer.remaining();
+		}
+		buffer.put(data, offset, length);
+	}
+
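+	// Positions a buffer without mutating the cached segment buffer: every
+	// caller receives a duplicate whose position and limit are private to it.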
+	private ByteBuffer position(ByteBuffer buffer, int offset) {
+		return (ByteBuffer) buffer.duplicate().position(offset);
+	}
+
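+	// Lazily maps the index-th fixed-size segment of the file on first use and
+	// caches it; the list may contain null entries for segments never touched.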
+	private MappedByteBuffer loadBuffer(int index) throws IOPagingException {
+		while (index >= buffers.size()) {
+			buffers.add(null);
+		}
+		MappedByteBuffer buffer = buffers.get(index);
+		if (buffer == null) {
+			try {
+                buffer = channel.map(MapMode.READ_WRITE, index*bufferSize, bufferSize);
+            } catch (IOException e) {
+                throw new IOPagingException(e);
+            }
+			buffers.set(index, buffer);
+		}
+		return buffer;
+	}
+
+	public void sync() throws IOPagingException {
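+		// Flush every dirty mapped segment, then sync the file descriptor so
+		// file metadata (e.g. the length) reaches disk as well.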
+		for (MappedByteBuffer buffer : buffers) {
+			if (buffer != null) {
+				buffer.force();
+			}
+		}
+        try {
+            IOHelper.sync(fd);
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+	}
+	
+	public void close() throws IOPagingException {
+		sync();
+		for (MappedByteBuffer buffer : buffers) {
+			if (buffer != null) {
+				BYTE_BUFFER_RELEASER.release(buffer);
+			}
+		}
+		buffers.clear();
+		try {
+            channel.close();
+        } catch (IOException e) {
+            throw new IOPagingException(e);
+        }
+	}
+
+	private static interface ByteBufferReleaser {
+		public void release(ByteBuffer buffer);
+	}
+	
+	static private ByteBufferReleaser createByteBufferReleaser() {
+		
+		// Try to drill into the java.nio.DirectBuffer internals...
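+		// Mapped buffers are normally only unmapped when they are garbage
+		// collected; invoking the non-standard DirectBuffer.cleaner().clean()
+		// chain (present on Sun/Oracle JVMs) releases the mapping eagerly.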
+		final Method[] cleanerMethods = AccessController.doPrivileged(new PrivilegedAction<Method[]>() {
+			public Method[] run() {
+				try {
+					ByteBuffer buffer = ByteBuffer.allocateDirect(1);
+					Class<?> bufferClazz = buffer.getClass();
+					Method cleanerMethod = bufferClazz.getMethod("cleaner", new Class[0]);
+					cleanerMethod.setAccessible(true);
+					Method cleanMethod = cleanerMethod.getReturnType().getMethod("clean");
+					return new Method[]{cleanerMethod, cleanMethod};
+				} catch (Exception e) {
+					return null;
+				}
+			}
+		});
+		
+		// Yay, we can actually release the buffers.
+		if( cleanerMethods !=null ) {
+			return new ByteBufferReleaser() {
+				public void release(ByteBuffer buffer) {
+					try {
+						Object cleaner = cleanerMethods[0].invoke(buffer);
+						if( cleaner!=null ) {
+							cleanerMethods[1].invoke(cleaner);
+						}
+					} catch (Throwable e) {
+						e.printStackTrace();
+					}
+				}
+			};
+		}
+		
+		// We can't release the buffers eagerly on this JVM; they will only be
+		// unmapped once they are garbage collected.
+		return new ByteBufferReleaser() {
+			public void release(ByteBuffer buffer) {
+			}
+		};
+	}
+
+}
\ No newline at end of file

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFileFactory.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFileFactory.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFileFactory.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/MemoryMappedFileFactory.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.io;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Factory for {@link MemoryMappedFile} objects.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class MemoryMappedFileFactory {
+    
+    private MemoryMappedFile memoryMappedFile;
+    
+    protected File file;
+    protected int mappingSegmentSize = 1024*1024*64;
+    
+    public void open() throws IOException {
+        if( memoryMappedFile == null ) {
+            if( file ==  null ) {
+                throw new IllegalArgumentException("file property not set");
+            }
+            if( mappingSegmentSize <= 0 ) {
+                throw new IllegalArgumentException("mappingSegmentSize property must be greater than 0");
+            }
+            // Auto-create the parent directory if it does not already exist.
+            File parent = file.getParentFile();
+            if( parent != null ) {
+                parent.mkdirs();
+            }
+            memoryMappedFile = new MemoryMappedFile(file, mappingSegmentSize);
+        }
+    }
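+
+    // A minimal usage sketch (the file name is an illustrative assumption):
+    //
+    //   MemoryMappedFileFactory factory = new MemoryMappedFileFactory();
+    //   factory.setFile(new File("data.db"));
+    //   factory.open();
+    //   MemoryMappedFile mmf = factory.getMemoryMappedFile();
+    //   ... use mmf ...
+    //   factory.close();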
+    
+    public void close() {
+        if( memoryMappedFile!=null ) {
+            memoryMappedFile.close();
+            memoryMappedFile=null;
+        }
+    }
+
+    public MemoryMappedFile getMemoryMappedFile() throws IOException {
+        return memoryMappedFile;
+    }
+
+    public File getFile() {
+        return file;
+    }
+    public void setFile(File file) {
+        this.file = file;
+    }
+
+    public int getMappingSegmentSize() {
+        return mappingSegmentSize;
+    }
+    public void setMappingSegmentSize(int mappingSegmentSize) {
+        this.mappingSegmentSize = mappingSegmentSize;
+    }
+    
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/package.html
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/package.html?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/package.html (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/package.html Thu Oct 15 17:04:11 2009
@@ -0,0 +1,27 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+   
+    http://www.apache.org/licenses/LICENSE-2.0
+   
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+<p>
+	Provides low-level IO access.
+</p>
+
+</body>
+</html>

Propchange: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/io/package.html
------------------------------------------------------------------------------
    svn:executable = *

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFile.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFile.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFile.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.list.LinkedNode;
+
+/**
+ * A single data file in the journal.  Files form a linked list and are ordered
+ * by their data file id.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFile> {
+
+    protected final File file;
+    protected final Integer dataFileId;
+    protected int length;
+
+    DataFile(File file, int number, int preferredSize) {
+        this.file = file;
+        this.dataFileId = Integer.valueOf(number);
+        length = (int)(file.exists() ? file.length() : 0);
+    }
+    
+    public File getFile() {
+        return file;
+    }
+
+    public Integer getDataFileId() {
+        return dataFileId;
+    }
+
+    public synchronized int getLength() {
+        return length;
+    }
+
+    public synchronized void setLength(int length) {
+        this.length = length;
+    }
+
+    public synchronized void incrementLength(int size) {
+        length += size;
+    }
+
+    public synchronized String toString() {
+        return file.getName() + ", number = " + dataFileId + ", length = " + length;
+    }
+
+    public synchronized RandomAccessFile openRandomAccessFile() throws IOException {
+        return new RandomAccessFile(file, "rw");
+    }
+
+    public synchronized void closeRandomAccessFile(RandomAccessFile file) throws IOException {
+        file.close();
+    }
+
+    public synchronized boolean delete() throws IOException {
+        return file.delete();
+    }
+    
+    public synchronized void move(File targetDirectory) throws IOException{
+        IOHelper.moveFile(file,targetDirectory);
+    }
+
+    public int compareTo(DataFile df) {
+        return dataFileId - df.dataFileId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        boolean result = false;
+        if (o instanceof DataFile) {
+            result = compareTo((DataFile)o) == 0;
+        }
+        return result;
+    }
+
+    @Override
+    public int hashCode() {
+        return dataFileId;
+    }
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessor.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessor.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessor.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessor.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Map;
+
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.buffer.Buffer;
+import org.apache.hawtdb.internal.journal.DataFileAppender.WriteCommand;
+import org.apache.hawtdb.internal.journal.DataFileAppender.WriteKey;
+
+/**
+ * Optimized Store reader and updater. Single threaded and synchronous. Use in
+ * conjunction with the DataFileAccessorPool for concurrent use.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+final class DataFileAccessor {
+
+    private final DataFile dataFile;
+    private final Map<WriteKey, WriteCommand> inflightWrites;
+    private final RandomAccessFile file;
+    private boolean disposed;
+
+    /**
+     * Constructs a store reader for the given data file.
+     * 
+     * @param dataManager the journal that owns the file
+     * @param dataFile the data file to access
+     * @throws IOException
+     */
+    public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {
+        this.dataFile = dataFile;
+        this.inflightWrites = dataManager.getInflightWrites();
+        this.file = dataFile.openRandomAccessFile();
+    }
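+
+    // Typical flow (a sketch; accessors are normally checked out of and
+    // returned to a DataFileAccessorPool rather than constructed directly):
+    //
+    //   DataFileAccessor reader = pool.openDataFileAccessor(dataFile);
+    //   try {
+    //       Buffer record = reader.readRecord(location);
+    //   } finally {
+    //       pool.closeDataFileAccessor(reader);
+    //   }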
+
+    public DataFile getDataFile() {
+        return dataFile;
+    }
+
+    public void dispose() {
+        if (disposed) {
+            return;
+        }
+        disposed = true;
+        try {
+            dataFile.closeRandomAccessFile(file);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public Buffer readRecord(Location location) throws IOException {
+
+        if (!location.isValid()) {
+            throw new IOException("Invalid location: " + location);
+        }
+
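+        // A record still queued in the asynchronous write pipeline may not be
+        // on disk yet, so serve it straight from the in-flight map.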
+        WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            return asyncWrite.data;
+        }
+
+        try {
+
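+            // If the caller does not yet know the record size, read the record
+            // header (size and type) first; otherwise seek straight past it.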
+            if (location.getSize() == Location.NOT_SET) {
+                file.seek(location.getOffset());
+                location.setSize(file.readInt());
+                location.setType(file.readByte());
+            } else {
+                file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
+            }
+
+            byte[] data = new byte[location.getSize() - Journal.RECORD_HEAD_SPACE];
+            file.readFully(data);
+            return new Buffer(data, 0, data.length);
+
+        } catch (RuntimeException e) {
+            throw new IOException("Invalid location: " + location + ": " + e);
+        }
+    }
+    
+    public void read(long offset, byte data[]) throws IOException {
+       file.seek(offset);
+       file.readFully(data);
+    }
+
+    public void readLocationDetails(Location location) throws IOException {
+        WriteCommand asyncWrite = inflightWrites.get(new WriteKey(location));
+        if (asyncWrite != null) {
+            location.setSize(asyncWrite.location.getSize());
+            location.setType(asyncWrite.location.getType());
+        } else {
+            file.seek(location.getOffset());
+            location.setSize(file.readInt());
+            location.setType(file.readByte());
+        }
+    }
+
+//    public boolean readLocationDetailsAndValidate(Location location) {
+//        try {
+//            WriteCommand asyncWrite = (WriteCommand)inflightWrites.get(new WriteKey(location));
+//            if (asyncWrite != null) {
+//                location.setSize(asyncWrite.location.getSize());
+//                location.setType(asyncWrite.location.getType());
+//            } else {
+//                file.seek(location.getOffset());
+//                location.setSize(file.readInt());
+//                location.setType(file.readByte());
+//
+//                byte data[] = new byte[3];
+//                file.seek(location.getOffset() + Journal.ITEM_HEAD_OFFSET_TO_SOR);
+//                file.readFully(data);
+//                if (data[0] != Journal.ITEM_HEAD_SOR[0]
+//                    || data[1] != Journal.ITEM_HEAD_SOR[1]
+//                    || data[2] != Journal.ITEM_HEAD_SOR[2]) {
+//                    return false;
+//                }
+//                file.seek(location.getOffset() + location.getSize() - Journal.ITEM_FOOT_SPACE);
+//                file.readFully(data);
+//                if (data[0] != Journal.ITEM_HEAD_EOR[0]
+//                    || data[1] != Journal.ITEM_HEAD_EOR[1]
+//                    || data[2] != Journal.ITEM_HEAD_EOR[2]) {
+//                    return false;
+//                }
+//            }
+//        } catch (IOException e) {
+//            return false;
+//        }
+//        return true;
+//    }
+
+    public void updateRecord(Location location, Buffer data, boolean sync) throws IOException {
+
+        file.seek(location.getOffset() + Journal.RECORD_HEAD_SPACE);
+        int size = Math.min(data.getLength(), location.getSize());
+        file.write(data.getData(), data.getOffset(), size);
+        if (sync) {
+            IOHelper.sync(file.getFD());
+        }
+
+    }
+
+}

Added: activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessorPool.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessorPool.java?rev=825564&view=auto
==============================================================================
--- activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessorPool.java (added)
+++ activemq/sandbox/activemq-flow/hawtdb/src/main/java/org/apache/hawtdb/internal/journal/DataFileAccessorPool.java Thu Oct 15 17:04:11 2009
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hawtdb.internal.journal;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to pool DataFileAccessors.
+ * 
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+public class DataFileAccessorPool {
+
+    private final Journal journal;
+    private final Map<Integer, Pool> pools = new HashMap<Integer, Pool>();
+    private boolean closed;
+    private int maxOpenReadersPerFile = 5;
+
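+    // Each DataFile gets its own Pool of reusable accessors; at most
+    // maxOpenReadersPerFile idle readers are retained per file, and surplus
+    // readers are disposed when returned.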
+    class Pool {
+
+        private final DataFile file;
+        private final List<DataFileAccessor> pool = new ArrayList<DataFileAccessor>();
+        private boolean used;
+        private int openCounter;
+        private boolean disposed;
+
+        public Pool(DataFile file) {
+            this.file = file;
+        }
+
+        public DataFileAccessor openDataFileReader() throws IOException {
+            DataFileAccessor rc = null;
+            if (pool.isEmpty()) {
+                rc = new DataFileAccessor(journal, file);
+            } else {
+                rc = pool.remove(pool.size() - 1);
+            }
+            used = true;
+            openCounter++;
+            return rc;
+        }
+
+        public synchronized void closeDataFileReader(DataFileAccessor reader) {
+            openCounter--;
+            if (pool.size() >= maxOpenReadersPerFile || disposed) {
+                reader.dispose();
+            } else {
+                pool.add(reader);
+            }
+        }
+
+        public synchronized void clearUsedMark() {
+            used = false;
+        }
+
+        public synchronized boolean isUsed() {
+            return used;
+        }
+
+        public synchronized void dispose() {
+            for (DataFileAccessor reader : pool) {
+                reader.dispose();
+            }
+            pool.clear();
+            disposed = true;
+        }
+
+        public synchronized int getOpenCounter() {
+            return openCounter;
+        }
+
+    }
+
+    public DataFileAccessorPool(Journal dataManager) {
+        this.journal = dataManager;
+    }
+
+    synchronized void clearUsedMark() {
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            pool.clearUsedMark();
+        }
+    }
+
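+    // clearUsedMark()/disposeUnused() form a simple mark-and-sweep idiom: a
+    // caller periodically clears the marks, and any per-file pool that was not
+    // used again before the next sweep is disposed and removed.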
+    synchronized void disposeUnused() {
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            if (!pool.isUsed()) {
+                pool.dispose();
+                iter.remove();
+            }
+        }
+    }
+
+    synchronized void disposeDataFileAccessors(DataFile dataFile) {
+        if (closed) {
+            throw new IllegalStateException("Closed.");
+        }
+        Pool pool = pools.get(dataFile.getDataFileId());
+        if (pool != null) {
+            if (pool.getOpenCounter() == 0) {
+                pool.dispose();
+                pools.remove(dataFile.getDataFileId());
+            } else {
+                throw new IllegalStateException("The data file is still in use: " + dataFile + ", use count: " + pool.getOpenCounter());
+            }
+        }
+    }
+
+    synchronized DataFileAccessor openDataFileAccessor(DataFile dataFile) throws IOException {
+        if (closed) {
+            throw new IOException("Closed.");
+        }
+
+        Pool pool = pools.get(dataFile.getDataFileId());
+        if (pool == null) {
+            pool = new Pool(dataFile);
+            pools.put(dataFile.getDataFileId(), pool);
+        }
+        return pool.openDataFileReader();
+    }
+
+    synchronized void closeDataFileAccessor(DataFileAccessor reader) {
+        Pool pool = pools.get(reader.getDataFile().getDataFileId());
+        if (pool == null || closed) {
+            reader.dispose();
+        } else {
+            pool.closeDataFileReader(reader);
+        }
+    }
+
+    public synchronized void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        for (Iterator<Pool> iter = pools.values().iterator(); iter.hasNext();) {
+            Pool pool = iter.next();
+            pool.dispose();
+        }
+        pools.clear();
+    }
+
+}


