hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1387533 [2/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j...
Date Wed, 19 Sep 2012 11:52:24 GMT
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,1532 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.List;
+
+/**
+ * Node of a BTree.
+ * <p/>
+ * The node contains a number of key-value pairs. Keys are ordered to allow
+ * dichotomic search. If value is too big, it is stored in separate record and
+ * only recid reference is stored
+ * <p/>
+ * If the node is a leaf node, the keys and values are user-defined and
+ * represent entries inserted by the user.
+ * <p/>
+ * If the node is non-leaf, each key represents the greatest key in the
+ * underlying BTreeNode and the values are recids pointing to the children
+ * BTreeNodes. The only exception is the rightmost BTreeNode, which is
+ * considered to have an "infinite" key value, meaning that any insert will be
+ * to the left of this pseudo-key
+ */
public final class BTreeNode<K, V> implements Serializer<BTreeNode<K, V>> {

  // Enables verbose insert/overflow tracing on stdout; development only.
  private static final boolean DEBUG = false;

  /**
   * Parent B+Tree. Transient: re-attached by loadNode() after a node is
   * fetched from the DB.
   */
  transient BTree<K, V> _btree;

  /**
   * This BTreeNode's record ID in the DB. Transient: assigned on insert and
   * re-assigned by loadNode() after fetch.
   */
  protected transient long _recid;

  /**
   * Flag indicating if this is a leaf BTreeNode.
   */
  protected boolean _isLeaf;

  /**
   * Keys of children nodes. The arrays are filled from the right: slots
   * below _first are unused, and the last slot holds the largest key
   * (null meaning "plus infinity" for the rightmost node).
   */
  protected K[] _keys;

  /**
   * Values associated with keys. (Only valid if leaf node.) Entries may be
   * the value itself or a BTreeLazyRecord reference to an out-of-node record.
   */
  protected Object[] _values;

  /**
   * Children nodes (recids) associated with keys. (Only valid if non-leaf node)
   */
  protected long[] _children;

  /**
   * Index of first used item at the node. Smaller _first means a fuller node;
   * _first == 0 means full (see isFull()).
   */
  protected byte _first;

  /**
   * Previous leaf node recid in the leaf linked list, or 0 if none.
   * (Only meaningful if this node is a leaf.)
   */
  protected long _previous;

  /**
   * Next leaf node recid in the leaf linked list, or 0 if none.
   * (Only meaningful if this node is a leaf.)
   */
  protected long _next;
+
  /**
   * Return the B+Tree that is the owner of this {@link BTreeNode}.
   *
   * @return the owning tree; may be null before the node is attached
   *         (e.g. right after deserialization, before loadNode() runs)
   */
  public BTree<K, V> getBTree() {
    return _btree;
  }
+
  /**
   * No-argument constructor used by serialization; all fields are filled in
   * afterwards by {@link #deserialize} and {@link #loadNode}.
   */
  public BTreeNode() {
    // empty
  }
+
  /**
   * Root node overflow constructor: builds a new non-leaf root with exactly
   * two children — the overflow node (left) and the old root (right) — and
   * immediately persists it to the DB.
   *
   * @param btree owning tree
   * @param root the old root node (keeps the "infinite" rightmost key slot)
   * @param overflow the node produced by the root split
   * @throws IOException if the DB insert fails
   */
  @SuppressWarnings("unchecked")
  BTreeNode(BTree<K, V> btree, BTreeNode<K, V> root, BTreeNode<K, V> overflow)
      throws IOException {
    _btree = btree;

    _isLeaf = false;

    // only the last two slots are used
    _first = BTree.DEFAULT_SIZE - 2;

    _keys = (K[]) new Object[BTree.DEFAULT_SIZE];
    _keys[BTree.DEFAULT_SIZE - 2] = overflow.getLargestKey();
    _keys[BTree.DEFAULT_SIZE - 1] = root.getLargestKey();

    _children = new long[BTree.DEFAULT_SIZE];
    _children[BTree.DEFAULT_SIZE - 2] = overflow._recid;
    _children[BTree.DEFAULT_SIZE - 1] = root._recid;

    // this node acts as its own serializer
    _recid = _btree._db.insert(this, this, false);
  }
+
  /**
   * Root node (first insert) constructor: builds a one-entry leaf root and
   * immediately persists it to the DB.
   *
   * @param btree owning tree
   * @param key the first key inserted into the tree
   * @param value the value associated with {@code key}
   * @throws IOException if the DB insert fails
   */
  @SuppressWarnings("unchecked")
  BTreeNode(BTree<K, V> btree, K key, V value) throws IOException {
    _btree = btree;

    _isLeaf = true;

    _first = BTree.DEFAULT_SIZE - 2;

    _keys = (K[]) new Object[BTree.DEFAULT_SIZE];
    _keys[BTree.DEFAULT_SIZE - 2] = key;
    _keys[BTree.DEFAULT_SIZE - 1] = null; // null key == "infinity": I am the
                                          // root BTreeNode for now

    _values = new Object[BTree.DEFAULT_SIZE];
    _values[BTree.DEFAULT_SIZE - 2] = value;
    _values[BTree.DEFAULT_SIZE - 1] = null; // I am the root BTreeNode for now

    _recid = _btree._db.insert(this, this, false);
  }
+
  /**
   * Overflow node constructor. Creates an empty BTreeNode that will receive
   * half of a splitting node's entries, and persists it to the DB.
   *
   * @param btree owning tree
   * @param isLeaf whether the new node is a leaf (decides _values vs _children)
   * @throws IOError wrapping any IOException from the DB insert — this
   *         constructor cannot declare checked exceptions without changing
   *         its callers
   */
  @SuppressWarnings("unchecked")
  BTreeNode(BTree<K, V> btree, boolean isLeaf) {
    _btree = btree;

    _isLeaf = isLeaf;

    // node will initially be half-full
    _first = BTree.DEFAULT_SIZE / 2;

    _keys = (K[]) new Object[BTree.DEFAULT_SIZE];
    if (isLeaf) {
      _values = new Object[BTree.DEFAULT_SIZE];
    } else {
      _children = new long[BTree.DEFAULT_SIZE];
    }

    try {
      _recid = _btree._db.insert(this, this, false);
    } catch (IOException e) {
      throw new IOError(e);
    }
  }
+
  /**
   * Get largest key under this BTreeNode (always stored in the last slot).
   * Null is considered to be the greatest possible key, so the rightmost
   * node of each level returns null here.
   */
  K getLargestKey() {
    return _keys[BTree.DEFAULT_SIZE - 1];
  }
+
+  /**
+   * Return true if BTreeNode is empty.
+   */
+  boolean isEmpty() {
+    if (_isLeaf) {
+      return (_first == _values.length - 1);
+    } else {
+      return (_first == _children.length - 1);
+    }
+  }
+
  /**
   * Return true if BTreeNode is full, i.e. the first used index has reached
   * slot 0 and there is no room to shift entries left.
   */
  boolean isFull() {
    return (_first == 0);
  }
+
  /**
   * Find the object associated with the given key by descending to the leaf
   * level and opening a browser there.
   * 
   * @param height Height of the current BTreeNode (zero is leaf node)
   * @param key The key
   * @param inclusive if true, position at the key itself when present;
   *        otherwise strictly after it
   * @return TupleBrowser positioned just before the given key, or before next
   *         greater key if key isn't found.
   */
  BTree.BTreeTupleBrowser<K, V> find(int height, final K key,
      final boolean inclusive) throws IOException {
    byte index = findChildren(key, inclusive);

    height -= 1;

    if (height == 0) {
      // leaf node
      return new Browser<K, V>(this, index);
    } else {
      // non-leaf node: descend into the child covering the key range
      BTreeNode<K, V> child = loadNode(_children[index]);
      return child.find(height, key, inclusive);
    }
  }
+
  /**
   * Find value associated with the given key.
   * 
   * @param height Height of the current BTreeNode (zero is leaf node)
   * @param key The key
   * @return the value stored for {@code key}, or null if the key is not
   *         present in the tree
   */
  V findValue(int height, K key) throws IOException {
    byte index = findChildren(key, true);

    height -= 1;

    if (height == 0) {

      K key2 = _keys[index];
      // findChildren returns the matching key or the next ordered key, so we
      // must check if we have an exact match
      if (key2 == null || compare(key, key2) != 0)
        return null;

      // leaf node: resolve lazy (out-of-node) records transparently
      if (_values[index] instanceof BTreeLazyRecord)
        return ((BTreeLazyRecord<V>) _values[index]).get();
      else
        return (V) _values[index];

    } else {
      // non-leaf node
      BTreeNode<K, V> child = loadNode(_children[index]);
      return child.findValue(height, key);
    }
  }
+
  /**
   * Find first entry and return a browser positioned before it, by always
   * descending into the leftmost child until a leaf is reached.
   * 
   * @return TupleBrowser positioned just before the first entry.
   */
  BTree.BTreeTupleBrowser<K, V> findFirst() throws IOException {
    if (_isLeaf) {
      return new Browser<K, V>(this, _first);
    } else {
      BTreeNode<K, V> child = loadNode(_children[_first]);
      return child.findFirst();
    }
  }
+
+  /**
+   * Deletes this BTreeNode and all children nodes from the record manager
+   */
+  void delete() throws IOException {
+    if (_isLeaf) {
+      if (_next != 0) {
+        BTreeNode<K, V> nextNode = loadNode(_next);
+        if (nextNode._previous == _recid) { // this consistency check can be
+                                            // removed in production code
+          nextNode._previous = _previous;
+          _btree._db.update(nextNode._recid, nextNode, nextNode);
+        } else {
+          throw new Error("Inconsistent data in BTree");
+        }
+      }
+      if (_previous != 0) {
+        BTreeNode<K, V> previousNode = loadNode(_previous);
+        if (previousNode._next != _recid) { // this consistency check can be
+                                            // removed in production code
+          previousNode._next = _next;
+          _btree._db.update(previousNode._recid, previousNode, previousNode);
+        } else {
+          throw new Error("Inconsistent data in BTree");
+        }
+      }
+    } else {
+      int left = _first;
+      int right = BTree.DEFAULT_SIZE - 1;
+
+      for (int i = left; i <= right; i++) {
+        BTreeNode<K, V> childNode = loadNode(_children[i]);
+        childNode.delete();
+      }
+    }
+
+    _btree._db.delete(_recid);
+  }
+
  /**
   * Insert the given key and value.
   * <p/>
   * Since the Btree does not support duplicate entries, the caller must specify
   * whether to replace the existing value.
   * 
   * @param height Height of the current BTreeNode (zero is leaf node)
   * @param key Insert key
   * @param value Insert value
   * @param replace Set to true to replace the existing value, if one exists.
   * @return Insertion result containing existing value OR a BTreeNode if the
   *         key was inserted and provoked a BTreeNode overflow.
   */
  InsertResult<K, V> insert(int height, K key, final V value,
      final boolean replace) throws IOException {
    InsertResult<K, V> result;
    long overflow;

    final byte index = findChildren(key, true);

    height -= 1;
    if (height == 0) {

      // reuse InsertResult instance to avoid GC thrashing on massive inserts
      result = _btree.insertResultReuse;
      _btree.insertResultReuse = null;
      if (result == null)
        result = new InsertResult<K, V>();

      // inserting on a leaf BTreeNode
      overflow = -1;
      if (DEBUG) {
        System.out.println("BTreeNode.insert() Insert on leaf node key=" + key
            + " value=" + value + " index=" + index);
      }
      if (compare(_keys[index], key) == 0) {
        // key already exists
        if (DEBUG) {
          System.out.println("BTreeNode.insert() Key already exists.");
        }
        boolean isLazyRecord = _values[index] instanceof BTreeLazyRecord;
        if (isLazyRecord)
          result._existing = ((BTreeLazyRecord<V>) _values[index]).get();
        else
          result._existing = (V) _values[index];
        if (replace) {
          // remove old lazy record if necessary
          if (isLazyRecord)
            ((BTreeLazyRecord) _values[index]).delete();
          _values[index] = value;
          _btree._db.update(_recid, this, this);
        }
        // return the existing key
        return result;
      }
    } else {
      // non-leaf BTreeNode: recurse into the covering child
      BTreeNode<K, V> child = loadNode(_children[index]);
      result = child.insert(height, key, value, replace);

      if (result._existing != null) {
        // return existing key, if any.
        return result;
      }

      if (result._overflow == null) {
        // no overflow means we're done with insertion
        return result;
      }

      // there was an overflow, we need to insert the overflow node on this
      // BTreeNode; from here on we insert (key, overflow-recid) instead of
      // the user's (key, value)
      if (DEBUG) {
        System.out.println("BTreeNode.insert() Overflow node: "
            + result._overflow._recid);
      }
      key = result._overflow.getLargestKey();
      overflow = result._overflow._recid;

      // update child's largest key
      _keys[index] = child.getLargestKey();

      // clean result so we can reuse it
      result._overflow = null;
    }

    // if we get here, we need to insert a new entry on the BTreeNode before
    // _children[ index ]
    if (!isFull()) {
      if (height == 0) {
        insertEntry(this, index - 1, key, value);
      } else {
        insertChild(this, index - 1, key, overflow);
      }
      _btree._db.update(_recid, this, this);
      return result;
    }

    // node is full, we must divide the node; the new node takes the lower
    // half of the entries
    final byte half = BTree.DEFAULT_SIZE >> 1;
    BTreeNode<K, V> newNode = new BTreeNode<K, V>(_btree, _isLeaf);
    if (index < half) {
      // move lower-half of entries to overflow node, including new entry
      if (DEBUG) {
        System.out
            .println("BTreeNode.insert() move lower-half of entries to overflow BTreeNode, including new entry.");
      }
      if (height == 0) {
        copyEntries(this, 0, newNode, half, index);
        setEntry(newNode, half + index, key, value);
        copyEntries(this, index, newNode, half + index + 1, half - index - 1);
      } else {
        copyChildren(this, 0, newNode, half, index);
        setChild(newNode, half + index, key, overflow);
        copyChildren(this, index, newNode, half + index + 1, half - index - 1);
      }
    } else {
      // move lower-half of entries to overflow node, new entry stays on this
      // node
      if (DEBUG) {
        System.out
            .println("BTreeNode.insert() move lower-half of entries to overflow BTreeNode. New entry stays");
      }
      if (height == 0) {
        copyEntries(this, 0, newNode, half, half);
        copyEntries(this, half, this, half - 1, index - half);
        setEntry(this, index - 1, key, value);
      } else {
        copyChildren(this, 0, newNode, half, half);
        copyChildren(this, half, this, half - 1, index - half);
        setChild(this, index - 1, key, overflow);
      }
    }

    _first = half - 1;

    // nullify lower half of entries so stale references can be collected
    for (int i = 0; i < _first; i++) {
      if (height == 0) {
        setEntry(this, i, null, null);
      } else {
        setChild(this, i, null, -1);
      }
    }

    if (_isLeaf) {
      // link newly created node into the leaf list, just before this node
      newNode._previous = _previous;
      newNode._next = _recid;
      if (_previous != 0) {
        BTreeNode<K, V> previous = loadNode(_previous);
        previous._next = newNode._recid;
        _btree._db.update(_previous, previous, this);

      }
      _previous = newNode._recid;
    }

    _btree._db.update(_recid, this, this);
    _btree._db.update(newNode._recid, newNode, this);

    result._overflow = newNode;
    return result;
  }
+
  /**
   * Remove the entry associated with the given key.
   * <p/>
   * After the leaf-level removal, underflowing children (more than half
   * empty) are rebalanced on the way back up: entries are either stolen from
   * a sibling or the two nodes are merged and one record deleted.
   * 
   * @param height Height of the current BTreeNode (zero is leaf node)
   * @param key Removal key
   * @return Remove result object carrying the removed value and an underflow
   *         flag for the caller
   * @throws IllegalArgumentException if the key is not present
   * @throws IllegalStateException if rebalancing invariants are violated
   */
  RemoveResult<K, V> remove(int height, K key) throws IOException {
    RemoveResult<K, V> result;

    // NOTE(review): the rebalancing arithmetic below assumes
    // BTree.DEFAULT_SIZE == 2 * half — confirm DEFAULT_SIZE is even
    int half = BTree.DEFAULT_SIZE / 2;
    byte index = findChildren(key, true);

    height -= 1;
    if (height == 0) {
      // remove leaf entry
      if (compare(_keys[index], key) != 0) {
        throw new IllegalArgumentException("Key not found: " + key);
      }
      result = new RemoveResult<K, V>();

      // resolve and delete out-of-node records before dropping the entry
      if (_values[index] instanceof BTreeLazyRecord) {
        BTreeLazyRecord<V> r = (BTreeLazyRecord<V>) _values[index];
        result._value = r.get();
        r.delete();
      } else {
        result._value = (V) _values[index];
      }
      removeEntry(this, index);

      // update this node
      _btree._db.update(_recid, this, this);

    } else {
      // recurse into Btree to remove entry on a children node
      BTreeNode<K, V> child = loadNode(_children[index]);
      result = child.remove(height, key);

      // update children
      _keys[index] = child.getLargestKey();
      _btree._db.update(_recid, this, this);

      if (result._underflow) {
        // underflow occurred: child is exactly one entry below half-full
        if (child._first != half + 1) {
          throw new IllegalStateException("Error during underflow [1]");
        }
        if (index < _children.length - 1) {
          // exists greater brother node (sibling to the right)
          BTreeNode<K, V> brother = loadNode(_children[index + 1]);
          int bfirst = brother._first;
          if (bfirst < half) {
            // brother has spare entries: steal entries from "brother" node
            int steal = (half - bfirst + 1) / 2;
            brother._first += steal;
            child._first -= steal;
            if (child._isLeaf) {
              copyEntries(child, half + 1, child, half + 1 - steal, half - 1);
              copyEntries(brother, bfirst, child, 2 * half - steal, steal);
            } else {
              copyChildren(child, half + 1, child, half + 1 - steal, half - 1);
              copyChildren(brother, bfirst, child, 2 * half - steal, steal);
            }

            // clear the slots vacated in the brother
            for (int i = bfirst; i < bfirst + steal; i++) {
              if (brother._isLeaf) {
                setEntry(brother, i, null, null);
              } else {
                setChild(brother, i, null, -1);
              }
            }

            // update child's largest key
            _keys[index] = child.getLargestKey();

            // no change in previous/next node

            // update nodes
            _btree._db.update(_recid, this, this);
            _btree._db.update(brother._recid, brother, this);
            _btree._db.update(child._recid, child, this);

          } else {
            // brother is exactly half-full: move all entries from node
            // "child" to "brother" (merge) and drop "child"
            if (brother._first != half) {
              throw new IllegalStateException("Error during underflow [2]");
            }

            brother._first = 1;
            if (child._isLeaf) {
              copyEntries(child, half + 1, brother, 1, half - 1);
            } else {
              copyChildren(child, half + 1, brother, 1, half - 1);
            }
            _btree._db.update(brother._recid, brother, this);

            // remove "child" from current node
            if (_isLeaf) {
              copyEntries(this, _first, this, _first + 1, index - _first);
              setEntry(this, _first, null, null);
            } else {
              copyChildren(this, _first, this, _first + 1, index - _first);
              setChild(this, _first, null, -1);
            }
            _first += 1;
            _btree._db.update(_recid, this, this);

            // re-link previous and next nodes around the removed leaf
            if (child._previous != 0) {
              BTreeNode<K, V> prev = loadNode(child._previous);
              prev._next = child._next;
              _btree._db.update(prev._recid, prev, this);
            }
            if (child._next != 0) {
              BTreeNode<K, V> next = loadNode(child._next);
              next._previous = child._previous;
              _btree._db.update(next._recid, next, this);

            }

            // delete "child" node
            _btree._db.delete(child._recid);
          }
        } else {
          // node "brother" is before "child" (sibling to the left)
          BTreeNode<K, V> brother = loadNode(_children[index - 1]);
          int bfirst = brother._first;
          if (bfirst < half) {
            // steal entries from "brother" node
            int steal = (half - bfirst + 1) / 2;
            brother._first += steal;
            child._first -= steal;
            if (child._isLeaf) {
              copyEntries(brother, 2 * half - steal, child, half + 1 - steal,
                  steal);
              copyEntries(brother, bfirst, brother, bfirst + steal, 2 * half
                  - bfirst - steal);
            } else {
              copyChildren(brother, 2 * half - steal, child, half + 1 - steal,
                  steal);
              copyChildren(brother, bfirst, brother, bfirst + steal, 2 * half
                  - bfirst - steal);
            }

            // clear the slots vacated in the brother
            for (int i = bfirst; i < bfirst + steal; i++) {
              if (brother._isLeaf) {
                setEntry(brother, i, null, null);
              } else {
                setChild(brother, i, null, -1);
              }
            }

            // update brother's largest key
            _keys[index - 1] = brother.getLargestKey();

            // no change in previous/next node

            // update nodes
            _btree._db.update(_recid, this, this);
            _btree._db.update(brother._recid, brother, this);
            _btree._db.update(child._recid, child, this);

          } else {
            // brother is exactly half-full: move all entries from node
            // "brother" to "child" (merge) and drop "brother"
            if (brother._first != half) {
              throw new IllegalStateException("Error during underflow [3]");
            }

            child._first = 1;
            if (child._isLeaf) {
              copyEntries(brother, half, child, 1, half);
            } else {
              copyChildren(brother, half, child, 1, half);
            }
            _btree._db.update(child._recid, child, this);

            // remove "brother" from current node
            if (_isLeaf) {
              copyEntries(this, _first, this, _first + 1, index - 1 - _first);
              setEntry(this, _first, null, null);
            } else {
              copyChildren(this, _first, this, _first + 1, index - 1 - _first);
              setChild(this, _first, null, -1);
            }
            _first += 1;
            _btree._db.update(_recid, this, this);

            // re-link previous and next nodes around the removed leaf
            if (brother._previous != 0) {
              BTreeNode<K, V> prev = loadNode(brother._previous);
              prev._next = brother._next;
              _btree._db.update(prev._recid, prev, this);
            }
            if (brother._next != 0) {
              BTreeNode<K, V> next = loadNode(brother._next);
              next._previous = brother._previous;
              _btree._db.update(next._recid, next, this);
            }

            // delete "brother" node
            _btree._db.delete(brother._recid);
          }
        }
      }
    }

    // underflow if node is more than half-empty
    result._underflow = _first > half;

    return result;
  }
+
+  /**
+   * Find the first children node with a key equal or greater than the given
+   * key.
+   * 
+   * @return index of first children with equal or greater key.
+   */
+  private byte findChildren(final K key, final boolean inclusive) {
+    int left = _first;
+    int right = BTree.DEFAULT_SIZE - 1;
+    int middle;
+    final int D = inclusive ? 0 : 1;
+
+    // binary search
+    while (true) {
+      middle = (left + right) / 2;
+      if (compare(_keys[middle], key) < D) {
+        left = middle + 1;
+      } else {
+        right = middle;
+      }
+      if (left >= right) {
+        return (byte) right;
+      }
+    }
+  }
+
+  /**
+   * Insert entry at given position.
+   */
+  private static <K, V> void insertEntry(BTreeNode<K, V> node, int index,
+      K key, V value) {
+    K[] keys = node._keys;
+    Object[] values = node._values;
+    int start = node._first;
+    int count = index - node._first + 1;
+
+    // shift entries to the left
+    System.arraycopy(keys, start, keys, start - 1, count);
+    System.arraycopy(values, start, values, start - 1, count);
+    node._first -= 1;
+    keys[index] = key;
+    values[index] = value;
+  }
+
+  /**
+   * Insert child at given position.
+   */
+  private static <K, V> void insertChild(BTreeNode<K, V> node, int index,
+      K key, long child) {
+    K[] keys = node._keys;
+    long[] children = node._children;
+    int start = node._first;
+    int count = index - node._first + 1;
+
+    // shift entries to the left
+    System.arraycopy(keys, start, keys, start - 1, count);
+    System.arraycopy(children, start, children, start - 1, count);
+    node._first -= 1;
+    keys[index] = key;
+    children[index] = child;
+  }
+
+  /**
+   * Remove entry at given position.
+   */
+  private static <K, V> void removeEntry(BTreeNode<K, V> node, int index) {
+    K[] keys = node._keys;
+    Object[] values = node._values;
+    int start = node._first;
+    int count = index - node._first;
+
+    System.arraycopy(keys, start, keys, start + 1, count);
+    keys[start] = null;
+    System.arraycopy(values, start, values, start + 1, count);
+    values[start] = null;
+    node._first++;
+  }
+
  /**
   * Set the key/value entry at the given index (leaf nodes only; assumes
   * {@code node._values} is non-null).
   */
  private static <K, V> void setEntry(BTreeNode<K, V> node, int index, K key,
      V value) {
    node._keys[index] = key;
    node._values[index] = value;
  }
+
  /**
   * Set the key and child BTreeNode recid at the given index (non-leaf nodes
   * only; assumes {@code node._children} is non-null).
   */
  private static <K, V> void setChild(BTreeNode<K, V> node, int index, K key,
      long recid) {
    node._keys[index] = key;
    node._children[index] = recid;
  }
+
  /**
   * Copy {@code count} key/value entries between two (possibly identical)
   * nodes; System.arraycopy handles overlapping ranges when source == dest.
   */
  private static <K, V> void copyEntries(BTreeNode<K, V> source,
      int indexSource, BTreeNode<K, V> dest, int indexDest, int count) {
    System.arraycopy(source._keys, indexSource, dest._keys, indexDest, count);
    System.arraycopy(source._values, indexSource, dest._values, indexDest,
        count);
  }
+
  /**
   * Copy {@code count} key/child-recid pairs between two (possibly
   * identical) nodes; System.arraycopy handles overlapping ranges.
   */
  private static <K, V> void copyChildren(BTreeNode<K, V> source,
      int indexSource, BTreeNode<K, V> dest, int indexDest, int count) {
    System.arraycopy(source._keys, indexSource, dest._keys, indexDest, count);
    System.arraycopy(source._children, indexSource, dest._children, indexDest,
        count);
  }
+
  /**
   * Load the node at the given recid and re-attach the transient fields
   * (_recid and _btree) that deserialization cannot restore.
   */
  private BTreeNode<K, V> loadNode(long recid) throws IOException {
    // `this` doubles as the serializer for the fetched node
    BTreeNode<K, V> child = _btree._db.fetch(recid, this);
    child._recid = recid;
    child._btree = _btree;
    return child;
  }
+
+  private final int compare(final K value1, final K value2) {
+    if (value1 == null) {
+      return 1;
+    }
+    if (value2 == null) {
+      return -1;
+    }
+
+    if (_btree._comparator == null) {
+      return ((Comparable) value1).compareTo(value2);
+    } else {
+      return _btree._comparator.compare(value1, value2);
+    }
+
+  }
+
  /**
   * Dump the structure of the tree on the screen. This is used for debugging
   * purposes only. Prints every slot (including unused ones below _first).
   *
   * @param height indentation level, four spaces per level
   */
  private void dump(int height) {
    String prefix = "";
    for (int i = 0; i < height; i++) {
      prefix += "    ";
    }
    System.out.println(prefix
        + "-------------------------------------- BTreeNode recid=" + _recid);
    System.out.println(prefix + "first=" + _first);
    for (int i = 0; i < BTree.DEFAULT_SIZE; i++) {
      if (_isLeaf) {
        System.out.println(prefix + "BTreeNode [" + i + "] " + _keys[i] + " "
            + _values[i]);
      } else {
        System.out.println(prefix + "BTreeNode [" + i + "] " + _keys[i] + " "
            + _children[i]);
      }
    }
    System.out.println(prefix + "--------------------------------------");
  }
+
  /**
   * Recursively dump the state of the BTree on screen. This is used for
   * debugging purposes only. Descends only while height > 0, stopping at a
   * null key (end of used slots).
   *
   * @param height remaining height below this node (leaf level is 0)
   * @param level current indentation level passed to {@link #dump(int)}
   */
  void dumpRecursive(int height, int level) throws IOException {
    height -= 1;
    level += 1;
    if (height > 0) {
      for (byte i = _first; i < BTree.DEFAULT_SIZE; i++) {
        if (_keys[i] == null)
          break;
        BTreeNode<K, V> child = loadNode(_children[i]);
        child.dump(level);
        child.dumpRecursive(height, level);
      }
    }
  }
+
+  /**
+   * Deserialize the content of an object from a byte array.
+   */
+  @SuppressWarnings("unchecked")
+  public BTreeNode<K, V> deserialize(DataInput ois2) throws IOException {
+    DataInputOutput ois = (DataInputOutput) ois2;
+
+    BTreeNode<K, V> node = new BTreeNode<K, V>();
+
+    switch (ois.readUnsignedByte()) {
+      case SerializationHeader.BTREE_NODE_LEAF:
+        node._isLeaf = true;
+        break;
+      case SerializationHeader.BTREE_NODE_NONLEAF:
+        node._isLeaf = false;
+        break;
+      default:
+        throw new InternalError("wrong BTreeNode header");
+    }
+
+    if (node._isLeaf) {
+      node._previous = LongPacker.unpackLong(ois);
+      node._next = LongPacker.unpackLong(ois);
+    }
+
+    node._first = ois.readByte();
+
+    if (!node._isLeaf) {
+      node._children = new long[BTree.DEFAULT_SIZE];
+      for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) {
+        node._children[i] = LongPacker.unpackLong(ois);
+      }
+    }
+
+    if (!_btree.loadValues)
+      return node;
+
+    try {
+
+      node._keys = readKeys(ois, node._first);
+
+    } catch (ClassNotFoundException except) {
+      throw new IOException(except.getMessage());
+    }
+
+    if (node._isLeaf) {
+
+      try {
+        readValues(ois, node);
+      } catch (ClassNotFoundException except) {
+        throw new IOException(except);
+      }
+    }
+
+    return node;
+
+  }
+
  /**
   * Serialize the content of an object into a byte array, mirroring the
   * layout read back by {@link #deserialize}.
   * 
   * @param oos output to write to
   * @param obj node to serialize
   * @throws IOException if a write fails
   */
  public void serialize(DataOutput oos, BTreeNode<K, V> obj) throws IOException {

    // note: It is assumed that the BTreeNode instance doing the serialization
    // is the parent of the BTreeNode object being serialized.

    BTreeNode<K, V> node = obj;

    oos.writeByte(node._isLeaf ? SerializationHeader.BTREE_NODE_LEAF
        : SerializationHeader.BTREE_NODE_NONLEAF);
    if (node._isLeaf) {
      LongPacker.packLong(oos, node._previous);
      LongPacker.packLong(oos, node._next);
    }

    // write(int) emits the low byte only; read back with readByte()
    oos.write(node._first);

    if (!node._isLeaf) {
      for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) {
        LongPacker.packLong(oos, node._children[i]);
      }
    }

    writeKeys(oos, node._keys, node._first);

    if (node._isLeaf && _btree.hasValues()) {
      writeValues(oos, node);
    }
  }
