Return-Path: X-Original-To: apmail-hama-commits-archive@www.apache.org Delivered-To: apmail-hama-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3F47EDE00 for ; Wed, 19 Sep 2012 11:53:22 +0000 (UTC) Received: (qmail 601 invoked by uid 500); 19 Sep 2012 11:53:22 -0000 Delivered-To: apmail-hama-commits-archive@hama.apache.org Received: (qmail 477 invoked by uid 500); 19 Sep 2012 11:53:19 -0000 Mailing-List: contact commits-help@hama.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hama.apache.org Delivered-To: mailing list commits@hama.apache.org Received: (qmail 436 invoked by uid 99); 19 Sep 2012 11:53:18 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Sep 2012 11:53:18 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 19 Sep 2012 11:53:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6812923889C5 for ; Wed, 19 Sep 2012 11:52:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1387533 [2/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j... Date: Wed, 19 Sep 2012 11:52:24 -0000 To: commits@hama.apache.org From: tjungblut@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120919115231.6812923889C5@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java?rev=1387533&view=auto ============================================================================== --- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java (added) +++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java Wed Sep 19 11:52:20 2012 @@ -0,0 +1,1532 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.jdbm; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOError; +import java.io.IOException; +import java.util.ConcurrentModificationException; +import java.util.List; + +/** + * Node of a BTree. + *

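+ * An illustrative node layout (hypothetical, with a capacity of four keys
+ * per node for brevity; the real capacity is BTree.DEFAULT_SIZE):
+ * <pre>
+ *              [ k2 | k5 | k9 | null ]        non-leaf: keys + child recids
+ *               /      |      \      \
+ *   [k1 k2]  [k3 k5]  [k7 k9]  [k10 k11]      leaves: keys + values
+ * </pre>
+ * Each non-leaf key is the largest key of its child; the rightmost slot
+ * holds the "infinite" (null) key described below.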
+ * The node contains a number of key-value pairs. Keys are ordered to allow
+ * binary (dichotomic) search. If a value is too big, it is stored in a
+ * separate record and only its recid reference is kept in the node.
+ *

+ * If the node is a leaf, the keys and values are user-defined and
+ * represent entries inserted by the user.
+ *

+ * If the node is non-leaf, each key represents the greatest key in the + * underlying BTreeNode and the values are recids pointing to the children + * BTreeNodes. The only exception is the rightmost BTreeNode, which is + * considered to have an "infinite" key value, meaning that any insert will be + * to the left of this pseudo-key + */ +public final class BTreeNode implements Serializer> { + + private static final boolean DEBUG = false; + + /** + * Parent B+Tree. + */ + transient BTree _btree; + + /** + * This BTreeNode's record ID in the DB. + */ + protected transient long _recid; + + /** + * Flag indicating if this is a leaf BTreeNode. + */ + protected boolean _isLeaf; + + /** + * Keys of children nodes + */ + protected K[] _keys; + + /** + * Values associated with keys. (Only valid if leaf node) + */ + protected Object[] _values; + + /** + * Children nodes (recids) associated with keys. (Only valid if non-leaf node) + */ + protected long[] _children; + + /** + * Index of first used item at the node + */ + protected byte _first; + + /** + * Previous leaf node (only if this node is a leaf) + */ + protected long _previous; + + /** + * Next leaf node (only if this node is a leaf) + */ + protected long _next; + + /** + * Return the B+Tree that is the owner of this {@link BTreeNode}. + */ + public BTree getBTree() { + return _btree; + } + + /** + * No-argument constructor used by serialization. + */ + public BTreeNode() { + // empty + } + + /** + * Root node overflow constructor + */ + @SuppressWarnings("unchecked") + BTreeNode(BTree btree, BTreeNode root, BTreeNode overflow) + throws IOException { + _btree = btree; + + _isLeaf = false; + + _first = BTree.DEFAULT_SIZE - 2; + + _keys = (K[]) new Object[BTree.DEFAULT_SIZE]; + _keys[BTree.DEFAULT_SIZE - 2] = overflow.getLargestKey(); + _keys[BTree.DEFAULT_SIZE - 1] = root.getLargestKey(); + + _children = new long[BTree.DEFAULT_SIZE]; + _children[BTree.DEFAULT_SIZE - 2] = overflow._recid; + _children[BTree.DEFAULT_SIZE - 1] = root._recid; + + _recid = _btree._db.insert(this, this, false); + } + + /** + * Root node (first insert) constructor. + */ + @SuppressWarnings("unchecked") + BTreeNode(BTree btree, K key, V value) throws IOException { + _btree = btree; + + _isLeaf = true; + + _first = BTree.DEFAULT_SIZE - 2; + + _keys = (K[]) new Object[BTree.DEFAULT_SIZE]; + _keys[BTree.DEFAULT_SIZE - 2] = key; + _keys[BTree.DEFAULT_SIZE - 1] = null; // I am the root BTreeNode for now + + _values = new Object[BTree.DEFAULT_SIZE]; + _values[BTree.DEFAULT_SIZE - 2] = value; + _values[BTree.DEFAULT_SIZE - 1] = null; // I am the root BTreeNode for now + + _recid = _btree._db.insert(this, this, false); + } + + /** + * Overflow node constructor. Creates an empty BTreeNode. + */ + @SuppressWarnings("unchecked") + BTreeNode(BTree btree, boolean isLeaf) { + _btree = btree; + + _isLeaf = isLeaf; + + // node will initially be half-full + _first = BTree.DEFAULT_SIZE / 2; + + _keys = (K[]) new Object[BTree.DEFAULT_SIZE]; + if (isLeaf) { + _values = new Object[BTree.DEFAULT_SIZE]; + } else { + _children = new long[BTree.DEFAULT_SIZE]; + } + + try { + _recid = _btree._db.insert(this, this, false); + } catch (IOException e) { + throw new IOError(e); + } + } + + /** + * Get largest key under this BTreeNode. Null is considered to be the greatest + * possible key. + */ + K getLargestKey() { + return _keys[BTree.DEFAULT_SIZE - 1]; + } + + /** + * Return true if BTreeNode is empty. 
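+ * Slots are right-justified in fixed arrays of BTree.DEFAULT_SIZE entries,
+ * and _first is the index of the first used slot, so the node counts as
+ * empty once _first has advanced to the last slot. A sketch with a
+ * hypothetical DEFAULT_SIZE of 8:
+ * <pre>
+ *   index:  0  1  2  3  4  5  6  7
+ *   _keys:  -  -  -  -  -  -  k1 k2     (_first == 6, two entries)
+ * </pre>
+ * isFull() below is the complementary check (_first == 0).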
+ */ + boolean isEmpty() { + if (_isLeaf) { + return (_first == _values.length - 1); + } else { + return (_first == _children.length - 1); + } + } + + /** + * Return true if BTreeNode is full. + */ + boolean isFull() { + return (_first == 0); + } + + /** + * Find the object associated with the given key. + * + * @param height Height of the current BTreeNode (zero is leaf node) + * @param key The key + * @return TupleBrowser positionned just before the given key, or before next + * greater key if key isn't found. + */ + BTree.BTreeTupleBrowser find(int height, final K key, + final boolean inclusive) throws IOException { + byte index = findChildren(key, inclusive); + + height -= 1; + + if (height == 0) { + // leaf node + return new Browser(this, index); + } else { + // non-leaf node + BTreeNode child = loadNode(_children[index]); + return child.find(height, key, inclusive); + } + } + + /** + * Find value associated with the given key. + * + * @param height Height of the current BTreeNode (zero is leaf node) + * @param key The key + * @return TupleBrowser positionned just before the given key, or before next + * greater key if key isn't found. + */ + V findValue(int height, K key) throws IOException { + byte index = findChildren(key, true); + + height -= 1; + + if (height == 0) { + + K key2 = _keys[index]; + // // get returns the matching key or the next ordered key, so we must + // // check if we have an exact match + if (key2 == null || compare(key, key2) != 0) + return null; + + // leaf node + if (_values[index] instanceof BTreeLazyRecord) + return ((BTreeLazyRecord) _values[index]).get(); + else + return (V) _values[index]; + + } else { + // non-leaf node + BTreeNode child = loadNode(_children[index]); + return child.findValue(height, key); + } + } + + /** + * Find first entry and return a browser positioned before it. + * + * @return TupleBrowser positionned just before the first entry. + */ + BTree.BTreeTupleBrowser findFirst() throws IOException { + if (_isLeaf) { + return new Browser(this, _first); + } else { + BTreeNode child = loadNode(_children[_first]); + return child.findFirst(); + } + } + + /** + * Deletes this BTreeNode and all children nodes from the record manager + */ + void delete() throws IOException { + if (_isLeaf) { + if (_next != 0) { + BTreeNode nextNode = loadNode(_next); + if (nextNode._previous == _recid) { // this consistency check can be + // removed in production code + nextNode._previous = _previous; + _btree._db.update(nextNode._recid, nextNode, nextNode); + } else { + throw new Error("Inconsistent data in BTree"); + } + } + if (_previous != 0) { + BTreeNode previousNode = loadNode(_previous); + if (previousNode._next != _recid) { // this consistency check can be + // removed in production code + previousNode._next = _next; + _btree._db.update(previousNode._recid, previousNode, previousNode); + } else { + throw new Error("Inconsistent data in BTree"); + } + } + } else { + int left = _first; + int right = BTree.DEFAULT_SIZE - 1; + + for (int i = left; i <= right; i++) { + BTreeNode childNode = loadNode(_children[i]); + childNode.delete(); + } + } + + _btree._db.delete(_recid); + } + + /** + * Insert the given key and value. + *

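+ * A caller-side sketch (hypothetical; height is the current tree height
+ * tracked by the owning BTree):
+ * <pre>
+ *   InsertResult r = root.insert(height, key, value, true);
+ *   if (r._existing != null) { ... }  // key was present, old value returned
+ *   if (r._overflow != null) { ... }  // node split, a new root is needed
+ * </pre>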
+ * Since the Btree does not support duplicate entries, the caller must specify + * whether to replace the existing value. + * + * @param height Height of the current BTreeNode (zero is leaf node) + * @param key Insert key + * @param value Insert value + * @param replace Set to true to replace the existing value, if one exists. + * @return Insertion result containing existing value OR a BTreeNode if the + * key was inserted and provoked a BTreeNode overflow. + */ + InsertResult insert(int height, K key, final V value, + final boolean replace) throws IOException { + InsertResult result; + long overflow; + + final byte index = findChildren(key, true); + + height -= 1; + if (height == 0) { + + // reuse InsertResult instance to avoid GC trashing on massive inserts + result = _btree.insertResultReuse; + _btree.insertResultReuse = null; + if (result == null) + result = new InsertResult(); + + // inserting on a leaf BTreeNode + overflow = -1; + if (DEBUG) { + System.out.println("BTreeNode.insert() Insert on leaf node key=" + key + + " value=" + value + " index=" + index); + } + if (compare(_keys[index], key) == 0) { + // key already exists + if (DEBUG) { + System.out.println("BTreeNode.insert() Key already exists."); + } + boolean isLazyRecord = _values[index] instanceof BTreeLazyRecord; + if (isLazyRecord) + result._existing = ((BTreeLazyRecord) _values[index]).get(); + else + result._existing = (V) _values[index]; + if (replace) { + // remove old lazy record if necesarry + if (isLazyRecord) + ((BTreeLazyRecord) _values[index]).delete(); + _values[index] = value; + _btree._db.update(_recid, this, this); + } + // return the existing key + return result; + } + } else { + // non-leaf BTreeNode + BTreeNode child = loadNode(_children[index]); + result = child.insert(height, key, value, replace); + + if (result._existing != null) { + // return existing key, if any. 
+ return result; + } + + if (result._overflow == null) { + // no overflow means we're done with insertion + return result; + } + + // there was an overflow, we need to insert the overflow node on this + // BTreeNode + if (DEBUG) { + System.out.println("BTreeNode.insert() Overflow node: " + + result._overflow._recid); + } + key = result._overflow.getLargestKey(); + overflow = result._overflow._recid; + + // update child's largest key + _keys[index] = child.getLargestKey(); + + // clean result so we can reuse it + result._overflow = null; + } + + // if we get here, we need to insert a new entry on the BTreeNode before + // _children[ index ] + if (!isFull()) { + if (height == 0) { + insertEntry(this, index - 1, key, value); + } else { + insertChild(this, index - 1, key, overflow); + } + _btree._db.update(_recid, this, this); + return result; + } + + // node is full, we must divide the node + final byte half = BTree.DEFAULT_SIZE >> 1; + BTreeNode newNode = new BTreeNode(_btree, _isLeaf); + if (index < half) { + // move lower-half of entries to overflow node, including new entry + if (DEBUG) { + System.out + .println("BTreeNode.insert() move lower-half of entries to overflow BTreeNode, including new entry."); + } + if (height == 0) { + copyEntries(this, 0, newNode, half, index); + setEntry(newNode, half + index, key, value); + copyEntries(this, index, newNode, half + index + 1, half - index - 1); + } else { + copyChildren(this, 0, newNode, half, index); + setChild(newNode, half + index, key, overflow); + copyChildren(this, index, newNode, half + index + 1, half - index - 1); + } + } else { + // move lower-half of entries to overflow node, new entry stays on this + // node + if (DEBUG) { + System.out + .println("BTreeNode.insert() move lower-half of entries to overflow BTreeNode. New entry stays"); + } + if (height == 0) { + copyEntries(this, 0, newNode, half, half); + copyEntries(this, half, this, half - 1, index - half); + setEntry(this, index - 1, key, value); + } else { + copyChildren(this, 0, newNode, half, half); + copyChildren(this, half, this, half - 1, index - half); + setChild(this, index - 1, key, overflow); + } + } + + _first = half - 1; + + // nullify lower half of entries + for (int i = 0; i < _first; i++) { + if (height == 0) { + setEntry(this, i, null, null); + } else { + setChild(this, i, null, -1); + } + } + + if (_isLeaf) { + // link newly created node + newNode._previous = _previous; + newNode._next = _recid; + if (_previous != 0) { + BTreeNode previous = loadNode(_previous); + previous._next = newNode._recid; + _btree._db.update(_previous, previous, this); + + } + _previous = newNode._recid; + } + + _btree._db.update(_recid, this, this); + _btree._db.update(newNode._recid, newNode, this); + + result._overflow = newNode; + return result; + } + + /** + * Remove the entry associated with the given key. 
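+ * If removal leaves this node more than half-empty, the returned result's
+ * _underflow flag is set and the parent rebalances by stealing entries from
+ * a neighbouring "brother" node or by merging the two, as implemented below.
+ * A caller-side sketch (hypothetical):
+ * <pre>
+ *   RemoveResult r = root.remove(height, key);
+ *   V removed = r._value;
+ * </pre>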
+ * + * @param height Height of the current BTreeNode (zero is leaf node) + * @param key Removal key + * @return Remove result object + */ + RemoveResult remove(int height, K key) throws IOException { + RemoveResult result; + + int half = BTree.DEFAULT_SIZE / 2; + byte index = findChildren(key, true); + + height -= 1; + if (height == 0) { + // remove leaf entry + if (compare(_keys[index], key) != 0) { + throw new IllegalArgumentException("Key not found: " + key); + } + result = new RemoveResult(); + + if (_values[index] instanceof BTreeLazyRecord) { + BTreeLazyRecord r = (BTreeLazyRecord) _values[index]; + result._value = r.get(); + r.delete(); + } else { + result._value = (V) _values[index]; + } + removeEntry(this, index); + + // update this node + _btree._db.update(_recid, this, this); + + } else { + // recurse into Btree to remove entry on a children node + BTreeNode child = loadNode(_children[index]); + result = child.remove(height, key); + + // update children + _keys[index] = child.getLargestKey(); + _btree._db.update(_recid, this, this); + + if (result._underflow) { + // underflow occured + if (child._first != half + 1) { + throw new IllegalStateException("Error during underflow [1]"); + } + if (index < _children.length - 1) { + // exists greater brother node + BTreeNode brother = loadNode(_children[index + 1]); + int bfirst = brother._first; + if (bfirst < half) { + // steal entries from "brother" node + int steal = (half - bfirst + 1) / 2; + brother._first += steal; + child._first -= steal; + if (child._isLeaf) { + copyEntries(child, half + 1, child, half + 1 - steal, half - 1); + copyEntries(brother, bfirst, child, 2 * half - steal, steal); + } else { + copyChildren(child, half + 1, child, half + 1 - steal, half - 1); + copyChildren(brother, bfirst, child, 2 * half - steal, steal); + } + + for (int i = bfirst; i < bfirst + steal; i++) { + if (brother._isLeaf) { + setEntry(brother, i, null, null); + } else { + setChild(brother, i, null, -1); + } + } + + // update child's largest key + _keys[index] = child.getLargestKey(); + + // no change in previous/next node + + // update nodes + _btree._db.update(_recid, this, this); + _btree._db.update(brother._recid, brother, this); + _btree._db.update(child._recid, child, this); + + } else { + // move all entries from node "child" to "brother" + if (brother._first != half) { + throw new IllegalStateException("Error during underflow [2]"); + } + + brother._first = 1; + if (child._isLeaf) { + copyEntries(child, half + 1, brother, 1, half - 1); + } else { + copyChildren(child, half + 1, brother, 1, half - 1); + } + _btree._db.update(brother._recid, brother, this); + + // remove "child" from current node + if (_isLeaf) { + copyEntries(this, _first, this, _first + 1, index - _first); + setEntry(this, _first, null, null); + } else { + copyChildren(this, _first, this, _first + 1, index - _first); + setChild(this, _first, null, -1); + } + _first += 1; + _btree._db.update(_recid, this, this); + + // re-link previous and next nodes + if (child._previous != 0) { + BTreeNode prev = loadNode(child._previous); + prev._next = child._next; + _btree._db.update(prev._recid, prev, this); + } + if (child._next != 0) { + BTreeNode next = loadNode(child._next); + next._previous = child._previous; + _btree._db.update(next._recid, next, this); + + } + + // delete "child" node + _btree._db.delete(child._recid); + } + } else { + // node "brother" is before "child" + BTreeNode brother = loadNode(_children[index - 1]); + int bfirst = brother._first; + if (bfirst < 
half) { + // steal entries from "brother" node + int steal = (half - bfirst + 1) / 2; + brother._first += steal; + child._first -= steal; + if (child._isLeaf) { + copyEntries(brother, 2 * half - steal, child, half + 1 - steal, + steal); + copyEntries(brother, bfirst, brother, bfirst + steal, 2 * half + - bfirst - steal); + } else { + copyChildren(brother, 2 * half - steal, child, half + 1 - steal, + steal); + copyChildren(brother, bfirst, brother, bfirst + steal, 2 * half + - bfirst - steal); + } + + for (int i = bfirst; i < bfirst + steal; i++) { + if (brother._isLeaf) { + setEntry(brother, i, null, null); + } else { + setChild(brother, i, null, -1); + } + } + + // update brother's largest key + _keys[index - 1] = brother.getLargestKey(); + + // no change in previous/next node + + // update nodes + _btree._db.update(_recid, this, this); + _btree._db.update(brother._recid, brother, this); + _btree._db.update(child._recid, child, this); + + } else { + // move all entries from node "brother" to "child" + if (brother._first != half) { + throw new IllegalStateException("Error during underflow [3]"); + } + + child._first = 1; + if (child._isLeaf) { + copyEntries(brother, half, child, 1, half); + } else { + copyChildren(brother, half, child, 1, half); + } + _btree._db.update(child._recid, child, this); + + // remove "brother" from current node + if (_isLeaf) { + copyEntries(this, _first, this, _first + 1, index - 1 - _first); + setEntry(this, _first, null, null); + } else { + copyChildren(this, _first, this, _first + 1, index - 1 - _first); + setChild(this, _first, null, -1); + } + _first += 1; + _btree._db.update(_recid, this, this); + + // re-link previous and next nodes + if (brother._previous != 0) { + BTreeNode prev = loadNode(brother._previous); + prev._next = brother._next; + _btree._db.update(prev._recid, prev, this); + } + if (brother._next != 0) { + BTreeNode next = loadNode(brother._next); + next._previous = brother._previous; + _btree._db.update(next._recid, next, this); + } + + // delete "brother" node + _btree._db.delete(brother._recid); + } + } + } + } + + // underflow if node is more than half-empty + result._underflow = _first > half; + + return result; + } + + /** + * Find the first children node with a key equal or greater than the given + * key. + * + * @return index of first children with equal or greater key. + */ + private byte findChildren(final K key, final boolean inclusive) { + int left = _first; + int right = BTree.DEFAULT_SIZE - 1; + int middle; + final int D = inclusive ? 0 : 1; + + // binary search + while (true) { + middle = (left + right) / 2; + if (compare(_keys[middle], key) < D) { + left = middle + 1; + } else { + right = middle; + } + if (left >= right) { + return (byte) right; + } + } + } + + /** + * Insert entry at given position. + */ + private static void insertEntry(BTreeNode node, int index, + K key, V value) { + K[] keys = node._keys; + Object[] values = node._values; + int start = node._first; + int count = index - node._first + 1; + + // shift entries to the left + System.arraycopy(keys, start, keys, start - 1, count); + System.arraycopy(values, start, values, start - 1, count); + node._first -= 1; + keys[index] = key; + values[index] = value; + } + + /** + * Insert child at given position. 
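+ * Like insertEntry above, this shifts the used slots [_first..index] one
+ * position to the left and then writes the new key and child at index.
+ * A sketch with a hypothetical 8-slot node:
+ * <pre>
+ *   before:  -  -  -  a  b  c  d  e     (_first == 3, insert x at index 4)
+ *   after:   -  -  a  b  x  c  d  e     (_first == 2)
+ * </pre>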
+ */ + private static void insertChild(BTreeNode node, int index, + K key, long child) { + K[] keys = node._keys; + long[] children = node._children; + int start = node._first; + int count = index - node._first + 1; + + // shift entries to the left + System.arraycopy(keys, start, keys, start - 1, count); + System.arraycopy(children, start, children, start - 1, count); + node._first -= 1; + keys[index] = key; + children[index] = child; + } + + /** + * Remove entry at given position. + */ + private static void removeEntry(BTreeNode node, int index) { + K[] keys = node._keys; + Object[] values = node._values; + int start = node._first; + int count = index - node._first; + + System.arraycopy(keys, start, keys, start + 1, count); + keys[start] = null; + System.arraycopy(values, start, values, start + 1, count); + values[start] = null; + node._first++; + } + + /** + * Set the entry at the given index. + */ + private static void setEntry(BTreeNode node, int index, K key, + V value) { + node._keys[index] = key; + node._values[index] = value; + } + + /** + * Set the child BTreeNode recid at the given index. + */ + private static void setChild(BTreeNode node, int index, K key, + long recid) { + node._keys[index] = key; + node._children[index] = recid; + } + + /** + * Copy entries between two nodes + */ + private static void copyEntries(BTreeNode source, + int indexSource, BTreeNode dest, int indexDest, int count) { + System.arraycopy(source._keys, indexSource, dest._keys, indexDest, count); + System.arraycopy(source._values, indexSource, dest._values, indexDest, + count); + } + + /** + * Copy child node recids between two nodes + */ + private static void copyChildren(BTreeNode source, + int indexSource, BTreeNode dest, int indexDest, int count) { + System.arraycopy(source._keys, indexSource, dest._keys, indexDest, count); + System.arraycopy(source._children, indexSource, dest._children, indexDest, + count); + } + + /** + * Load the node at the given recid. + */ + private BTreeNode loadNode(long recid) throws IOException { + BTreeNode child = _btree._db.fetch(recid, this); + child._recid = recid; + child._btree = _btree; + return child; + } + + private final int compare(final K value1, final K value2) { + if (value1 == null) { + return 1; + } + if (value2 == null) { + return -1; + } + + if (_btree._comparator == null) { + return ((Comparable) value1).compareTo(value2); + } else { + return _btree._comparator.compare(value1, value2); + } + + } + + /** + * Dump the structure of the tree on the screen. This is used for debugging + * purposes only. + */ + private void dump(int height) { + String prefix = ""; + for (int i = 0; i < height; i++) { + prefix += " "; + } + System.out.println(prefix + + "-------------------------------------- BTreeNode recid=" + _recid); + System.out.println(prefix + "first=" + _first); + for (int i = 0; i < BTree.DEFAULT_SIZE; i++) { + if (_isLeaf) { + System.out.println(prefix + "BTreeNode [" + i + "] " + _keys[i] + " " + + _values[i]); + } else { + System.out.println(prefix + "BTreeNode [" + i + "] " + _keys[i] + " " + + _children[i]); + } + } + System.out.println(prefix + "--------------------------------------"); + } + + /** + * Recursively dump the state of the BTree on screen. This is used for + * debugging purposes only. 
+ */ + void dumpRecursive(int height, int level) throws IOException { + height -= 1; + level += 1; + if (height > 0) { + for (byte i = _first; i < BTree.DEFAULT_SIZE; i++) { + if (_keys[i] == null) + break; + BTreeNode child = loadNode(_children[i]); + child.dump(level); + child.dumpRecursive(height, level); + } + } + } + + /** + * Deserialize the content of an object from a byte array. + */ + @SuppressWarnings("unchecked") + public BTreeNode deserialize(DataInput ois2) throws IOException { + DataInputOutput ois = (DataInputOutput) ois2; + + BTreeNode node = new BTreeNode(); + + switch (ois.readUnsignedByte()) { + case SerializationHeader.BTREE_NODE_LEAF: + node._isLeaf = true; + break; + case SerializationHeader.BTREE_NODE_NONLEAF: + node._isLeaf = false; + break; + default: + throw new InternalError("wrong BTreeNode header"); + } + + if (node._isLeaf) { + node._previous = LongPacker.unpackLong(ois); + node._next = LongPacker.unpackLong(ois); + } + + node._first = ois.readByte(); + + if (!node._isLeaf) { + node._children = new long[BTree.DEFAULT_SIZE]; + for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) { + node._children[i] = LongPacker.unpackLong(ois); + } + } + + if (!_btree.loadValues) + return node; + + try { + + node._keys = readKeys(ois, node._first); + + } catch (ClassNotFoundException except) { + throw new IOException(except.getMessage()); + } + + if (node._isLeaf) { + + try { + readValues(ois, node); + } catch (ClassNotFoundException except) { + throw new IOException(except); + } + } + + return node; + + } + + /** + * Serialize the content of an object into a byte array. + * + * @param obj Object to serialize + * @return a byte array representing the object's state + */ + public void serialize(DataOutput oos, BTreeNode obj) throws IOException { + + // note: It is assumed that BTreeNode instance doing the serialization is + // the parent + // of the BTreeNode object being serialized. + + BTreeNode node = obj; + + oos.writeByte(node._isLeaf ? SerializationHeader.BTREE_NODE_LEAF + : SerializationHeader.BTREE_NODE_NONLEAF); + if (node._isLeaf) { + LongPacker.packLong(oos, node._previous); + LongPacker.packLong(oos, node._next); + } + + oos.write(node._first); + + if (!node._isLeaf) { + for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) { + LongPacker.packLong(oos, node._children[i]); + } + } + + writeKeys(oos, node._keys, node._first); + + if (node._isLeaf && _btree.hasValues()) { + writeValues(oos, node); + } + } + + private void readValues(DataInputOutput ois, BTreeNode node) + throws IOException, ClassNotFoundException { + node._values = new Object[BTree.DEFAULT_SIZE]; + if (_btree.hasValues()) { + Serializer serializer = _btree.valueSerializer != null ? 
_btree.valueSerializer + : (Serializer) _btree.getRecordManager().defaultSerializer(); + for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) { + int header = ois.readUnsignedByte(); + if (header == BTreeLazyRecord.NULL) { + node._values[i] = null; + } else if (header == BTreeLazyRecord.LAZY_RECORD) { + long recid = LongPacker.unpackLong(ois); + node._values[i] = new BTreeLazyRecord(_btree._db, recid, serializer); + } else { + node._values[i] = BTreeLazyRecord.fastDeser(ois, serializer, header); + } + } + } else { + // create fake values + for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) { + if (node._keys[i] != null) + node._values[i] = JDBMUtils.EMPTY_STRING; + } + } + } + + private void writeValues(DataOutput oos, BTreeNode node) + throws IOException { + + DataInputOutput output = null; + Serializer serializer = _btree.valueSerializer != null ? _btree.valueSerializer + : _btree.getRecordManager().defaultSerializer(); + for (int i = node._first; i < BTree.DEFAULT_SIZE; i++) { + if (node._values[i] instanceof BTreeLazyRecord) { + oos.write(BTreeLazyRecord.LAZY_RECORD); + LongPacker.packLong(oos, ((BTreeLazyRecord) node._values[i]).recid); + } else if (node._values[i] != null) { + if (output == null) { + output = new DataInputOutput(); + } else { + output.reset(); + } + + serializer.serialize(output, node._values[i]); + + if (output.getPos() > BTreeLazyRecord.MAX_INTREE_RECORD_SIZE) { + // write as separate record + long recid = _btree._db.insert(output.toByteArray(), + BTreeLazyRecord.FAKE_SERIALIZER, true); + oos.write(BTreeLazyRecord.LAZY_RECORD); + LongPacker.packLong(oos, recid); + } else { + // write as part of btree + oos.write(output.getPos()); + oos.write(output.getBuf(), 0, output.getPos()); + } + } else { + oos.write(BTreeLazyRecord.NULL); + } + } + } + + private static final int ALL_NULL = 0; + private static final int ALL_INTEGERS = 1 << 5; + private static final int ALL_INTEGERS_NEGATIVE = 2 << 5; + private static final int ALL_LONGS = 3 << 5; + private static final int ALL_LONGS_NEGATIVE = 4 << 5; + private static final int ALL_STRINGS = 5 << 5; + private static final int ALL_OTHER = 6 << 5; + + private K[] readKeys(DataInput ois, final int firstUse) throws IOException, + ClassNotFoundException { + Object[] ret = new Object[BTree.DEFAULT_SIZE]; + final int type = ois.readUnsignedByte(); + if (type == ALL_NULL) { + return (K[]) ret; + } else if (type == ALL_INTEGERS || type == ALL_INTEGERS_NEGATIVE) { + long first = LongPacker.unpackLong(ois); + if (type == ALL_INTEGERS_NEGATIVE) + first = -first; + ret[firstUse] = Integer.valueOf((int) first); + for (int i = firstUse + 1; i < BTree.DEFAULT_SIZE; i++) { + // ret[i] = Serialization.readObject(ois); + long v = LongPacker.unpackLong(ois); + if (v == 0) + continue; // null + v = v + first; + ret[i] = Integer.valueOf((int) v); + first = v; + } + return (K[]) ret; + } else if (type == ALL_LONGS || type == ALL_LONGS_NEGATIVE) { + long first = LongPacker.unpackLong(ois); + if (type == ALL_LONGS_NEGATIVE) + first = -first; + + ret[firstUse] = Long.valueOf(first); + for (int i = firstUse + 1; i < BTree.DEFAULT_SIZE; i++) { + // ret[i] = Serialization.readObject(ois); + long v = LongPacker.unpackLong(ois); + if (v == 0) + continue; // null + v = v + first; + ret[i] = Long.valueOf(v); + first = v; + } + return (K[]) ret; + } else if (type == ALL_STRINGS) { + byte[] previous = null; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + byte[] b = leadingValuePackRead(ois, previous, 0); + if (b == null) + continue; + ret[i] = 
new String(b, Serialization.UTF8); + previous = b; + } + return (K[]) ret; + + } else if (type == ALL_OTHER) { + + // TODO why this block is here? + if (_btree.keySerializer == null + || _btree.keySerializer == _btree.getRecordManager() + .defaultSerializer()) { + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + ret[i] = _btree.getRecordManager().defaultSerializer() + .deserialize(ois); + } + return (K[]) ret; + } + + Serializer ser = _btree.keySerializer != null ? _btree.keySerializer + : _btree.getRecordManager().defaultSerializer(); + DataInputOutput in2 = null; + byte[] previous = null; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + byte[] b = leadingValuePackRead(ois, previous, 0); + if (b == null) + continue; + if (in2 == null) { + in2 = new DataInputOutput(); + } + in2.reset(b); + ret[i] = ser.deserialize(in2); + previous = b; + } + return (K[]) ret; + + } else { + throw new InternalError("unknown BTreeNode header type: " + type); + } + + } + + @SuppressWarnings("unchecked") + private void writeKeys(DataOutput oos, K[] keys, final int firstUse) + throws IOException { + if (keys.length != BTree.DEFAULT_SIZE) + throw new IllegalArgumentException("wrong keys size"); + + // check if all items on key are null + boolean allNull = true; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] != null) { + allNull = false; + break; + } + } + if (allNull) { + oos.write(ALL_NULL); + return; + } + + /** + * Special compression to compress Long and Integer + */ + if ((_btree._comparator == JDBMUtils.COMPARABLE_COMPARATOR || _btree._comparator == null) + && (_btree.keySerializer == null || _btree.keySerializer == _btree + .getRecordManager().defaultSerializer())) { + boolean allInteger = true; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] != null && keys[i].getClass() != Integer.class) { + allInteger = false; + break; + } + } + boolean allLong = true; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] != null && (keys[i].getClass() != Long.class || + // special case to exclude Long.MIN_VALUE from conversion, causes + // problems to LongPacker + ((Long) keys[i]).longValue() == Long.MIN_VALUE)) { + allLong = false; + break; + } + } + + if (allLong) { + // check that diff between MIN and MAX fits into PACKED_LONG + long max = Long.MIN_VALUE; + long min = Long.MAX_VALUE; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] == null) + continue; + long v = (Long) keys[i]; + if (v > max) + max = v; + if (v < min) + min = v; + } + // now convert to Double to prevent overflow errors + double max2 = max; + double min2 = min; + double maxDiff = Long.MAX_VALUE; + if (max2 - min2 > maxDiff / 2) // divide by two just to by sure + allLong = false; + + } + + if (allLong && allInteger) + throw new InternalError(); + + if (allLong || allInteger) { + long first = ((Number) keys[firstUse]).longValue(); + // write header + if (allInteger) { + if (first > 0) + oos.write(ALL_INTEGERS); + else + oos.write(ALL_INTEGERS_NEGATIVE); + } else if (allLong) { + if (first > 0) + oos.write(ALL_LONGS); + else + oos.write(ALL_LONGS_NEGATIVE); + } else { + throw new InternalError(); + } + + // write first + LongPacker.packLong(oos, Math.abs(first)); + // write others + for (int i = firstUse + 1; i < BTree.DEFAULT_SIZE; i++) { + // Serialization.writeObject(oos, keys[i]); + if (keys[i] == null) + LongPacker.packLong(oos, 0); + else { + long v = ((Number) keys[i]).longValue(); + if (v <= first) + throw new InternalError("not ordered"); + 
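+ // keys are strictly increasing at this point, so only the positive
+ // delta from the previous key is packed; readKeys() above reverses
+ // this by accumulating the deltas (a packed 0 marks a null key)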
LongPacker.packLong(oos, v - first); + first = v; + } + } + return; + } else { + // another special case for Strings + boolean allString = true; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] != null && (keys[i].getClass() != String.class)) { + allString = false; + break; + } + } + if (allString) { + oos.write(ALL_STRINGS); + byte[] previous = null; + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] == null) { + leadingValuePackWrite(oos, null, previous, 0); + } else { + byte[] b = ((String) keys[i]).getBytes(Serialization.UTF8); + leadingValuePackWrite(oos, b, previous, 0); + previous = b; + } + } + return; + } + } + } + + /** + * other case, serializer is provided or other stuff + */ + oos.write(ALL_OTHER); + if (_btree.keySerializer == null + || _btree.keySerializer == _btree.getRecordManager() + .defaultSerializer()) { + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + _btree.getRecordManager().defaultSerializer().serialize(oos, keys[i]); + } + return; + } + + // custom serializer is provided, use it + + Serializer ser = _btree.keySerializer; + byte[] previous = null; + + DataInputOutput out3 = new DataInputOutput(); + for (int i = firstUse; i < BTree.DEFAULT_SIZE; i++) { + if (keys[i] == null) { + leadingValuePackWrite(oos, null, previous, 0); + } else { + out3.reset(); + ser.serialize(out3, keys[i]); + byte[] b = out3.toByteArray(); + leadingValuePackWrite(oos, b, previous, 0); + previous = b; + } + } + + } + + public void defrag(DBStore r1, DBStore r2) throws IOException { + if (_children != null) + for (long child : _children) { + if (child == 0) + continue; + byte[] data = r1.fetchRaw(child); + r2.forceInsert(child, data); + BTreeNode t = deserialize(new DataInputOutput(data)); + t._btree = _btree; + t.defrag(r1, r2); + } + } + + /** + * STATIC INNER CLASS Result from insert() method call + */ + static final class InsertResult { + + /** + * Overflow node. + */ + BTreeNode _overflow; + + /** + * Existing value for the insertion key. + */ + V _existing; + + } + + /** + * STATIC INNER CLASS Result from remove() method call + */ + static final class RemoveResult { + + /** + * Set to true if underlying nodes underflowed + */ + boolean _underflow; + + /** + * Removed entry value + */ + V _value; + } + + /** + * PRIVATE INNER CLASS Browser to traverse leaf nodes. + */ + static final class Browser implements BTree.BTreeTupleBrowser { + + /** + * Current node. + */ + private BTreeNode _node; + + /** + * Current index in the node. The index positionned on the next tuple to + * return. + */ + private byte _index; + + private int expectedModCount; + + /** + * Create a browser. + * + * @param node Current node + * @param index Position of the next tuple to return. + */ + Browser(BTreeNode node, byte index) { + _node = node; + _index = index; + expectedModCount = node._btree.modCount; + } + + public boolean getNext(BTree.BTreeTuple tuple) throws IOException { + if (expectedModCount != _node._btree.modCount) + throw new ConcurrentModificationException(); + if (_node == null) { + // last record in iterator was deleted, so iterator is at end of node + return false; + } + + if (_index < BTree.DEFAULT_SIZE) { + if (_node._keys[_index] == null) { + // reached end of the tree. 
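+ // (slots are right-justified, so a null key at or past _first
+ // means this leaf holds no further entries)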
+ return false; + } + } else if (_node._next != 0) { + // move to next node + _node = _node.loadNode(_node._next); + _index = _node._first; + } + tuple.key = _node._keys[_index]; + if (_node._values[_index] instanceof BTreeLazyRecord) + tuple.value = ((BTreeLazyRecord) _node._values[_index]).get(); + else + tuple.value = (V) _node._values[_index]; + _index++; + return true; + } + + public boolean getPrevious(BTree.BTreeTuple tuple) throws IOException { + if (expectedModCount != _node._btree.modCount) + throw new ConcurrentModificationException(); + + if (_node == null) { + // deleted last record, but this situation is only supportedd on getNext + throw new InternalError(); + } + + if (_index == _node._first) { + + if (_node._previous != 0) { + _node = _node.loadNode(_node._previous); + _index = BTree.DEFAULT_SIZE; + } else { + // reached beginning of the tree + return false; + } + } + _index--; + tuple.key = _node._keys[_index]; + if (_node._values[_index] instanceof BTreeLazyRecord) + tuple.value = ((BTreeLazyRecord) _node._values[_index]).get(); + else + tuple.value = (V) _node._values[_index]; + + return true; + + } + + public void remove(K key) throws IOException { + if (expectedModCount != _node._btree.modCount) + throw new ConcurrentModificationException(); + + _node._btree.remove(key); + expectedModCount++; + + // An entry was removed and this may trigger tree rebalance, + // This would change current node layout, so find our position again + BTree.BTreeTupleBrowser b = _node._btree.browse(key, true); + // browser is positioned just before value which was currently deleted, so + // find if we have new value + if (b.getNext(new BTree.BTreeTuple(null, null))) { + // next value value exists, copy its state + Browser b2 = (Browser) b; + this._node = b2._node; + this._index = b2._index; + } else { + this._node = null; + this._index = -1; + } + + } + } + + /** + * Used for debugging and testing only. Recursively obtains the recids of all + * child BTreeNodes and adds them to the 'out' list. + * + * @param out + * @param height + * @throws IOException + */ + void dumpChildNodeRecIDs(List out, int height) throws IOException { + height -= 1; + if (height > 0) { + for (byte i = _first; i < BTree.DEFAULT_SIZE; i++) { + if (_children[i] == 0) + continue; + + BTreeNode child = loadNode(_children[i]); + out.add(new Long(child._recid)); + child.dumpChildNodeRecIDs(out, height); + } + } + } + + /** + * Read previously written data + * + * @author Kevin Day + */ + static byte[] leadingValuePackRead(DataInput in, byte[] previous, + int ignoreLeadingCount) throws IOException { + int len = LongPacker.unpackInt(in) - 1; // 0 indicates null + if (len == -1) + return null; + + int actualCommon = LongPacker.unpackInt(in); + + byte[] buf = new byte[len]; + + if (previous == null) { + actualCommon = 0; + } + + if (actualCommon > 0) { + in.readFully(buf, 0, ignoreLeadingCount); + System.arraycopy(previous, ignoreLeadingCount, buf, ignoreLeadingCount, + actualCommon - ignoreLeadingCount); + } + in.readFully(buf, actualCommon, len - actualCommon); + return buf; + } + + /** + * This method is used for delta compression for keys. Writes the contents of + * buf to the DataOutput out, with special encoding if there are common + * leading bytes in the previous group stored by this compressor. 
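+ * A worked example (hypothetical byte strings, ignoreLeadingCount == 0):
+ * writing "apply" after "apple" packs the length + 1 (6), then the shared
+ * prefix length (4), then only the differing suffix byte 'y';
+ * leadingValuePackRead above rebuilds "apply" by copying the first four
+ * bytes from the previous value.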
+ * + * @author Kevin Day + */ + static void leadingValuePackWrite(DataOutput out, byte[] buf, + byte[] previous, int ignoreLeadingCount) throws IOException { + if (buf == null) { + LongPacker.packInt(out, 0); + return; + } + + int actualCommon = ignoreLeadingCount; + + if (previous != null) { + int maxCommon = buf.length > previous.length ? previous.length + : buf.length; + + if (maxCommon > Short.MAX_VALUE) + maxCommon = Short.MAX_VALUE; + + for (; actualCommon < maxCommon; actualCommon++) { + if (buf[actualCommon] != previous[actualCommon]) + break; + } + } + + // there are enough common bytes to justify compression + LongPacker.packInt(out, buf.length + 1);// store as +1, 0 indicates null + LongPacker.packInt(out, actualCommon); + out.write(buf, 0, ignoreLeadingCount); + out.write(buf, actualCommon, buf.length - actualCommon); + + } + + BTreeNode loadLastChildNode() throws IOException { + return loadNode(_children[BTree.DEFAULT_SIZE - 1]); + } + +} Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java?rev=1387533&view=auto ============================================================================== --- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java (added) +++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java Wed Sep 19 11:52:20 2012 @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hama.jdbm; + +import java.util.AbstractSet; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableSet; +import java.util.SortedSet; + +/** + * Wrapper class for >SortedMap to implement + * >NavigableSet + *

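+ * Membership is backed by the wrapped map's keys; add(e) stores a dummy
+ * value. A usage sketch (hypothetical DB instance):
+ * <pre>
+ *   NavigableSet&lt;String&gt; names = db.createTreeSet("names");
+ *   names.add("alice");
+ *   boolean present = names.contains("alice");
+ * </pre>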
+ * This code originally comes from Apache Harmony, was adapted by Jan Kotek for + * JDBM + */ +public final class BTreeSet extends AbstractSet implements + NavigableSet { + + /** + * use keyset from this map + */ + final BTreeMap map; + + BTreeSet(BTreeMap map) { + this.map = map; + } + + @Override + public boolean add(E object) { + return map.put(object, JDBMUtils.EMPTY_STRING) == null; + } + + @Override + public boolean addAll(Collection collection) { + return super.addAll(collection); + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public Comparator comparator() { + return map.comparator(); + } + + @Override + public boolean contains(Object object) { + return map.containsKey(object); + } + + @Override + public boolean isEmpty() { + return map.isEmpty(); + } + + @Override + public E lower(E e) { + return map.lowerKey(e); + } + + @Override + public E floor(E e) { + return map.floorKey(e); + } + + @Override + public E ceiling(E e) { + return map.ceilingKey(e); + } + + @Override + public E higher(E e) { + return map.higherKey(e); + } + + @Override + public E pollFirst() { + Map.Entry e = map.pollFirstEntry(); + return e != null ? e.getKey() : null; + } + + @Override + public E pollLast() { + Map.Entry e = map.pollLastEntry(); + return e != null ? e.getKey() : null; + } + + @Override + public Iterator iterator() { + final Iterator> iter = map.entrySet().iterator(); + return new Iterator() { + @Override + public boolean hasNext() { + return iter.hasNext(); + } + + @Override + public E next() { + Map.Entry e = iter.next(); + return e != null ? e.getKey() : null; + } + + @Override + public void remove() { + iter.remove(); + } + }; + } + + @Override + public NavigableSet descendingSet() { + return map.descendingKeySet(); + } + + @Override + public Iterator descendingIterator() { + return map.descendingKeySet().iterator(); + } + + @Override + public NavigableSet subSet(E fromElement, boolean fromInclusive, + E toElement, boolean toInclusive) { + return map.subMap(fromElement, fromInclusive, toElement, toInclusive) + .navigableKeySet(); + } + + @Override + public NavigableSet headSet(E toElement, boolean inclusive) { + return map.headMap(toElement, inclusive).navigableKeySet(); + } + + @Override + public NavigableSet tailSet(E fromElement, boolean inclusive) { + return map.tailMap(fromElement, inclusive).navigableKeySet(); + } + + @Override + public boolean remove(Object object) { + return map.remove(object) != null; + } + + @Override + public int size() { + return map.size(); + } + + @Override + public E first() { + return map.firstKey(); + } + + @Override + public E last() { + return map.lastKey(); + } + + @Override + public SortedSet subSet(E start, E end) { + Comparator c = map.comparator(); + int compare = (c == null) ? 
((Comparable) start).compareTo(end) : c + .compare(start, end); + if (compare <= 0) { + return new BTreeSet((BTreeMap) map.subMap(start, true, end, + false)); + } + throw new IllegalArgumentException(); + } + + @Override + public SortedSet headSet(E end) { + // Check for errors + Comparator c = map.comparator(); + if (c == null) { + ((Comparable) end).compareTo(end); + } else { + c.compare(end, end); + } + return new BTreeSet((BTreeMap) map.headMap(end, false)); + } + + @Override + public SortedSet tailSet(E start) { + // Check for errors + Comparator c = map.comparator(); + if (c == null) { + ((Comparable) start).compareTo(start); + } else { + c.compare(start, start); + } + return new BTreeSet((BTreeMap) map.tailMap(start, true)); + } + +} Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java?rev=1387533&view=auto ============================================================================== --- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java (added) +++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java Wed Sep 19 11:52:20 2012 @@ -0,0 +1,199 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.jdbm; + +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; + +/** + * Database is root class for creating and loading persistent collections. It + * also contains transaction operations. + *

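+ * A minimal usage sketch (obtaining the DB instance is outside this
+ * interface and assumed here):
+ * <pre>
+ *   NavigableMap&lt;Long, String&gt; users = db.createTreeMap("users");
+ *   users.put(1L, "alice");
+ *   db.commit();   // JDBM supports a single transaction
+ *   db.close();
+ * </pre>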
+ */
+ public interface DB {
+
+ /**
+ * Closes the DB and releases its resources. The DB cannot be used after it
+ * has been closed.
+ */
+ void close();
+
+ /** @return true if the db was already closed */
+ boolean isClosed();
+
+ /**
+ * Clears the cache and removes all entries it contains. This may be useful
+ * to aid garbage collection when a reference cache is used.
+ */
+ void clearCache();
+
+ /**
+ * Defragments storage so it consumes less space. It basically copies all
+ * records into a different store and then renames it, replacing the
+ * original store.
+ *

+ * Defrag has two steps: in the first, collections are rearranged so that the
+ * records of a collection sit close to each other, which improves read speed.
+ * In the second, all records are transferred sequentially, reclaiming all
+ * unused space. The first step is optional and may slow down defragmentation
+ * significantly, as it requires many random-access reads. The second step
+ * reads and writes data sequentially and is very fast, comparable to copying
+ * a file to a new location.
+ *

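+ * A usage sketch (hypothetical):
+ * <pre>
+ *   db.defrag(true);   // rearrange collections first, then compact
+ * </pre>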
+ * This commits any uncommitted data. Defrag also requires free space, as the
+ * store is essentially recreated at a new location.
+ *
+ * @param sortCollections true if collection records should be rearranged
+ *          during defragmentation; this takes some extra time
+ */
+ void defrag(boolean sortCollections);
+
+ /**
+ * Commit (make persistent) all changes since the beginning of the
+ * transaction. JDBM supports only a single transaction.
+ */
+ void commit();
+
+ /**
+ * Rollback (cancel) all changes since the beginning of the transaction. JDBM
+ * supports only a single transaction. This operation affects all maps created
+ * or loaded by this DB.
+ */
+ void rollback();
+
+ /**
+ * Calculates some database statistics such as collection sizes and record
+ * distributions. Can be useful for performance optimisations and
+ * troubleshooting. This method can run for a very long time.
+ *
+ * @return statistics contained in a string
+ */
+ String calculateStatistics();
+
+ /**
+ * Copy database content into a ZIP file.
+ *
+ * @param zipFile
+ */
+ void copyToZip(String zipFile);
+
+ /**
+ * Get a Map which was already created and saved in the DB. This map uses a
+ * disk-based H*Tree and should have performance similar to HashMap.
+ *
+ * @param name of hash map
+ *
+ * @return map
+ */
+ <K, V> Map<K, V> getHashMap(String name);
+
+ /**
+ * Creates a Map which persists data into the DB.
+ *
+ * @param name record name
+ * @return
+ */
+ <K, V> Map<K, V> createHashMap(String name);
+
+ /**
+ * Creates a Hash Map which persists data into the DB. The map will use
+ * custom serializers for keys and values. Leave keySerializer null to use
+ * the default serializer for keys.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ * @param name record name
+ * @param keySerializer serializer to be used for keys, leave null to use
+ *          the default serializer
+ * @param valueSerializer serializer to be used for values
+ * @return
+ */
+ <K, V> Map<K, V> createHashMap(String name, Serializer<K> keySerializer,
+     Serializer<V> valueSerializer);
+
+ <K> Set<K> createHashSet(String name);
+
+ <K> Set<K> getHashSet(String name);
+
+ <K> Set<K> createHashSet(String name, Serializer<K> keySerializer);
+
+ <K, V> NavigableMap<K, V> getTreeMap(String name);
+
+ /**
+ * Create a TreeMap which persists data into the DB.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ * @param name record name
+ * @return
+ */
+ <K extends Comparable, V> NavigableMap<K, V> createTreeMap(String name);
+
+ /**
+ * Creates a TreeMap which persists data into the DB.
+ *
+ * @param <K> Key type
+ * @param <V> Value type
+ * @param name record name
+ * @param keyComparator Comparator used to sort keys
+ * @param keySerializer Serializer used for keys. This may reduce disk space
+ *          usage.
+ * @param valueSerializer Serializer used for values. This may reduce disk
+ *          space usage.
+ * @return
+ */
+ <K, V> NavigableMap<K, V> createTreeMap(String name,
+     Comparator<K> keyComparator, Serializer<K> keySerializer,
+     Serializer<V> valueSerializer);
+
+ <K> NavigableSet<K> getTreeSet(String name);
+
+ <K> NavigableSet<K> createTreeSet(String name);
+
+ <K> NavigableSet<K> createTreeSet(String name, Comparator<K> keyComparator,
+     Serializer<K> keySerializer);
+
+ <K> List<K> createLinkedList(String name);
+
+ <K> List<K> createLinkedList(String name, Serializer<K> serializer);
+
+ <K> List<K> getLinkedList(String name);
+
+ /**
+ * Returns an unmodifiable map which contains all collection names and the
+ * collections themselves.
+ */
+ Map<String, Object> getCollections();
+
+ /** completely remove collection from store */
+ void deleteCollection(String name);
+
+ /**
+ * Java Collections return their size as an int. This may not be enough for
+ * JDBM collections.
This method returns number of elements in JDBM collection + * as long. + * + * @param collection created by JDBM + * @return number of elements in collection as long + */ + long collectionSize(Object collection); + +} Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java?rev=1387533&view=auto ============================================================================== --- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java (added) +++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java Wed Sep 19 11:52:20 2012 @@ -0,0 +1,636 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hama.jdbm; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOError; +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Set; + +/** + * An abstract class implementing most of DB. It also has some JDBM package + * protected stuff (getNamedRecord) + */ +public abstract class DBAbstract implements DB { + + /** + * Reserved slot for name directory recid. + */ + static final byte NAME_DIRECTORY_ROOT = 0; + + /** + * Reserved slot for version number + */ + static final byte STORE_VERSION_NUMBER_ROOT = 1; + + /** + * Reserved slot for recid where Serial class info is stored + * + * NOTE when introducing more roots, do not forget to update defrag + */ + static final byte SERIAL_CLASS_INFO_RECID_ROOT = 2; + + /** + * to prevent double instances of the same collection, we use weak value map + * + * //TODO what to do when there is rollback? //TODO clear on close + */ + final private Map> collections = new HashMap>(); + + /** + * Inserts a new record using a custom serializer. + * + * @param obj the object for the new record. + * @param serializer a custom serializer + * @return the rowid for the new record. + * @throws java.io.IOException when one of the underlying I/O operations + * fails. + */ + abstract long insert(A obj, Serializer serializer, boolean disableCache) + throws IOException; + + /** + * Deletes a record. + * + * @param recid the rowid for the record that should be deleted. + * @throws java.io.IOException when one of the underlying I/O operations + * fails. + */ + abstract void delete(long recid) throws IOException; + + /** + * Updates a record using a custom serializer. If given recid does not exist, + * IOException will be thrown before/during commit (cache). 
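+ * The record lifecycle around this method, using the convenience overloads
+ * defined further below (sketch):
+ * <pre>
+ *   long recid = db.insert(obj);       // allocate and store
+ *   db.update(recid, obj);             // overwrite in place
+ *   Object back = db.fetch(recid);     // read it back
+ *   db.delete(recid);                  // free the record
+ * </pre>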
+
+  /**
+   * Fetches a record using a custom serializer.
+   *
+   * @param recid the recid for the record that must be fetched.
+   * @param serializer a custom serializer
+   * @return the object contained in the record, null if the given recid does
+   *         not exist
+   * @throws java.io.IOException when one of the underlying I/O operations
+   *           fails.
+   */
+  abstract <A> A fetch(long recid, Serializer<A> serializer)
+      throws IOException;
+
+  /**
+   * Fetches a record using a custom serializer and an optionally disabled
+   * cache.
+   *
+   * @param recid the recid for the record that must be fetched.
+   * @param serializer a custom serializer
+   * @param disableCache true to disable any caching mechanism
+   * @return the object contained in the record, null if the given recid does
+   *         not exist
+   * @throws java.io.IOException when one of the underlying I/O operations
+   *           fails.
+   */
+  abstract <A> A fetch(long recid, Serializer<A> serializer,
+      boolean disableCache) throws IOException;
+
+  @SuppressWarnings("unchecked")
+  public long insert(Object obj) throws IOException {
+    return insert(obj, defaultSerializer(), false);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void update(long recid, Object obj) throws IOException {
+    update(recid, obj, defaultSerializer());
+  }
+
+  @SuppressWarnings("unchecked")
+  public <A> A fetch(long recid) throws IOException {
+    return (A) fetch(recid, defaultSerializer());
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <K, V> Map<K, V> getHashMap(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (Map<K, V>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      @SuppressWarnings("rawtypes")
+      HTree tree = fetch(recid);
+      tree.setPersistenceContext(this);
+      if (!tree.hasValues()) {
+        throw new ClassCastException("HashSet is not HashMap");
+      }
+      collections.put(name, new WeakReference<Object>(tree));
+      return tree;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public <K, V> Map<K, V> createHashMap(String name) {
+    return createHashMap(name, null, null);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public <K, V> Map<K, V> createHashMap(String name,
+      Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    try {
+      assertNameNotExist(name);
+
+      HTree tree = new HTree(this, keySerializer, valueSerializer, true);
+      long recid = insert(tree);
+      setNamedObject(name, recid);
+      collections.put(name, new WeakReference<Object>(tree));
+      return tree;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public <K> Set<K> getHashSet(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (Set<K>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      HTree tree = fetch(recid);
+      tree.setPersistenceContext(this);
+      if (tree.hasValues()) {
+        throw new ClassCastException("HashMap is not HashSet");
+      }
+      Set<K> ret = new HTreeSet(tree);
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public <K> Set<K> createHashSet(String name) {
+    return createHashSet(name, null);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public <K> Set<K> createHashSet(String name, Serializer<K> keySerializer) {
+    try {
+      assertNameNotExist(name);
+
+      HTree tree = new HTree(this, keySerializer, null, false);
+      long recid = insert(tree);
+      setNamedObject(name, recid);
+
+      Set<K> ret = new HTreeSet(tree);
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
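+
+  /*
+   * A minimal usage sketch for the hash collections above (assumes this DB
+   * instance is open; exception handling elided):
+   *
+   *   Map<String, Long> m = db.createHashMap("counts");
+   *   m.put("x", 1L);
+   *   db.commit();
+   *   // later, possibly after reopening:
+   *   Map<String, Long> same = db.getHashMap("counts");  // null if absent
+   *
+   * Note that createHashMap throws IllegalArgumentException if the name is
+   * already taken, while getHashMap returns null for an unknown name.
+   */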
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <K, V> NavigableMap<K, V> getTreeMap(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (NavigableMap<K, V>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      BTree<K, V> t = BTree.<K, V> load(this, recid);
+      if (!t.hasValues())
+        throw new ClassCastException("TreeSet is not TreeMap");
+      // TODO put readonly flag here
+      NavigableMap<K, V> ret = new BTreeMap<K, V>(t, false);
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public <K extends Comparable<K>, V> NavigableMap<K, V> createTreeMap(
+      String name) {
+    return createTreeMap(name, null, null, null);
+  }
+
+  @Override
+  public <K, V> NavigableMap<K, V> createTreeMap(String name,
+      Comparator<K> keyComparator, Serializer<K> keySerializer,
+      Serializer<V> valueSerializer) {
+    try {
+      assertNameNotExist(name);
+      BTree<K, V> tree = BTree.createInstance(this, keyComparator,
+          keySerializer, valueSerializer, true);
+      setNamedObject(name, tree.getRecid());
+      // TODO put readonly flag here
+      NavigableMap<K, V> ret = new BTreeMap<K, V>(tree, false);
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <K> NavigableSet<K> getTreeSet(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (NavigableSet<K>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+
+      BTree<K, Object> t = BTree.<K, Object> load(this, recid);
+      if (t.hasValues())
+        throw new ClassCastException("TreeMap is not TreeSet");
+      BTreeSet<K> ret = new BTreeSet<K>(new BTreeMap<K, Object>(t, false));
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public <K> NavigableSet<K> createTreeSet(String name) {
+    return createTreeSet(name, null, null);
+  }
+
+  @Override
+  public <K> NavigableSet<K> createTreeSet(String name,
+      Comparator<K> keyComparator, Serializer<K> keySerializer) {
+    try {
+      assertNameNotExist(name);
+      BTree<K, Object> tree = BTree.createInstance(this, keyComparator,
+          keySerializer, null, false);
+      setNamedObject(name, tree.getRecid());
+      BTreeSet<K> ret = new BTreeSet<K>(new BTreeMap<K, Object>(tree, false));
+      collections.put(name, new WeakReference<Object>(ret));
+      return ret;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public <K> List<K> createLinkedList(String name) {
+    return createLinkedList(name, null);
+  }
+
+  @Override
+  public <K> List<K> createLinkedList(String name, Serializer<K> serializer) {
+    try {
+      assertNameNotExist(name);
+
+      // allocate a record and overwrite it
+      LinkedList<K> list = new LinkedList<K>(this, serializer);
+      long recid = insert(list);
+      setNamedObject(name, recid);
+
+      collections.put(name, new WeakReference<Object>(list));
+
+      return list;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public <K> List<K> getLinkedList(String name) {
+    Object o = getCollectionInstance(name);
+    if (o != null)
+      return (List<K>) o;
+
+    try {
+      long recid = getNamedObject(name);
+      if (recid == 0)
+        return null;
+      LinkedList<K> list = (LinkedList<K>) fetch(recid);
+      list.setPersistenceContext(this);
+      collections.put(name, new WeakReference<Object>(list));
+      return list;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
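+
+  /*
+   * A minimal sketch of a custom-ordered tree map (assumes this DB instance
+   * is open; the comparator and serializer arguments are illustrative):
+   *
+   *   NavigableMap<String, Long> m = db.createTreeMap("index",
+   *       Collections.reverseOrder(String.CASE_INSENSITIVE_ORDER),
+   *       null,   // null key serializer -> default serializer
+   *       null);  // null value serializer -> default serializer
+   *
+   * Custom serializers mainly reduce disk space; the ordering is determined
+   * entirely by keyComparator (or the keys' natural order when it is null).
+   */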
+
+  private Object getCollectionInstance(String name) {
+    WeakReference<Object> ref = collections.get(name);
+    if (ref == null)
+      return null;
+    Object o = ref.get();
+    if (o != null)
+      return o;
+    // already GCed, drop the stale reference
+    collections.remove(name);
+    return null;
+  }
+
+  private void assertNameNotExist(String name) throws IOException {
+    if (getNamedObject(name) != 0)
+      throw new IllegalArgumentException("Object with name '" + name
+          + "' already exists");
+  }
+
+  /**
+   * Obtains the record id of a named object. Returns 0 if the named object
+   * does not exist. Named objects are used to store Map views and other well
+   * known objects.
+   */
+  protected long getNamedObject(String name) throws IOException {
+    long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+    if (nameDirectory_recid == 0) {
+      return 0;
+    }
+    HTree<String, Long> m = fetch(nameDirectory_recid);
+    Long res = m.get(name);
+    if (res == null)
+      return 0;
+    return res;
+  }
+
+  /**
+   * Sets the record id of a named object. Named objects are used to store Map
+   * views and other well known objects.
+   */
+  protected void setNamedObject(String name, long recid) throws IOException {
+    long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+    HTree<String, Long> m = null;
+    if (nameDirectory_recid == 0) {
+      // the name directory does not exist yet, create it
+      m = new HTree<String, Long>(this, null, null, true);
+      nameDirectory_recid = insert(m);
+      setRoot(NAME_DIRECTORY_ROOT, nameDirectory_recid);
+    } else {
+      // fetch the existing directory
+      m = fetch(nameDirectory_recid);
+    }
+    m.put(name, recid);
+  }
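+
+  /*
+   * A minimal sketch of the name directory these two methods maintain: an
+   * H*Tree of String -> Long stored at root slot NAME_DIRECTORY_ROOT.
+   *
+   *   setNamedObject("users", 42L);            // "users" now maps to recid 42
+   *   long recid = getNamedObject("users");    // -> 42
+   *   long none  = getNamedObject("missing");  // -> 0, reserved as "absent"
+   *
+   * Recid 0 is therefore never a valid record id for a named collection.
+   */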
+
+  @Override
+  public Map<String, Object> getCollections() {
+    try {
+      Map<String, Object> ret = new LinkedHashMap<String, Object>();
+      long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+      if (nameDirectory_recid == 0)
+        return ret;
+      HTree<String, Long> m = fetch(nameDirectory_recid);
+
+      for (Map.Entry<String, Long> e : m.entrySet()) {
+        Object o = fetch(e.getValue());
+        if (o instanceof BTree) {
+          if (((BTree) o).hasValues)
+            o = getTreeMap(e.getKey());
+          else
+            o = getTreeSet(e.getKey());
+        } else if (o instanceof HTree) {
+          if (((HTree) o).hasValues)
+            o = getHashMap(e.getKey());
+          else
+            o = getHashSet(e.getKey());
+        }
+
+        ret.put(e.getKey(), o);
+      }
+      return Collections.unmodifiableMap(ret);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public void deleteCollection(String name) {
+    try {
+      long nameDirectory_recid = getRoot(NAME_DIRECTORY_ROOT);
+      if (nameDirectory_recid == 0)
+        throw new IOException("Collection not found");
+      HTree<String, Long> dir = fetch(nameDirectory_recid);
+
+      Long recid = dir.get(name);
+      if (recid == null)
+        throw new IOException("Collection not found");
+
+      Object o = fetch(recid);
+      // we cannot use the fetched instance directly, since it is not
+      // correctly initialized
+      if (o instanceof LinkedList) {
+        LinkedList l = (LinkedList) o;
+        l.clear();
+        delete(l.rootRecid);
+      } else if (o instanceof BTree) {
+        ((BTree) o).clear();
+      } else if (o instanceof HTree) {
+        HTree t = (HTree) o;
+        t.clear();
+        HTreeDirectory n = (HTreeDirectory) fetch(t.rootRecid, t.SERIALIZER);
+        n.deleteAllChildren();
+        delete(t.rootRecid);
+      } else {
+        throw new InternalError("unknown collection type: "
+            + (o == null ? null : o.getClass()));
+      }
+      delete(recid);
+      collections.remove(name);
+
+      dir.remove(name);
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * We need to set a reference to this DB instance, so the serializer needs
+   * to live here.
+   */
+  final Serializer<Serialization> defaultSerializationSerializer = new Serializer<Serialization>() {
+
+    @Override
+    public void serialize(DataOutput out, Serialization obj) throws IOException {
+      LongPacker.packLong(out, obj.serialClassInfoRecid);
+      SerialClassInfo.serializer.serialize(out, obj.registered);
+    }
+
+    @Override
+    public Serialization deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+      final long recid = LongPacker.unpackLong(in);
+      final ArrayList<SerialClassInfo.ClassInfo> classes = SerialClassInfo.serializer
+          .deserialize(in);
+      return new Serialization(DBAbstract.this, recid, classes);
+    }
+  };
+
+  public Serializer defaultSerializer() {
+    try {
+      long serialClassInfoRecid = getRoot(SERIAL_CLASS_INFO_RECID_ROOT);
+      if (serialClassInfoRecid == 0) {
+        // allocate a new recid
+        serialClassInfoRecid = insert(null, JDBMUtils.NULL_SERIALIZER, false);
+        // and insert the new serializer
+        Serialization ser = new Serialization(this, serialClassInfoRecid,
+            new ArrayList<SerialClassInfo.ClassInfo>());
+
+        update(serialClassInfoRecid, ser, defaultSerializationSerializer);
+        setRoot(SERIAL_CLASS_INFO_RECID_ROOT, serialClassInfoRecid);
+        return ser;
+      } else {
+        return fetch(serialClassInfoRecid, defaultSerializationSerializer);
+      }
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  final protected void checkNotClosed() {
+    if (isClosed())
+      throw new IllegalStateException("db was closed");
+  }
+
+  protected abstract void setRoot(byte root, long recid);
+
+  protected abstract long getRoot(byte root);
+
+  @Override
+  public long collectionSize(Object collection) {
+    if (collection instanceof BTreeMap) {
+      BTreeMap t = (BTreeMap) collection;
+      if (t.fromKey != null || t.toKey != null)
+        throw new IllegalArgumentException(
+            "collectionSize does not work on a BTree submap");
+      return t.tree._entries;
+    } else if (collection instanceof HTree) {
+      return ((HTree) collection).getRoot().size;
+    } else if (collection instanceof HTreeSet) {
+      return collectionSize(((HTreeSet) collection).map);
+    } else if (collection instanceof BTreeSet) {
+      return collectionSize(((BTreeSet) collection).map);
+    } else if (collection instanceof LinkedList) {
+      return ((LinkedList) collection).getRoot().size;
+    } else {
+      throw new IllegalArgumentException("Not a JDBM collection");
+    }
+  }
+
+  void addShutdownHook() {
+    if (shutdownCloseThread == null) {
+      // install the hook once and remember which DB instance to close
+      shutdownCloseThread = new ShutdownCloseThread();
+      shutdownCloseThread.dbToClose = this;
+      Runtime.getRuntime().addShutdownHook(shutdownCloseThread);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (shutdownCloseThread != null) {
+      Runtime.getRuntime().removeShutdownHook(shutdownCloseThread);
+      shutdownCloseThread.dbToClose = null;
+      shutdownCloseThread = null;
+    }
+  }
+
+  ShutdownCloseThread shutdownCloseThread = null;
+
+  private static class ShutdownCloseThread extends Thread {
+
+    DBAbstract dbToClose = null;
+
+    ShutdownCloseThread() {
+      super("JDBM shutdown");
+    }
+
+    @Override
+    public void run() {
+      if (dbToClose != null && !dbToClose.isClosed()) {
+        // clear the field first, so close() does not try to remove this hook
+        // while the JVM is already shutting down
+        dbToClose.shutdownCloseThread = null;
+        dbToClose.close();
+      }
+    }
+  }
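+
+  /*
+   * rollback() below must also refresh any live collection instances handed
+   * out earlier: their in-memory BTree state may contain rolled-back changes,
+   * so each cached BTreeMap/BTreeSet reloads its tree from the store.
+   * H*Tree-based collections are not reloaded here.
+   */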
+
+  @Override
+  public void rollback() {
+    try {
+      for (WeakReference<Object> o : collections.values()) {
+        Object c = o.get();
+        if (c != null && c instanceof BTreeMap) {
+          // reload the tree
+          BTreeMap m = (BTreeMap) c;
+          m.tree = fetch(m.tree.getRecid());
+        }
+        if (c != null && c instanceof BTreeSet) {
+          // reload the tree
+          BTreeSet m = (BTreeSet) c;
+          m.map.tree = fetch(m.map.tree.getRecid());
+        }
+      }
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Iterator;
+
+import javax.crypto.Cipher;
+
+/**
+ * Abstract class with common cache functionality.
+ */
+public abstract class DBCache extends DBStore {
+
+  static final int NUM_OF_DIRTY_RECORDS_BEFORE_AUTOCOMIT = 1024;
+
+  static final byte NONE = 1;
+  static final byte MRU = 2;
+  static final byte WEAK = 3;
+  static final byte SOFT = 4;
+  static final byte HARD = 5;
+
+  static final class DirtyCacheEntry {
+    long _recid; // TODO the recid is already the key in _hashDirties, so this
+                 // field could be removed to save memory
+    Object _obj;
+    Serializer _serializer;
+  }
+
+  /**
+   * Dirty (not yet persisted) cache entries, keyed by recid.
+   */
+  final protected LongHashMap<DirtyCacheEntry> _hashDirties = new LongHashMap<DirtyCacheEntry>();
+
+  private Serializer cachedDefaultSerializer = null;
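+
+  /*
+   * Design note: inserts and updates are buffered in _hashDirties and only
+   * written to the underlying store on commit() (see updateCacheEntries
+   * below). With transactions disabled, needsAutoCommit() forces a commit
+   * once more than NUM_OF_DIRTY_RECORDS_BEFORE_AUTOCOMIT records are dirty,
+   * which bounds the memory held by the dirty map.
+   */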
+
+  /**
+   * Constructs a cache wrapping the underlying record store, using the given
+   * cache policy.
+   */
+  public DBCache(String filename, boolean readonly,
+      boolean transactionDisabled, Cipher cipherIn, Cipher cipherOut,
+      boolean useRandomAccessFile, boolean deleteFilesAfterClose,
+      boolean lockingDisabled) {
+
+    super(filename, readonly, transactionDisabled, cipherIn, cipherOut,
+        useRandomAccessFile, deleteFilesAfterClose, lockingDisabled);
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public Serializer defaultSerializer() {
+    if (cachedDefaultSerializer == null)
+      cachedDefaultSerializer = super.defaultSerializer();
+    return cachedDefaultSerializer;
+  }
+
+  @Override
+  boolean needsAutoCommit() {
+    return super.needsAutoCommit()
+        || (transactionsDisabled && !commitInProgress && _hashDirties.size() > NUM_OF_DIRTY_RECORDS_BEFORE_AUTOCOMIT);
+  }
+
+  @Override
+  public <A> long insert(final A obj, final Serializer<A> serializer,
+      final boolean disableCache) throws IOException {
+    checkNotClosed();
+
+    if (super.needsAutoCommit())
+      commit();
+
+    if (disableCache)
+      return super.insert(obj, serializer, disableCache);
+
+    // preallocate the recid so we have something to return
+    final long recid = super.insert(PREALOCATE_OBJ, null, disableCache);
+
+    // and create a new dirty record for a future update
+    final DirtyCacheEntry e = new DirtyCacheEntry();
+    e._recid = recid;
+    e._obj = obj;
+    e._serializer = serializer;
+    _hashDirties.put(recid, e);
+
+    return recid;
+  }
+
+  @Override
+  public void commit() {
+    try {
+      commitInProgress = true;
+      updateCacheEntries();
+      super.commit();
+    } finally {
+      commitInProgress = false;
+    }
+  }
+
+  @Override
+  public void rollback() {
+    cachedDefaultSerializer = null;
+    _hashDirties.clear();
+    super.rollback();
+  }
+
+  private static final Comparator<DirtyCacheEntry> DIRTY_COMPARATOR = new Comparator<DirtyCacheEntry>() {
+    @Override
+    final public int compare(DirtyCacheEntry o1, DirtyCacheEntry o2) {
+      // compare explicitly to avoid the int overflow of
+      // (int) (o1._recid - o2._recid) on large recid distances
+      return o1._recid < o2._recid ? -1 : (o1._recid == o2._recid ? 0 : 1);
+    }
+  };
+
+  /**
+   * Writes all dirty cache objects to the underlying DB.
+   */
+  @SuppressWarnings("unchecked")
+  protected void updateCacheEntries() {
+    try {
+      while (!_hashDirties.isEmpty()) {
+        // make a defensive copy of the values, as _db.update() may trigger
+        // further changes in the db, which would modify the dirty map again
+        DirtyCacheEntry[] vals = new DirtyCacheEntry[_hashDirties.size()];
+        Iterator<DirtyCacheEntry> iter = _hashDirties.valuesIterator();
+
+        for (int i = 0; i < vals.length; i++) {
+          vals[i] = iter.next();
+        }
+        iter = null;
+
+        // write in ascending recid order to keep disk access roughly sequential
+        java.util.Arrays.sort(vals, DIRTY_COMPARATOR);
+
+        for (int i = 0; i < vals.length; i++) {
+          final DirtyCacheEntry entry = vals[i];
+          vals[i] = null;
+          super.update(entry._recid, entry._obj, entry._serializer);
+          _hashDirties.remove(entry._recid);
+        }
+
+        // update may have added more records to the dirty map, so repeat
+        // until all records are written
+      }
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+}
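
To make the commit path above concrete, here is a small self-contained sketch
of the same write-behind pattern that updateCacheEntries implements. All names
are illustrative: WriteBehindSketch is not a class from this commit, and
write() stands in for the underlying store update.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    /** Illustrative write-behind buffer; not the DBCache class itself. */
    public class WriteBehindSketch {

      static final class Entry {
        long recid;
        Object obj;
      }

      private final Map<Long, Entry> dirties = new HashMap<Long, Entry>();

      /** Buffer a change in memory instead of writing it immediately. */
      void put(long recid, Object obj) {
        Entry e = new Entry();
        e.recid = recid;
        e.obj = obj;
        dirties.put(recid, e);
      }

      /** Flush like DBCache.updateCacheEntries: copy, sort by recid, repeat. */
      void flush() {
        while (!dirties.isEmpty()) {
          // defensive copy: writing one entry may dirty further records
          List<Entry> batch = new ArrayList<Entry>(dirties.values());
          // ascending recid order keeps the writes roughly sequential on disk
          Collections.sort(batch, new Comparator<Entry>() {
            public int compare(Entry a, Entry b) {
              return a.recid < b.recid ? -1 : (a.recid == b.recid ? 0 : 1);
            }
          });
          for (Entry e : batch) {
            write(e); // stand-in for the underlying store update
            dirties.remove(e.recid);
          }
        }
      }

      void write(Entry e) {
        // a real implementation would serialize e.obj into record e.recid
      }
    }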