+
  /**
   * Read the values for a leaf node. Each value is prefixed by a header
   * byte: NULL, LAZY_RECORD (followed by a packed recid pointing to an
   * out-of-node record), or an inline length consumed by fastDeser.
   * When the tree stores no values, placeholder strings are synthesized for
   * every present key instead.
   */
  private void readValues(DataInputOutput ois, BTreeNode<K, V> node)
      throws IOException, ClassNotFoundException {
    node._values = new Object[BTree.DEFAULT_SIZE];
    if (_btree.hasValues()) {
      Serializer<V> serializer = _btree.valueSerializer != null ? _btree.valueSerializer
          : (Serializer<V>) _btree.getRecordManager().defaultSerializer();
      for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) {
        int header = ois.readUnsignedByte();
        if (header == BTreeLazyRecord.NULL) {
          node._values[i] = null;
        } else if (header == BTreeLazyRecord.LAZY_RECORD) {
          // value lives in its own record; defer loading until get()
          long recid = LongPacker.unpackLong(ois);
          node._values[i] = new BTreeLazyRecord(_btree._db, recid, serializer);
        } else {
          // value is stored inline; header carries size information
          node._values[i] = BTreeLazyRecord.fastDeser(ois, serializer, header);
        }
      }
    } else {
      // create fake values
      for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) {
        if (node._keys[i] != null)
          node._values[i] = JDBMUtils.EMPTY_STRING;
      }
    }
  }
+
  /**
   * Write the values of a leaf node in the format consumed by
   * {@link #readValues}: already-lazy records emit LAZY_RECORD + recid;
   * small values are inlined with a size header; values larger than
   * MAX_INTREE_RECORD_SIZE are spilled into their own DB record and written
   * as a lazy reference; nulls emit the NULL marker.
   */
  private void writeValues(DataOutput oos, BTreeNode<K, V> node)
      throws IOException {

    // scratch buffer, allocated lazily and reused across entries
    DataInputOutput output = null;
    Serializer serializer = _btree.valueSerializer != null ? _btree.valueSerializer
        : _btree.getRecordManager().defaultSerializer();
    for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) {
      if (node._values[i] instanceof BTreeLazyRecord) {
        oos.write(BTreeLazyRecord.LAZY_RECORD);
        LongPacker.packLong(oos, ((BTreeLazyRecord) node._values[i]).recid);
      } else if (node._values[i] != null) {
        if (output == null) {
          output = new DataInputOutput();
        } else {
          output.reset();
        }

        serializer.serialize(output, node._values[i]);

        if (output.getPos() > BTreeLazyRecord.MAX_INTREE_RECORD_SIZE) {
          // too big to inline: write as separate record
          long recid = _btree._db.insert(output.toByteArray(),
              BTreeLazyRecord.FAKE_SERIALIZER, true);
          oos.write(BTreeLazyRecord.LAZY_RECORD);
          LongPacker.packLong(oos, recid);
        } else {
          // write as part of btree: single size byte, then the payload
          oos.write(output.getPos());
          oos.write(output.getBuf(), 0, output.getPos());
        }
      } else {
        oos.write(BTreeLazyRecord.NULL);
      }
    }
  }
+
  /*
   * Header tags describing how the key array was compressed by writeKeys().
   * The tag is shifted into the upper bits of the header byte (the low five
   * bits are unused by the current readKeys()/writeKeys() pair, which compare
   * the whole byte for equality).
   */
  private static final int ALL_NULL = 0;
  private static final int ALL_INTEGERS = 1 << 5;
  private static final int ALL_INTEGERS_NEGATIVE = 2 << 5;
  private static final int ALL_LONGS = 3 << 5;
  private static final int ALL_LONGS_NEGATIVE = 4 << 5;
  private static final int ALL_STRINGS = 5 << 5;
  private static final int ALL_OTHER = 6 << 5;
+
  /**
   * Reads and decompresses the key array written by writeKeys().
   * <p/>
   * The first byte selects the encoding (ALL_NULL, delta-packed Integers or
   * Longs, prefix-compressed Strings, or generic serialization) and the rest
   * of the stream is decoded accordingly.
   *
   * @param ois source stream
   * @param firstUse index of the first used slot; slots before it stay null
   * @return key array of length BTree.DEFAULT_SIZE
   */
  private K[] readKeys(DataInput ois, final int firstUse) throws IOException,
      ClassNotFoundException {
    Object[] ret = new Object[BTree.DEFAULT_SIZE];
    final int type = ois.readUnsignedByte();
    if (type == ALL_NULL) {
      return (K[]) ret;
    } else if (type == ALL_INTEGERS || type == ALL_INTEGERS_NEGATIVE) {
      // first key is stored as its absolute value, sign encoded in the tag;
      // remaining keys are stored as positive deltas from the previous key
      long first = LongPacker.unpackLong(ois);
      if (type == ALL_INTEGERS_NEGATIVE)
        first = -first;
      ret[firstUse] = Integer.valueOf((int) first);
      for (int i = firstUse + 1; i < BTree.DEFAULT_SIZE; i++) {
        // ret[i] = Serialization.readObject(ois);
        long v = LongPacker.unpackLong(ois);
        if (v == 0)
          continue; // null
        v = v + first;
        ret[i] = Integer.valueOf((int) v);
        first = v;
      }
      return (K[]) ret;
    } else if (type == ALL_LONGS || type == ALL_LONGS_NEGATIVE) {
      // same delta scheme as the Integer case, without the int narrowing
      long first = LongPacker.unpackLong(ois);
      if (type == ALL_LONGS_NEGATIVE)
        first = -first;

      ret[firstUse] = Long.valueOf(first);
      for (int i = firstUse + 1; i < BTree.DEFAULT_SIZE; i++) {
        // ret[i] = Serialization.readObject(ois);
        long v = LongPacker.unpackLong(ois);
        if (v == 0)
          continue; // null
        v = v + first;
        ret[i] = Long.valueOf(v);
        first = v;
      }
      return (K[]) ret;
    } else if (type == ALL_STRINGS) {
      // UTF-8 bytes with shared-prefix compression against the previous key
      byte[] previous = null;
      for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
        byte[] b = leadingValuePackRead(ois, previous, 0);
        if (b == null)
          continue;
        ret[i] = new String(b, Serialization.UTF8);
        previous = b;
      }
      return (K[]) ret;

    } else if (type == ALL_OTHER) {

      // Keys written with the default serializer are stored without prefix
      // compression (see the matching branch in writeKeys), so they are read
      // back directly here.
      if (_btree.keySerializer == null
          || _btree.keySerializer == _btree.getRecordManager()
              .defaultSerializer()) {
        for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
          ret[i] = _btree.getRecordManager().defaultSerializer()
              .deserialize(ois);
        }
        return (K[]) ret;
      }

      // custom key serializer: each key is prefix-compressed byte blob
      Serializer ser = _btree.keySerializer != null ? _btree.keySerializer
          : _btree.getRecordManager().defaultSerializer();
      DataInputOutput in2 = null;
      byte[] previous = null;
      for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
        byte[] b = leadingValuePackRead(ois, previous, 0);
        if (b == null)
          continue;
        if (in2 == null) {
          in2 = new DataInputOutput();
        }
        in2.reset(b);
        ret[i] = ser.deserialize(in2);
        previous = b;
      }
      return (K[]) ret;

    } else {
      throw new InternalError("unknown BTreeNode header type: " + type);
    }

  }
+
  /**
   * Writes the key array using the most compact applicable encoding, tagged
   * by a one-byte header (decoded by readKeys()):
   * <ul>
   * <li>ALL_NULL - no keys in use, nothing else written</li>
   * <li>ALL_INTEGERS / ALL_LONGS (and _NEGATIVE variants) - packed deltas
   * between consecutive keys; a packed 0 marks a null slot</li>
   * <li>ALL_STRINGS - UTF-8 bytes with shared-prefix compression</li>
   * <li>ALL_OTHER - generic serialization, prefix-compressed when a custom
   * key serializer is set</li>
   * </ul>
   *
   * @param oos destination stream
   * @param keys key array, must be exactly BTree.DEFAULT_SIZE long
   * @param firstUse index of the first used slot
   * @throws IllegalArgumentException if keys has the wrong length
   */
  @SuppressWarnings("unchecked")
  private void writeKeys(DataOutput oos, K[] keys, final int firstUse)
      throws IOException {
    if (keys.length != BTree.DEFAULT_SIZE)
      throw new IllegalArgumentException("wrong keys size");

    // check if all items on key are null
    boolean allNull = true;
    for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
      if (keys[i] != null) {
        allNull = false;
        break;
      }
    }
    if (allNull) {
      oos.write(ALL_NULL);
      return;
    }

    /**
     * Special compression to compress Long and Integer. Only valid when the
     * tree uses natural ordering and the default serializer, because the
     * delta encoding relies on keys being strictly increasing numbers.
     */
    if ((_btree._comparator == JDBMUtils.COMPARABLE_COMPARATOR || _btree._comparator == null)
        && (_btree.keySerializer == null || _btree.keySerializer == _btree
            .getRecordManager().defaultSerializer())) {
      boolean allInteger = true;
      for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
        if (keys[i] != null && keys[i].getClass() != Integer.class) {
          allInteger = false;
          break;
        }
      }
      boolean allLong = true;
      for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
        if (keys[i] != null && (keys[i].getClass() != Long.class ||
        // special case to exclude Long.MIN_VALUE from conversion, causes
        // problems to LongPacker
            ((Long) keys[i]).longValue() == Long.MIN_VALUE)) {
          allLong = false;
          break;
        }
      }

      if (allLong) {
        // check that diff between MIN and MAX fits into PACKED_LONG
        long max = Long.MIN_VALUE;
        long min = Long.MAX_VALUE;
        for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
          if (keys[i] == null)
            continue;
          long v = (Long) keys[i];
          if (v > max)
            max = v;
          if (v < min)
            min = v;
        }
        // now convert to Double to prevent overflow errors
        double max2 = max;
        double min2 = min;
        double maxDiff = Long.MAX_VALUE;
        if (max2 - min2 > maxDiff / 2) // divide by two just to by sure
          allLong = false;

      }

      // a slot cannot be both an Integer and a Long at the same time
      if (allLong && allInteger)
        throw new InternalError();

      if (allLong || allInteger) {
        // NOTE(review): assumes keys[firstUse] is non-null, i.e. the first
        // used slot always holds a key -- confirm against node invariants.
        long first = ((Number) keys[firstUse]).longValue();
        // write header
        if (allInteger) {
          if (first > 0)
            oos.write(ALL_INTEGERS);
          else
            oos.write(ALL_INTEGERS_NEGATIVE);
        } else if (allLong) {
          if (first > 0)
            oos.write(ALL_LONGS);
          else
            oos.write(ALL_LONGS_NEGATIVE);
        } else {
          throw new InternalError();
        }

        // write first
        LongPacker.packLong(oos, Math.abs(first));
        // write others
        for (int i = firstUse + 1; i < BTree.DEFAULT_SIZE; i++) {
          // Serialization.writeObject(oos, keys[i]);
          if (keys[i] == null)
            LongPacker.packLong(oos, 0);
          else {
            // keys must be strictly increasing for the delta encoding;
            // a zero delta is reserved for null slots
            long v = ((Number) keys[i]).longValue();
            if (v <= first)
              throw new InternalError("not ordered");
            LongPacker.packLong(oos, v - first);
            first = v;
          }
        }
        return;
      } else {
        // another special case for Strings
        boolean allString = true;
        for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
          if (keys[i] != null && (keys[i].getClass() != String.class)) {
            allString = false;
            break;
          }
        }
        if (allString) {
          // UTF-8 bytes with shared-prefix compression against previous key
          oos.write(ALL_STRINGS);
          byte[] previous = null;
          for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
            if (keys[i] == null) {
              leadingValuePackWrite(oos, null, previous, 0);
            } else {
              byte[] b = ((String) keys[i]).getBytes(Serialization.UTF8);
              leadingValuePackWrite(oos, b, previous, 0);
              previous = b;
            }
          }
          return;
        }
      }
    }

    /**
     * other case, serializer is provided or other stuff
     */
    oos.write(ALL_OTHER);
    if (_btree.keySerializer == null
        || _btree.keySerializer == _btree.getRecordManager()
            .defaultSerializer()) {
      // default serializer: keys are written directly, no prefix compression
      // (readKeys relies on this)
      for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
        _btree.getRecordManager().defaultSerializer().serialize(oos, keys[i]);
      }
      return;
    }

    // custom serializer is provided, use it

    Serializer ser = _btree.keySerializer;
    byte[] previous = null;

    DataInputOutput out3 = new DataInputOutput();
    for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) {
      if (keys[i] == null) {
        leadingValuePackWrite(oos, null, previous, 0);
      } else {
        out3.reset();
        ser.serialize(out3, keys[i]);
        byte[] b = out3.toByteArray();
        leadingValuePackWrite(oos, b, previous, 0);
        previous = b;
      }
    }

  }
+
+  public void defrag(DBStore r1, DBStore r2) throws IOException {
+    if (_children != null)
+      for (long child : _children) {
+        if (child == 0)
+          continue;
+        byte[] data = r1.fetchRaw(child);
+        r2.forceInsert(child, data);
+        BTreeNode t = deserialize(new DataInputOutput(data));
+        t._btree = _btree;
+        t.defrag(r1, r2);
+      }
+  }
+
  /**
   * STATIC INNER CLASS Result from insert() method call. Plain value holder
   * with no behaviour of its own; fields are filled by insert() and read by
   * the caller.
   */
  static final class InsertResult<K, V> {

    /**
     * Overflow node, produced when the insert caused a node split
     * (presumably null when no split occurred -- see insert()).
     */
    BTreeNode<K, V> _overflow;

    /**
     * Existing value for the insertion key, i.e. the value that was replaced.
     */
    V _existing;

  }
+
  /**
   * STATIC INNER CLASS Result from remove() method call. Plain value holder
   * with no behaviour of its own; fields are filled by remove() and read by
   * the caller.
   */
  static final class RemoveResult<K, V> {

    /**
     * Set to true if underlying nodes underflowed, which signals the parent
     * that the tree may need rebalancing.
     */
    boolean _underflow;

    /**
     * Removed entry value
     */
    V _value;
  }
+
+  /**
+   * PRIVATE INNER CLASS Browser to traverse leaf nodes.
+   */
+  static final class Browser<K, V> implements BTree.BTreeTupleBrowser<K, V> {
+
+    /**
+     * Current node.
+     */
+    private BTreeNode<K, V> _node;
+
+    /**
+     * Current index in the node. The index positionned on the next tuple to
+     * return.
+     */
+    private byte _index;
+
+    private int expectedModCount;
+
+    /**
+     * Create a browser.
+     * 
+     * @param node Current node
+     * @param index Position of the next tuple to return.
+     */
+    Browser(BTreeNode<K, V> node, byte index) {
+      _node = node;
+      _index = index;
+      expectedModCount = node._btree.modCount;
+    }
+
+    public boolean getNext(BTree.BTreeTuple<K, V> tuple) throws IOException {
+      if (expectedModCount != _node._btree.modCount)
+        throw new ConcurrentModificationException();
+      if (_node == null) {
+        // last record in iterator was deleted, so iterator is at end of node
+        return false;
+      }
+
+      if (_index < BTree.DEFAULT_SIZE) {
+        if (_node._keys[_index] == null) {
+          // reached end of the tree.
+          return false;
+        }
+      } else if (_node._next != 0) {
+        // move to next node
+        _node = _node.loadNode(_node._next);
+        _index = _node._first;
+      }
+      tuple.key = _node._keys[_index];
+      if (_node._values[_index] instanceof BTreeLazyRecord)
+        tuple.value = ((BTreeLazyRecord<V>) _node._values[_index]).get();
+      else
+        tuple.value = (V) _node._values[_index];
+      _index++;
+      return true;
+    }
+
+    public boolean getPrevious(BTree.BTreeTuple<K, V> tuple) throws IOException {
+      if (expectedModCount != _node._btree.modCount)
+        throw new ConcurrentModificationException();
+
+      if (_node == null) {
+        // deleted last record, but this situation is only supportedd on getNext
+        throw new InternalError();
+      }
+
+      if (_index == _node._first) {
+
+        if (_node._previous != 0) {
+          _node = _node.loadNode(_node._previous);
+          _index = BTree.DEFAULT_SIZE;
+        } else {
+          // reached beginning of the tree
+          return false;
+        }
+      }
+      _index--;
+      tuple.key = _node._keys[_index];
+      if (_node._values[_index] instanceof BTreeLazyRecord)
+        tuple.value = ((BTreeLazyRecord<V>) _node._values[_index]).get();
+      else
+        tuple.value = (V) _node._values[_index];
+
+      return true;
+
+    }
+
+    public void remove(K key) throws IOException {
+      if (expectedModCount != _node._btree.modCount)
+        throw new ConcurrentModificationException();
+
+      _node._btree.remove(key);
+      expectedModCount++;
+
+      // An entry was removed and this may trigger tree rebalance,
+      // This would change current node layout, so find our position again
+      BTree.BTreeTupleBrowser b = _node._btree.browse(key, true);
+      // browser is positioned just before value which was currently deleted, so
+      // find if we have new value
+      if (b.getNext(new BTree.BTreeTuple(null, null))) {
+        // next value value exists, copy its state
+        Browser b2 = (Browser) b;
+        this._node = b2._node;
+        this._index = b2._index;
+      } else {
+        this._node = null;
+        this._index = -1;
+      }
+
+    }
+  }
+
+  /**
+   * Used for debugging and testing only. Recursively obtains the recids of all
+   * child BTreeNodes and adds them to the 'out' list.
+   * 
+   * @param out
+   * @param height
+   * @throws IOException
+   */
+  void dumpChildNodeRecIDs(List out, int height) throws IOException {
+    height -= 1;
+    if (height > 0) {
+      for (byte i = _first; i < BTree.DEFAULT_SIZE; i++) {
+        if (_children[i] == 0)
+          continue;
+
+        BTreeNode child = loadNode(_children[i]);
+        out.add(new Long(child._recid));
+        child.dumpChildNodeRecIDs(out, height);
+      }
+    }
+  }
+
+  /**
+   * Read previously written data
+   * 
+   * @author Kevin Day
+   */
+  static byte[] leadingValuePackRead(DataInput in, byte[] previous,
+      int ignoreLeadingCount) throws IOException {
+    int len = LongPacker.unpackInt(in) - 1; // 0 indicates null
+    if (len == -1)
+      return null;
+
+    int actualCommon = LongPacker.unpackInt(in);
+
+    byte[] buf = new byte[len];
+
+    if (previous == null) {
+      actualCommon = 0;
+    }
+
+    if (actualCommon > 0) {
+      in.readFully(buf, 0, ignoreLeadingCount);
+      System.arraycopy(previous, ignoreLeadingCount, buf, ignoreLeadingCount,
+          actualCommon - ignoreLeadingCount);
+    }
+    in.readFully(buf, actualCommon, len - actualCommon);
+    return buf;
+  }
+
+  /**
+   * This method is used for delta compression for keys. Writes the contents of
+   * buf to the DataOutput out, with special encoding if there are common
+   * leading bytes in the previous group stored by this compressor.
+   * 
+   * @author Kevin Day
+   */
+  static void leadingValuePackWrite(DataOutput out, byte[] buf,
+      byte[] previous, int ignoreLeadingCount) throws IOException {
+    if (buf == null) {
+      LongPacker.packInt(out, 0);
+      return;
+    }
+
+    int actualCommon = ignoreLeadingCount;
+
+    if (previous != null) {
+      int maxCommon = buf.length > previous.length ? previous.length
+          : buf.length;
+
+      if (maxCommon > Short.MAX_VALUE)
+        maxCommon = Short.MAX_VALUE;
+
+      for (; actualCommon < maxCommon; actualCommon++) {
+        if (buf[actualCommon] != previous[actualCommon])
+          break;
+      }
+    }
+
+    // there are enough common bytes to justify compression
+    LongPacker.packInt(out, buf.length + 1);// store as +1, 0 indicates null
+    LongPacker.packInt(out, actualCommon);
+    out.write(buf, 0, ignoreLeadingCount);
+    out.write(buf, actualCommon, buf.length - actualCommon);
+
+  }
+
+  BTreeNode<K, V> loadLastChildNode() throws IOException {
+    return loadNode(_children[BTree.DEFAULT_SIZE - 1]);
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.util.AbstractSet;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.SortedSet;
+
+/**
+ * Wrapper class for <code>>SortedMap</code> to implement
+ * <code>>NavigableSet</code>
+ * <p/>
+ * This code originally comes from Apache Harmony, was adapted by Jan Kotek for
+ * JDBM
+ */
+public final class BTreeSet<E> extends AbstractSet<E> implements
+    NavigableSet<E> {
+
+  /**
+   * use keyset from this map
+   */
+  final BTreeMap<E, Object> map;
+
+  BTreeSet(BTreeMap<E, Object> map) {
+    this.map = map;
+  }
+
+  @Override
+  public boolean add(E object) {
+    return map.put(object, JDBMUtils.EMPTY_STRING) == null;
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends E> collection) {
+    return super.addAll(collection);
+  }
+
+  @Override
+  public void clear() {
+    map.clear();
+  }
+
+  @Override
+  public Comparator<? super E> comparator() {
+    return map.comparator();
+  }
+
+  @Override
+  public boolean contains(Object object) {
+    return map.containsKey(object);
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return map.isEmpty();
+  }
+
+  @Override
+  public E lower(E e) {
+    return map.lowerKey(e);
+  }
+
+  @Override
+  public E floor(E e) {
+    return map.floorKey(e);
+  }
+
+  @Override
+  public E ceiling(E e) {
+    return map.ceilingKey(e);
+  }
+
+  @Override
+  public E higher(E e) {
+    return map.higherKey(e);
+  }
+
+  @Override
+  public E pollFirst() {
+    Map.Entry<E, Object> e = map.pollFirstEntry();
+    return e != null ? e.getKey() : null;
+  }
+
+  @Override
+  public E pollLast() {
+    Map.Entry<E, Object> e = map.pollLastEntry();
+    return e != null ? e.getKey() : null;
+  }
+
+  @Override
+  public Iterator<E> iterator() {
+    final Iterator<Map.Entry<E, Object>> iter = map.entrySet().iterator();
+    return new Iterator<E>() {
+      @Override
+      public boolean hasNext() {
+        return iter.hasNext();
+      }
+
+      @Override
+      public E next() {
+        Map.Entry<E, Object> e = iter.next();
+        return e != null ? e.getKey() : null;
+      }
+
+      @Override
+      public void remove() {
+        iter.remove();
+      }
+    };
+  }
+
+  @Override
+  public NavigableSet<E> descendingSet() {
+    return map.descendingKeySet();
+  }
+
+  @Override
+  public Iterator<E> descendingIterator() {
+    return map.descendingKeySet().iterator();
+  }
+
+  @Override
+  public NavigableSet<E> subSet(E fromElement, boolean fromInclusive,
+      E toElement, boolean toInclusive) {
+    return map.subMap(fromElement, fromInclusive, toElement, toInclusive)
+        .navigableKeySet();
+  }
+
+  @Override
+  public NavigableSet<E> headSet(E toElement, boolean inclusive) {
+    return map.headMap(toElement, inclusive).navigableKeySet();
+  }
+
+  @Override
+  public NavigableSet<E> tailSet(E fromElement, boolean inclusive) {
+    return map.tailMap(fromElement, inclusive).navigableKeySet();
+  }
+
+  @Override
+  public boolean remove(Object object) {
+    return map.remove(object) != null;
+  }
+
+  @Override
+  public int size() {
+    return map.size();
+  }
+
+  @Override
+  public E first() {
+    return map.firstKey();
+  }
+
+  @Override
+  public E last() {
+    return map.lastKey();
+  }
+
+  @Override
+  public SortedSet<E> subSet(E start, E end) {
+    Comparator<? super E> c = map.comparator();
+    int compare = (c == null) ? ((Comparable<E>) start).compareTo(end) : c
+        .compare(start, end);
+    if (compare <= 0) {
+      return new BTreeSet<E>((BTreeMap<E, Object>) map.subMap(start, true, end,
+          false));
+    }
+    throw new IllegalArgumentException();
+  }
+
+  @Override
+  public SortedSet<E> headSet(E end) {
+    // Check for errors
+    Comparator<? super E> c = map.comparator();
+    if (c == null) {
+      ((Comparable<E>) end).compareTo(end);
+    } else {
+      c.compare(end, end);
+    }
+    return new BTreeSet<E>((BTreeMap<E, Object>) map.headMap(end, false));
+  }
+
+  @Override
+  public SortedSet<E> tailSet(E start) {
+    // Check for errors
+    Comparator<? super E> c = map.comparator();
+    if (c == null) {
+      ((Comparable<E>) start).compareTo(start);
+    } else {
+      c.compare(start, start);
+    }
+    return new BTreeSet<E>((BTreeMap<E, Object>) map.tailMap(start, true));
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+
/**
 * Database is the root class for creating and loading persistent collections.
 * It also contains transaction operations.
 * <p/>
 */
public interface DB {

  /**
   * Closes the DB and releases resources. The DB can not be used after it was
   * closed.
   */
  void close();

  /** @return true if db was already closed */
  boolean isClosed();

  /**
   * Clear cache and remove all entries it contains. This may be useful for some
   * Garbage Collection when reference cache is used.
   */
  void clearCache();

  /**
   * Defragments storage so it consumes less space. It basically copies all
   * records into a different store and then renames it, replacing the original
   * store.
   * <p/>
   * Defrag has two steps: In the first, collections are rearranged, so records
   * in a collection are close to each other, and read speed is improved. In
   * the second step all records are sequentially transferred, reclaiming all
   * unused space. The first step is optional and may slow down defragmentation
   * significantly as it requires many random-access reads. The second step
   * reads and writes data sequentially and is very fast, comparable to copying
   * files to a new location.
   * 
   * <p/>
   * This commits any uncommitted data. Defrag also requires free space, as the
   * store is basically recreated at a new location.
   * 
   * @param sortCollections if collection records should be rearranged during
   *          defragment, this takes some extra time
   */
  void defrag(boolean sortCollections);

  /**
   * Commit (make persistent) all changes since beginning of transaction. JDBM
   * supports only single transaction.
   */
  void commit();

  /**
   * Rollback (cancel) all changes since beginning of transaction. JDBM supports
   * only single transaction. This operation affects all maps created or loaded
   * by this DB.
   */
  void rollback();

  /**
   * This calculates some database statistics such as collection sizes and
   * record distributions. Can be useful for performance optimisation and
   * troubleshooting. This method can run for a very long time.
   * 
   * @return statistics contained in string
   */
  String calculateStatistics();

  /**
   * Copy database content into ZIP file
   * 
   * @param zipFile target ZIP file
   */
  void copyToZip(String zipFile);

  /**
   * Get a <code>Map</code> which was already created and saved in DB. This map
   * uses disk based H*Tree and should have similar performance as
   * <code>HashMap</code>.
   * 
   * @param name of hash map
   * 
   * @return map
   */
  <K, V> Map<K, V> getHashMap(String name);

  /**
   * Creates Map which persists data into DB.
   * 
   * @param name record name
   * @return newly created hash map
   */
  <K, V> Map<K, V> createHashMap(String name);

  /**
   * Creates Hash Map which persists data into DB. Map will use custom
   * serializers for Keys and Values. Leave keySerializer null to use default
   * serializer for keys
   * 
   * @param <K> Key type
   * @param <V> Value type
   * @param name record name
   * @param keySerializer serializer to be used for Keys, leave null to use
   *          default serializer
   * @param valueSerializer serializer to be used for Values
   * @return newly created hash map
   */
  <K, V> Map<K, V> createHashMap(String name, Serializer<K> keySerializer,
      Serializer<V> valueSerializer);

  /** Creates a hash-based Set which persists data into DB. */
  <K> Set<K> createHashSet(String name);

  /** Gets a hash-based Set which was already created and saved in DB. */
  <K> Set<K> getHashSet(String name);

  /** Creates a hash-based Set with a custom key serializer. */
  <K> Set<K> createHashSet(String name, Serializer<K> keySerializer);

  /** Gets a TreeMap which was already created and saved in DB. */
  <K, V> NavigableMap<K, V> getTreeMap(String name);

  /**
   * Create TreeMap which persists data into DB.
   * 
   * @param <K> Key type
   * @param <V> Value type
   * @param name record name
   * @return newly created tree map
   */
  <K extends Comparable<K>, V> NavigableMap<K, V> createTreeMap(String name);

  /**
   * Creates TreeMap which persists data into DB.
   * 
   * @param <K> Key type
   * @param <V> Value type
   * @param name record name
   * @param keyComparator Comparator used to sort keys
   * @param keySerializer Serializer used for keys. This may reduce disk space
   *          usage *
   * @param valueSerializer Serializer used for values. This may reduce disk
   *          space usage
   * @return newly created tree map
   */
  <K, V> NavigableMap<K, V> createTreeMap(String name,
      Comparator<K> keyComparator, Serializer<K> keySerializer,
      Serializer<V> valueSerializer);

  /** Gets a tree-based sorted Set which was already created and saved in DB. */
  <K> NavigableSet<K> getTreeSet(String name);

  /** Creates a tree-based sorted Set which persists data into DB. */
  <K> NavigableSet<K> createTreeSet(String name);

  /** Creates a tree-based sorted Set with custom comparator and serializer. */
  <K> NavigableSet<K> createTreeSet(String name, Comparator<K> keyComparator,
      Serializer<K> keySerializer);

  /** Creates a linked list which persists data into DB. */
  <K> List<K> createLinkedList(String name);

  /** Creates a linked list with a custom element serializer. */
  <K> List<K> createLinkedList(String name, Serializer<K> serializer);

  /** Gets a linked list which was already created and saved in DB. */
  <K> List<K> getLinkedList(String name);

  /**
   * Returns an unmodifiable map which contains all collection names and the
   * collections themselves.
   */
  Map<String, Object> getCollections();

  /** completely remove collection from store */
  void deleteCollection(String name);

  /**
   * Java Collections return their size as int. This may not be enough for
   * JDBM collections. This method returns the number of elements in a JDBM
   * collection as long.
   * 
   * @param collection created by JDBM
   * @return number of elements in collection as long
   */
  long collectionSize(Object collection);

}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Set;
+
+/**
+ * An abstract class implementing most of DB. It also has some JDBM package
+ * protected stuff (getNamedRecord)
+ */
+public abstract class DBAbstract implements DB {
+
+  /**
+   * Reserved slot for name directory recid.
+   */
+  static final byte NAME_DIRECTORY_ROOT = 0;
+
+  /**
+   * Reserved slot for version number
+   */
+  static final byte STORE_VERSION_NUMBER_ROOT = 1;
+
+  /**
+   * Reserved slot for recid where Serial class info is stored
+   * 
+   * NOTE when introducing more roots, do not forget to update defrag
+   */
+  static final byte SERIAL_CLASS_INFO_RECID_ROOT = 2;
+
+  /**
+   * Cache of named collection instances handed out by this DB. To prevent
+   * double instances of the same collection, we use a weak value map; stale
+   * entries whose referent was GCed are purged lazily in getCollectionInstance.
+   * 
+   * //TODO what to do when there is rollback? //TODO clear on close
+   */
+  final private Map<String, WeakReference<Object>> collections = new HashMap<String, WeakReference<Object>>();
+
+  /**
+   * Inserts a new record using a custom serializer.
+   * 
+   * @param obj the object for the new record.
+   * @param serializer a custom serializer
+   * @param disableCache true to bypass any caching layer for this insert
+   * @return the rowid for the new record.
+   * @throws java.io.IOException when one of the underlying I/O operations
+   *           fails.
+   */
+  abstract <A> long insert(A obj, Serializer<A> serializer, boolean disableCache)
+      throws IOException;
+
+  /**
+   * Deletes a record.
+   * 
+   * @param recid the rowid for the record that should be deleted.
+   * @throws java.io.IOException when one of the underlying I/O operations
+   *           fails.
+   */
+  abstract void delete(long recid) throws IOException;
+
+  /**
+   * Updates a record using a custom serializer. If given recid does not exist,
+   * IOException will be thrown before/during commit (cache).
+   * 
+   * @param recid the recid for the record that is to be updated.
+   * @param obj the new object for the record.
+   * @param serializer a custom serializer
+   * @throws java.io.IOException when one of the underlying I/O operations fails
+   */
+  abstract <A> void update(long recid, A obj, Serializer<A> serializer)
+      throws IOException;
+
+  /**
+   * Fetches a record using a custom serializer.
+   * 
+   * @param recid the recid for the record that must be fetched.
+   * @param serializer a custom serializer
+   * @return the object contained in the record, null if given recid does not
+   *         exist
+   * @throws java.io.IOException when one of the underlying I/O operations
+   *           fails.
+   */
+  abstract <A> A fetch(long recid, Serializer<A> serializer) throws IOException;
+
+  /**
+   * Fetches a record using a custom serializer and optionally disabled cache
+   * 
+   * @param recid the recid for the record that must be fetched.
+   * @param serializer a custom serializer
+   * @param disableCache true to disable any caching mechanism
+   * @return the object contained in the record, null if given recid does not
+   *         exist
+   * @throws java.io.IOException when one of the underlying I/O operations
+   *           fails.
+   */
+  abstract <A> A fetch(long recid, Serializer<A> serializer,
+      boolean disableCache) throws IOException;
+
+  /** Inserts a new record using the DB-wide default serializer. */
+  @SuppressWarnings("unchecked")
+  public long insert(Object obj) throws IOException {
+    return insert(obj, defaultSerializer(), false);
+  }
+
+  /** Updates a record using the DB-wide default serializer. */
+  @SuppressWarnings("unchecked")
+  public void update(long recid, Object obj) throws IOException {
+    update(recid, obj, defaultSerializer());
+  }
+
+  /** Fetches a record using the DB-wide default serializer. */
+  @SuppressWarnings("unchecked")
+  public <A> A fetch(long recid) throws IOException {
+    return (A) fetch(recid, defaultSerializer());
+  }
+
+  /**
+   * Opens an existing named HashMap. A live cached instance is returned when
+   * available; otherwise the backing HTree is fetched from the store and
+   * cached under a weak reference.
+   * 
+   * @return the map, or null when no record with the given name exists
+   * @throws ClassCastException when the named record is a HashSet
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <K, V> Map<K, V> getHashMap(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (Map<K, V>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      @SuppressWarnings("rawtypes")
+      HTree tree = fetch(recid);
+      tree.setPersistenceContext(this);
+      if (!tree.hasValues()) {
+        throw new ClassCastException("HashSet is not HashMap");
+      }
+      collections.put(name, new WeakReference<Object>(tree));
+      return tree;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /** Creates a named HashMap; convenience overload with no custom serializers. */
+  @Override
+  public <K, V> Map<K, V> createHashMap(String name) {
+    return createHashMap(name, null, null);
+  }
+
+  /**
+   * Creates a named HashMap backed by a new HTree record.
+   * 
+   * @throws IllegalArgumentException when the name is already taken
+   */
+  @Override
+  public <K, V> Map<K, V> createHashMap(String name,
+      Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    try {
+      assertNameNotExist(name);
+
+      @SuppressWarnings({ "unchecked", "rawtypes" })
+      HTree<K, V> tree = new HTree(this, keySerializer, valueSerializer, true);
+      long recid = insert(tree);
+      setNamedObject(name, recid);
+      collections.put(name, new WeakReference<Object>(tree));
+      return tree;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * Opens an existing named HashSet. A live cached instance is returned when
+   * available; otherwise the backing HTree is fetched from the store and
+   * wrapped in a HTreeSet view.
+   * 
+   * @return the set, or null when no record with the given name exists
+   * @throws ClassCastException when the named record is a HashMap
+   */
+  @Override
+  public <K> Set<K> getHashSet(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (Set<K>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      HTree tree = fetch(recid);
+      tree.setPersistenceContext(this);
+      if (tree.hasValues()) {
+        throw new ClassCastException("HashMap is not HashSet");
+      }
+      Set<K> ret = new HTreeSet(tree);
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /** Creates a named HashSet; convenience overload with no custom serializer. */
+  @Override
+  public <K> Set<K> createHashSet(String name) {
+    return createHashSet(name, null);
+  }
+
+  /**
+   * Creates a named HashSet backed by a new value-less HTree record.
+   * 
+   * @throws IllegalArgumentException when the name is already taken
+   */
+  @Override
+  public <K> Set<K> createHashSet(String name, Serializer<K> keySerializer) {
+    try {
+      assertNameNotExist(name);
+
+      HTree<K, Object> tree = new HTree(this, keySerializer, null, false);
+      long recid = insert(tree);
+      setNamedObject(name, recid);
+
+      Set<K> ret = new HTreeSet<K>(tree);
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * Opens an existing named TreeMap. A live cached instance is returned when
+   * available; otherwise the backing BTree is loaded from the store.
+   * 
+   * @return the map, or null when no record with the given name exists
+   * @throws ClassCastException when the named record is a TreeSet
+   */
+  @Override
+  public <K, V> NavigableMap<K, V> getTreeMap(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (NavigableMap<K, V>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      BTree t = BTree.<K, V> load(this, recid);
+      if (!t.hasValues())
+        throw new ClassCastException("TreeSet is not TreeMap");
+      NavigableMap<K, V> ret = new BTreeMap<K, V>(t, false); // TODO put readonly flag here
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /** Creates a named TreeMap; convenience overload with natural key ordering. */
+  @Override
+  public <K extends Comparable<K>, V> NavigableMap<K, V> createTreeMap(
+      String name) {
+    return createTreeMap(name, null, null, null);
+  }
+
+  /**
+   * Creates a named TreeMap backed by a new BTree record.
+   * 
+   * @throws IllegalArgumentException when the name is already taken
+   */
+  @Override
+  public <K, V> NavigableMap<K, V> createTreeMap(String name,
+      Comparator<K> keyComparator, Serializer<K> keySerializer,
+      Serializer<V> valueSerializer) {
+    try {
+      assertNameNotExist(name);
+      BTree<K, V> tree = BTree.createInstance(this, keyComparator,
+          keySerializer, valueSerializer, true);
+      setNamedObject(name, tree.getRecid());
+      NavigableMap<K, V> ret = new BTreeMap<K, V>(tree, false); // TODO put readonly flag here
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * Opens an existing named TreeSet. A live cached instance is returned when
+   * available; otherwise the backing BTree is loaded from the store and
+   * wrapped in a BTreeSet view.
+   * 
+   * @return the set, or null when no record with the given name exists
+   * @throws ClassCastException when the named record is a TreeMap
+   */
+  @Override
+  public <K> NavigableSet<K> getTreeSet(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (NavigableSet<K>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      BTree t = BTree.<K, Object> load(this, recid);
+      if (t.hasValues())
+        throw new ClassCastException("TreeMap is not TreeSet");
+      BTreeSet<K> ret = new BTreeSet<K>(new BTreeMap(t, false));
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /** Creates a named TreeSet; convenience overload with no custom comparator/serializer. */
+  @Override
+  public <K> NavigableSet<K> createTreeSet(String name) {
+    return createTreeSet(name, null, null);
+  }
+
+  /**
+   * Creates a named TreeSet backed by a new value-less BTree record.
+   * 
+   * @throws IllegalArgumentException when the name is already taken
+   */
+  @Override
+  public <K> NavigableSet<K> createTreeSet(String name,
+      Comparator<K> keyComparator, Serializer<K> keySerializer) {
+    try {
+      assertNameNotExist(name);
+      BTree<K, Object> tree = BTree.createInstance(this, keyComparator,
+          keySerializer, null, false);
+      setNamedObject(name, tree.getRecid());
+      BTreeSet<K> ret = new BTreeSet<K>(new BTreeMap(tree, false));
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /** Creates a named LinkedList; convenience overload with no custom serializer. */
+  @Override
+  public <K> List<K> createLinkedList(String name) {
+    return createLinkedList(name, null);
+  }
+
+  /**
+   * Creates a named LinkedList backed by a new record.
+   * 
+   * @throws IllegalArgumentException when the name is already taken
+   */
+  @Override
+  public <K> List<K> createLinkedList(String name, Serializer<K> serializer) {
+    try {
+      assertNameNotExist(name);
+
+      // allocate record and overwrite it
+
+      LinkedList<K> list = new LinkedList<K>(this, serializer);
+      long recid = insert(list);
+      setNamedObject(name, recid);
+
+      collections.put(name, new WeakReference<Object>(list));
+
+      return list;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * Opens an existing named LinkedList. A live cached instance is returned
+   * when available; otherwise the list is fetched from the store.
+   * 
+   * @return the list, or null when no record with the given name exists
+   */
+  @Override
+  public <K> List<K> getLinkedList(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (List<K>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+      LinkedList<K> list = (LinkedList<K>) fetch(recid);
+      list.setPersistenceContext(this);
+      collections.put(name, new WeakReference<Object>(list));
+      return list;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  private Object getCollectionInstance(String name) {
+    WeakReference ref = collections.get(name);
+    if (ref == null)
+      return null;
+    Object o = ref.get();
+    if (o != null)
+      return o;
+    // already GCed
+    collections.remove(name);
+    return null;
+  }
+
+  private void assertNameNotExist(String name) throws IOException {
+    if (getNamedObject(name) != 0)
+      throw new IllegalArgumentException("Object with name '" + name
+          + "' already exists");
+  }
+
+  /**
+   * Obtain the record id of a named object. Returns 0 if named object doesn't
+   * exist. Named objects are used to store Map views and other well known
+   * objects.
+   */
+  protected long getNamedObject(String name) throws IOException {
+    long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+    if (nameDirectory_recid == 0) {
+      return 0;
+    }
+    HTree<String, Long> m = fetch(nameDirectory_recid);
+    Long res = m.get(name);
+    if (res == null)
+      return 0;
+    return res;
+  }
+
+  /**
+   * Set the record id of a named object. Named objects are used to store Map
+   * views and other well known objects. The name directory HTree is created
+   * lazily on first use and its recid stored under NAME_DIRECTORY_ROOT.
+   */
+  protected void setNamedObject(String name, long recid) throws IOException {
+    long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+    HTree<String, Long> m = null;
+    if (nameDirectory_recid == 0) {
+      // does not exist, create it
+      m = new HTree<String, Long>(this, null, null, true);
+      nameDirectory_recid = insert(m);
+      setRoot(NAME_DIRECTORY_ROOT, nameDirectory_recid);
+    } else {
+      // fetch it
+      m = fetch(nameDirectory_recid);
+    }
+    m.put(name, recid);
+  }
+
+  /**
+   * Returns an unmodifiable snapshot of all named collections keyed by name.
+   * BTree/HTree records are materialized through the regular get* accessors so
+   * cached instances are reused and properly initialized.
+   */
+  @Override
+  public Map<String, Object> getCollections() {
+    try {
+      Map<String, Object> ret = new LinkedHashMap<String, Object>();
+      long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+      if (nameDirectory_recid == 0)
+        return ret;
+      HTree<String, Long> m = fetch(nameDirectory_recid);
+
+      for (Map.Entry<String, Long> e : m.entrySet()) {
+        Object o = fetch(e.getValue());
+        if (o instanceof BTree) {
+          if (((BTree) o).hasValues)
+            o = getTreeMap(e.getKey());
+          else
+            o = getTreeSet(e.getKey());
+        } else if (o instanceof HTree) {
+          if (((HTree) o).hasValues)
+            o = getHashMap(e.getKey());
+          else
+            o = getHashSet(e.getKey());
+        }
+
+        ret.put(e.getKey(), o);
+      }
+      return Collections.unmodifiableMap(ret);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /**
+   * Completely removes a named collection from the store: clears its contents,
+   * deletes its backing records and unregisters the name.
+   * 
+   * @throws IOError wrapping an IOException when the collection is not found
+   */
+  @Override
+  public void deleteCollection(String name) {
+    try {
+      long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+      if (nameDirectory_recid == 0)
+        throw new IOException("Collection not found");
+      HTree<String, Long> dir = fetch(nameDirectory_recid);
+
+      Long recid = dir.get(name);
+      if (recid == null)
+        throw new IOException("Collection not found");
+
+      Object o = fetch(recid);
+      // we can not use O instance since it is not correctly initialized
+      if (o instanceof LinkedList) {
+        LinkedList l = (LinkedList) o;
+        l.clear();
+        delete(l.rootRecid);
+      } else if (o instanceof BTree) {
+        ((BTree) o).clear();
+      } else if (o instanceof HTree) {
+        HTree t = (HTree) o;
+        t.clear();
+        HTreeDirectory n = (HTreeDirectory) fetch(t.rootRecid, t.SERIALIZER);
+        n.deleteAllChildren();
+        delete(t.rootRecid);
+      } else {
+        throw new InternalError("unknown collection type: "
+            + (o == null ? null : o.getClass()));
+      }
+      // remove the collection record itself, the cached instance, and the
+      // name directory entry
+      delete(recid);
+      collections.remove(name);
+
+      dir.remove(name);
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /**
+   * Serializer for the Serialization instance itself. We need to set a
+   * reference to this DB instance on deserialization, so the serializer needs
+   * to be here as a non-static inner instance.
+   */
+  final Serializer<Serialization> defaultSerializationSerializer = new Serializer<Serialization>() {
+
+    @Override
+    public void serialize(DataOutput out, Serialization obj) throws IOException {
+      LongPacker.packLong(out, obj.serialClassInfoRecid);
+      SerialClassInfo.serializer.serialize(out, obj.registered);
+    }
+
+    @Override
+    public Serialization deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+      final long recid = LongPacker.unpackLong(in);
+      final ArrayList<SerialClassInfo.ClassInfo> classes = SerialClassInfo.serializer
+          .deserialize(in);
+      return new Serialization(DBAbstract.this, recid, classes);
+    }
+  };
+
+  /**
+   * Returns the DB-wide default serializer, creating and persisting it on
+   * first use (its recid is remembered under SERIAL_CLASS_INFO_RECID_ROOT).
+   */
+  public Serializer defaultSerializer() {
+
+    try {
+      long serialClassInfoRecid = getRoot(SERIAL_CLASS_INFO_RECID_ROOT);
+      if (serialClassInfoRecid == 0) {
+        // allocate new recid
+        serialClassInfoRecid = insert(null, JDBMUtils.NULL_SERIALIZER, false);
+        // and insert new serializer
+        Serialization ser = new Serialization(this, serialClassInfoRecid,
+            new ArrayList<SerialClassInfo.ClassInfo>());
+
+        update(serialClassInfoRecid, ser, defaultSerializationSerializer);
+        setRoot(SERIAL_CLASS_INFO_RECID_ROOT, serialClassInfoRecid);
+        return ser;
+      } else {
+        return fetch(serialClassInfoRecid, defaultSerializationSerializer);
+      }
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /** Guard used by operations that must not run after the DB was closed. */
+  final protected void checkNotClosed() {
+    if (!isClosed())
+      return;
+    throw new IllegalStateException("db was closed");
+  }
+
+  /** Stores a recid under one of the reserved root slots (see *_ROOT constants). */
+  protected abstract void setRoot(byte root, long recid);
+
+  /** Reads the recid stored under a reserved root slot; callers treat 0 as unset. */
+  protected abstract long getRoot(byte root);
+
+  /**
+   * Returns the number of elements in a JDBM collection as a long, working
+   * around Java collections reporting their size as int.
+   * 
+   * @throws IllegalArgumentException for BTree submap views and non-JDBM
+   *           collections
+   */
+  @Override
+  public long collectionSize(Object collection) {
+    if (collection instanceof BTreeMap) {
+      BTreeMap t = (BTreeMap) collection;
+      if (t.fromKey != null || t.toKey != null)
+        throw new IllegalArgumentException(
+            "collectionSize does not work on BTree submap");
+      return t.tree._entries;
+    } else if (collection instanceof HTree) {
+      return ((HTree) collection).getRoot().size;
+    } else if (collection instanceof HTreeSet) {
+      return collectionSize(((HTreeSet) collection).map);
+    } else if (collection instanceof BTreeSet) {
+      return collectionSize(((BTreeSet) collection).map);
+    } else if (collection instanceof LinkedList) {
+      return ((LinkedList) collection).getRoot().size;
+    } else {
+      throw new IllegalArgumentException("Not JDBM collection");
+    }
+  }
+
+  /**
+   * Installs a JVM shutdown hook that closes this DB at JVM exit. Idempotent:
+   * calling it again while a hook is installed does nothing.
+   */
+  void addShutdownHook() {
+    // BUGFIX: the original tested 'shutdownCloseThread != null', but the field
+    // starts out null, so the hook was never installed; additionally dbToClose
+    // was never assigned, so even an installed hook would have been a no-op.
+    if (shutdownCloseThread == null) {
+      shutdownCloseThread = new ShutdownCloseThread();
+      shutdownCloseThread.dbToClose = this;
+      Runtime.getRuntime().addShutdownHook(shutdownCloseThread);
+    }
+  }
+
+  /**
+   * Detaches and removes the registered JVM shutdown hook, if one is
+   * installed, so it will not fire after an orderly close.
+   */
+  @Override
+  public void close() {
+    if (shutdownCloseThread != null) {
+      Runtime.getRuntime().removeShutdownHook(shutdownCloseThread);
+      shutdownCloseThread.dbToClose = null;
+      shutdownCloseThread = null;
+    }
+  }
+
+  // shutdown hook currently registered with the JVM; null when none installed
+  ShutdownCloseThread shutdownCloseThread = null;
+
+  /**
+   * JVM shutdown hook that closes the associated DB unless it was already
+   * closed. close() detaches the hook by nulling dbToClose.
+   */
+  private static class ShutdownCloseThread extends Thread {
+
+    // DB to close at JVM exit; null when detached
+    DBAbstract dbToClose = null;
+
+    ShutdownCloseThread() {
+      super("JDBM shutdown");
+    }
+
+    @Override
+    public void run() {
+      if (dbToClose != null && !dbToClose.isClosed()) {
+        // clear the field first so close() does not try to remove this hook
+        // while it is running
+        dbToClose.shutdownCloseThread = null;
+        dbToClose.close();
+      }
+    }
+
+  }
+
+  /**
+   * After a rollback, reloads the BTree backing every cached tree collection
+   * so live map/set instances do not keep pre-rollback state.
+   */
+  @Override
+  public void rollback() {
+    try {
+      for (WeakReference<Object> ref : collections.values()) {
+        final Object cached = ref.get();
+        // instanceof is false for null, so no explicit null check is needed
+        if (cached instanceof BTreeMap) {
+          // reload tree
+          BTreeMap map = (BTreeMap) cached;
+          map.tree = fetch(map.tree.getRecid());
+        }
+        if (cached instanceof BTreeSet) {
+          // reload tree
+          BTreeSet set = (BTreeSet) cached;
+          set.map.tree = fetch(set.map.tree.getRecid());
+        }
+      }
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import javax.crypto.Cipher;
+
+/**
+ * Abstract class with common cache functionality
+ */
+public abstract class DBCache extends DBStore {
+
+  /** Dirty-record count that triggers an auto-commit when transactions are disabled. */
+  static final int NUM_OF_DIRTY_RECORDS_BEFORE_AUTOCOMIT = 1024;
+
+  // cache policy identifiers
+  static final byte NONE = 1;
+  static final byte MRU = 2;
+  static final byte WEAK = 3;
+  static final byte SOFT = 4;
+  static final byte HARD = 5;
+
+  /** A record modified in memory but not yet written to the underlying store. */
+  static final class DirtyCacheEntry {
+    long _recid; // TODO recid is already part of _hashDirties, so this field
+                 // could be removed to save memory
+    Object _obj;
+    Serializer _serializer;
+  }
+
+  /**
+   * Dirty status of _hash CacheEntry Values
+   */
+  final protected LongHashMap<DirtyCacheEntry> _hashDirties = new LongHashMap<DirtyCacheEntry>();
+
+  // lazily cached result of super.defaultSerializer(); reset on rollback()
+  private Serializer cachedDefaultSerializer = null;
+
+  /**
+   * Construct a CacheRecordManager wrapping another DB and using a given cache
+   * policy. All configuration is forwarded to the DBStore superclass.
+   */
+  public DBCache(String filename, boolean readonly,
+      boolean transactionDisabled, Cipher cipherIn, Cipher cipherOut,
+      boolean useRandomAccessFile, boolean deleteFilesAfterClose,
+      boolean lockingDisabled) {
+
+    super(filename, readonly, transactionDisabled, cipherIn, cipherOut,
+        useRandomAccessFile, deleteFilesAfterClose, lockingDisabled);
+
+  }
+
+  /** Caches the store's default serializer; the cache is reset by rollback(). */
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Serializer defaultSerializer() {
+    if (cachedDefaultSerializer == null)
+      cachedDefaultSerializer = super.defaultSerializer();
+    return cachedDefaultSerializer;
+  }
+
+  /** Also auto-commits when too many dirty records accumulate with transactions disabled. */
+  @Override
+  boolean needsAutoCommit() {
+    return super.needsAutoCommit()
+        || (transactionsDisabled && !commitInProgress && _hashDirties.size() > NUM_OF_DIRTY_RECORDS_BEFORE_AUTOCOMIT);
+  }
+
+  /**
+   * Inserts a record. Unless caching is disabled for this call, only a
+   * placeholder record is allocated immediately; the actual value is held as a
+   * dirty cache entry and written to the store on commit.
+   */
+  @Override
+  public <A> long insert(final A obj, final Serializer<A> serializer,
+      final boolean disableCache) throws IOException {
+    checkNotClosed();
+
+    if (super.needsAutoCommit())
+      commit();
+
+    if (disableCache)
+      return super.insert(obj, serializer, disableCache);
+
+    // prealocate recid so we have something to return
+    final long recid = super.insert(PREALOCATE_OBJ, null, disableCache);
+
+    // super.update(recid, obj,serializer);
+
+    // return super.insert(obj,serializer,disableCache);
+
+    // and create new dirty record for future update
+    final DirtyCacheEntry e = new DirtyCacheEntry();
+    e._recid = recid;
+    e._obj = obj;
+    e._serializer = serializer;
+    _hashDirties.put(recid, e);
+
+    return recid;
+  }
+
+  /** Flushes all dirty cache entries to the store, then commits the store. */
+  @Override
+  public void commit() {
+    try {
+      commitInProgress = true;
+      updateCacheEntries();
+      super.commit();
+    } finally {
+      commitInProgress = false;
+    }
+  }
+
+  /** Discards dirty entries and the cached serializer, then rolls back the store. */
+  @Override
+  public void rollback() {
+    cachedDefaultSerializer = null;
+    _hashDirties.clear();
+    super.rollback();
+  }
+
+  /**
+   * Orders dirty cache entries by ascending recid so flushes touch the store
+   * in record order.
+   */
+  private static final Comparator<DirtyCacheEntry> DIRTY_COMPARATOR = new Comparator<DirtyCacheEntry>() {
+    @Override
+    final public int compare(DirtyCacheEntry o1, DirtyCacheEntry o2) {
+      // BUGFIX: '(int) (o1._recid - o2._recid)' truncates the long difference
+      // to int, which can flip the sign for recids more than 2^31 apart and
+      // violate the Comparator contract. Compare the longs directly instead.
+      return o1._recid < o2._recid ? -1 : (o1._recid == o2._recid ? 0 : 1);
+    }
+  };
+
+  /**
+   * Update all dirty cache objects to the underlying DB. Loops until the dirty
+   * map stays empty, because each super.update() can itself dirty further
+   * records.
+   */
+  @SuppressWarnings("unchecked")
+  protected void updateCacheEntries() {
+    try {
+      while (!_hashDirties.isEmpty()) {
+        // make defensive copy of values as _db.update() may trigger changes
+        // in db and this would modify dirties again
+        DirtyCacheEntry[] vals = new DirtyCacheEntry[_hashDirties.size()];
+        Iterator<DirtyCacheEntry> iter = _hashDirties.valuesIterator();
+
+        for (int i = 0; i < vals.length; i++) {
+          vals[i] = iter.next();
+        }
+        iter = null;
+
+        // flush in ascending recid order
+        java.util.Arrays.sort(vals, DIRTY_COMPARATOR);
+
+        for (int i = 0; i < vals.length; i++) {
+          final DirtyCacheEntry entry = vals[i];
+          vals[i] = null;
+          super.update(entry._recid, entry._obj, entry._serializer);
+          _hashDirties.remove(entry._recid);
+
+        }
+
+        // update may have triggered more records to be added into dirties, so
+        // repeat until all records are written.
+      }
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+}



Mime
View raw message