hama-commits mailing list archives

From tjungb...@apache.org
Subject svn commit: r1387533 [3/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j...
Date Wed, 19 Sep 2012 11:52:24 GMT
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheMRU.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheMRU.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheMRU.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheMRU.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+
+import javax.crypto.Cipher;
+
+/**
+ * A DB wrapping and caching another DB.
+ */
+public final class DBCacheMRU extends DBCache {
+
+  private static final boolean debug = false;
+
+  /**
+   * Cached object hashtable
+   */
+  protected LongHashMap<CacheEntry> _hash;
+
+  /**
+   * Maximum number of objects in the cache.
+   */
+  protected int _max;
+
+  /**
+   * Beginning of the linked list of cache elements. The first entry is the
+   * element which has been used least recently.
+   */
+  protected CacheEntry _first;
+
+  /**
+   * End of the linked list of cache elements. The last entry is the element
+   * which has been used most recently.
+   */
+  protected CacheEntry _last;
+
+  /**
+   * Construct a DBCacheMRU wrapping another DB and using a given cache
+   * policy.
+   */
+  public DBCacheMRU(String filename, boolean readonly,
+      boolean transactionDisabled, Cipher cipherIn, Cipher cipherOut,
+      boolean useRandomAccessFile, boolean deleteFilesAfterClose,
+      int cacheMaxRecords, boolean lockingDisabled) {
+    super(filename, readonly, transactionDisabled, cipherIn, cipherOut,
+        useRandomAccessFile, deleteFilesAfterClose, lockingDisabled);
+
+    _hash = new LongHashMap<CacheEntry>(cacheMaxRecords);
+    _max = cacheMaxRecords;
+
+  }
+
+  public synchronized <A> A fetch(long recid, Serializer<A> serializer,
+      boolean disableCache) throws IOException {
+
+    if (disableCache)
+      return super.fetch(recid, serializer, disableCache);
+    else
+      return fetch(recid, serializer);
+  }
+
+  public synchronized void delete(long recid) throws IOException {
+    checkNotClosed();
+
+    super.delete(recid);
+    synchronized (_hash) {
+      CacheEntry entry = _hash.get(recid);
+      if (entry != null) {
+        removeEntry(entry);
+        _hash.remove(entry._recid);
+      }
+      _hashDirties.remove(recid);
+    }
+
+    if (super.needsAutoCommit())
+      commit();
+
+  }
+
+  public synchronized <A> void update(final long recid, final A obj,
+      final Serializer<A> serializer) throws IOException {
+    checkNotClosed();
+
+    synchronized (_hash) {
+
+      // remove entry if it already exists
+      CacheEntry entry = cacheGet(recid);
+      if (entry != null) {
+        _hash.remove(recid);
+        removeEntry(entry);
+      }
+
+      // check if entry is in dirties, in this case just update its object
+      DirtyCacheEntry e = _hashDirties.get(recid);
+      if (e != null) {
+        if (recid != e._recid)
+          throw new Error();
+        e._obj = obj;
+        e._serializer = serializer;
+        return;
+      }
+
+      // create new dirty entry
+      e = new DirtyCacheEntry();
+      e._recid = recid;
+      e._obj = obj;
+      e._serializer = serializer;
+      _hashDirties.put(recid, e);
+    }
+
+    if (super.needsAutoCommit())
+      commit();
+
+  }
+
+  public synchronized <A> A fetch(long recid, Serializer<A> serializer)
+      throws IOException {
+
+    checkNotClosed();
+
+    final CacheEntry entry = cacheGet(recid);
+    if (entry != null) {
+      return (A) entry._obj;
+    }
+
+    // check dirties
+    final DirtyCacheEntry entry2 = _hashDirties.get(recid);
+    if (entry2 != null) {
+      return (A) entry2._obj;
+    }
+
+    A value = super.fetch(recid, serializer);
+
+    if (super.needsAutoCommit())
+      commit();
+
+    // put record into MRU cache
+    cachePut(recid, value);
+
+    return value;
+  }
+
+  public synchronized void close() {
+
+    if (isClosed())
+      return;
+
+    updateCacheEntries();
+    super.close();
+    _hash = null;
+  }
+
+  public synchronized void rollback() {
+
+    // discard all cache entries since we don't know which entries
+    // were part of the transaction
+    synchronized (_hash) {
+      _hash.clear();
+      _first = null;
+      _last = null;
+    }
+
+    super.rollback();
+  }
+
+  /**
+   * Obtain an object in the cache
+   */
+  protected CacheEntry cacheGet(long key) {
+    synchronized (_hash) {
+      CacheEntry entry = _hash.get(key);
+      if (entry != null && _last != entry) {
+        // touch entry
+        removeEntry(entry);
+        addEntry(entry);
+      }
+      return entry;
+    }
+  }
+
+  /**
+   * Place an object in the cache.
+   * 
+   * @throws IOException
+   */
+  protected void cachePut(final long recid, final Object value)
+      throws IOException {
+    synchronized (_hash) {
+      CacheEntry entry = _hash.get(recid);
+      if (entry != null) {
+        entry._obj = value;
+        // touch entry
+        if (_last != entry) {
+          removeEntry(entry);
+          addEntry(entry);
+        }
+      } else {
+
+        if (_hash.size() >= _max) {
+          // purge and recycle entry
+          entry = purgeEntry();
+          entry._recid = recid;
+          entry._obj = value;
+        } else {
+          entry = new CacheEntry(recid, value);
+        }
+        addEntry(entry);
+        _hash.put(entry._recid, entry);
+      }
+    }
+  }
+
+  /**
+   * Add a CacheEntry. Entry goes at the end of the list.
+   */
+  protected void addEntry(CacheEntry entry) {
+    synchronized (_hash) {
+      if (_first == null) {
+        _first = entry;
+        _last = entry;
+      } else {
+        _last._next = entry;
+        entry._previous = _last;
+        _last = entry;
+      }
+    }
+  }
+
+  /**
+   * Remove a CacheEntry from the linked list.
+   */
+  protected void removeEntry(CacheEntry entry) {
+    synchronized (_hash) {
+      if (entry == _first) {
+        _first = entry._next;
+      }
+      if (_last == entry) {
+        _last = entry._previous;
+      }
+      CacheEntry previous = entry._previous;
+      CacheEntry next = entry._next;
+      if (previous != null) {
+        previous._next = next;
+      }
+      if (next != null) {
+        next._previous = previous;
+      }
+      entry._previous = null;
+      entry._next = null;
+    }
+  }
+
+  /**
+   * Purge least recently used object from the cache
+   * 
+   * @return recyclable CacheEntry
+   */
+  protected CacheEntry purgeEntry() {
+    synchronized (_hash) {
+      CacheEntry entry = _first;
+      if (entry == null)
+        return new CacheEntry(-1, null);
+
+      removeEntry(entry);
+      _hash.remove(entry._recid);
+      entry._obj = null;
+      return entry;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  static final class CacheEntry {
+
+    protected long _recid;
+    protected Object _obj;
+
+    protected CacheEntry _previous;
+    protected CacheEntry _next;
+
+    CacheEntry(long recid, Object obj) {
+      _recid = recid;
+      _obj = obj;
+    }
+
+  }
+
+  public void clearCache() {
+    if (debug)
+      System.err.println("DBCache: Clear cache");
+
+    // discard all cache entries since we don't know which entries
+    // were part of the transaction
+    synchronized (_hash) {
+      _hash.clear();
+      _first = null;
+      _last = null;
+
+      // clear dirties
+      updateCacheEntries();
+
+    }
+  }
+
+}

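For readers skimming the diff: the eviction policy DBCacheMRU implements with its hand-rolled _first/_last linked list is least-recently-used eviction (purgeEntry() recycles the entry at _first, i.e. the one touched longest ago). A minimal illustrative sketch of that policy, not part of this commit and using assumed names, expressed with an access-ordered java.util.LinkedHashMap:

import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: the same eviction behaviour that DBCacheMRU performs
// via cacheGet(), addEntry() and purgeEntry().
public class MruCacheSketch {
  public static void main(String[] args) {
    final int cacheMaxRecords = 3; // stand-in for the constructor argument
    Map<Long, Object> cache =
        new LinkedHashMap<Long, Object>(16, 0.75f, true) {
          @Override
          protected boolean removeEldestEntry(Map.Entry<Long, Object> eldest) {
            // like purgeEntry(): drop the entry used least recently
            return size() > cacheMaxRecords;
          }
        };
    cache.put(1L, "a");
    cache.put(2L, "b");
    cache.put(3L, "c");
    cache.get(1L);      // "touch" recid 1, like cacheGet() moving it to _last
    cache.put(4L, "d"); // recid 2 is now least recently used and is evicted
    System.out.println(cache.keySet()); // prints [3, 1, 4]
  }
}
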
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheRef.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheRef.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheRef.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheRef.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.IOException;
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.SoftReference;
+import java.lang.ref.WeakReference;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.Cipher;
+
+/**
+ * A DB wrapping and caching another DB.
+ * 
+ */
+public final class DBCacheRef extends DBCache {
+
+  private static final boolean debug = false;
+
+  /**
+   * If the soft cache is enabled, this contains softly referenced clean
+   * entries. If an entry becomes dirty, it is moved to _hashDirties, which
+   * has a limited size. This map is accessed from the SoftCache disposer
+   * thread, so all access must be synchronized.
+   */
+  protected LongHashMap _softHash;
+
+  /**
+   * Reference queue used to collect Soft Cache entries
+   */
+  protected ReferenceQueue<ReferenceCacheEntry> _refQueue;
+
+  /**
+   * Thread in which Soft Cache references are disposed
+   */
+  protected Thread _softRefThread;
+
+  protected static AtomicInteger threadCounter = new AtomicInteger(0);
+
+  /** counter which counts the number of inserts since the last 'action' */
+  protected int insertCounter = 0;
+
+  private final boolean _autoClearReferenceCacheOnLowMem;
+  private final byte _cacheType;
+
+  /**
+   * Construct a DBCacheRef wrapping another DB and using a given cache
+   * policy.
+   */
+  public DBCacheRef(String filename, boolean readonly,
+      boolean transactionDisabled, Cipher cipherIn, Cipher cipherOut,
+      boolean useRandomAccessFile, boolean deleteFilesAfterClose,
+      byte cacheType, boolean cacheAutoClearOnLowMem, boolean lockingDisabled) {
+
+    super(filename, readonly, transactionDisabled, cipherIn, cipherOut,
+        useRandomAccessFile, deleteFilesAfterClose, lockingDisabled);
+
+    this._cacheType = cacheType;
+    _autoClearReferenceCacheOnLowMem = cacheAutoClearOnLowMem;
+
+    _softHash = new LongHashMap<ReferenceCacheEntry>();
+    _refQueue = new ReferenceQueue<ReferenceCacheEntry>();
+    _softRefThread = new Thread(new SoftRunnable(this, _refQueue),
+        "JDBM Soft Cache Disposer " + (threadCounter.incrementAndGet()));
+    _softRefThread.setDaemon(true);
+    _softRefThread.start();
+
+  }
+
+  void clearCacheIfLowOnMem() {
+
+    insertCounter = 0;
+
+    if (!_autoClearReferenceCacheOnLowMem)
+      return;
+
+    Runtime r = Runtime.getRuntime();
+    long max = r.maxMemory();
+    if (max == Long.MAX_VALUE)
+      return;
+
+    double free = r.freeMemory();
+    double total = r.totalMemory();
+    // We believe that 'free' refers to 'total', not 'max'.
+    // Growing the heap up to 'max' would add (max - total) to the free memory.
+    free = free + (max - total);
+
+    if (debug)
+      System.err.println("DBCache: freemem = " + free + " = " + (free / max)
+          + "%");
+
+    if (free < 1e7 || free * 4 < max)
+      clearCache();
+
+  }
+
+  public synchronized <A> A fetch(long recid, Serializer<A> serializer,
+      boolean disableCache) throws IOException {
+
+    if (disableCache)
+      return super.fetch(recid, serializer, disableCache);
+    else
+      return fetch(recid, serializer);
+  }
+
+  public synchronized void delete(long recid) throws IOException {
+    checkNotClosed();
+
+    super.delete(recid);
+    synchronized (_hashDirties) {
+      _hashDirties.remove(recid);
+    }
+    synchronized (_softHash) {
+      Object e = _softHash.remove(recid);
+      if (e != null && e instanceof ReferenceCacheEntry) {
+        ((ReferenceCacheEntry) e).clear();
+      }
+    }
+
+    if (needsAutoCommit())
+      commit();
+
+  }
+
+  public synchronized <A> void update(final long recid, A obj,
+      Serializer<A> serializer) throws IOException {
+    checkNotClosed();
+
+    synchronized (_softHash) {
+      // soft cache can not contain dirty objects
+      Object e = _softHash.remove(recid);
+      if (e != null && e instanceof ReferenceCacheEntry) {
+        ((ReferenceCacheEntry) e).clear();
+      }
+    }
+    synchronized (_hashDirties) {
+      // put into dirty cache
+      final DirtyCacheEntry e = new DirtyCacheEntry();
+      e._recid = recid;
+      e._obj = obj;
+      e._serializer = serializer;
+      _hashDirties.put(recid, e);
+    }
+
+    if (needsAutoCommit())
+      commit();
+
+  }
+
+  public synchronized <A> A fetch(long recid, Serializer<A> serializer)
+      throws IOException {
+    checkNotClosed();
+
+    synchronized (_softHash) {
+      Object e = _softHash.get(recid);
+      if (e != null) {
+
+        if (e instanceof ReferenceCacheEntry)
+          e = ((ReferenceCacheEntry) e).get();
+        if (e != null) {
+          return (A) e;
+        }
+      }
+    }
+
+    synchronized (_hashDirties) {
+      DirtyCacheEntry e2 = _hashDirties.get(recid);
+      if (e2 != null) {
+        return (A) e2._obj;
+      }
+    }
+
+    A value = super.fetch(recid, serializer);
+
+    if (needsAutoCommit())
+      commit();
+
+    synchronized (_softHash) {
+
+      if (_cacheType == SOFT)
+        _softHash.put(recid, new SoftCacheEntry(recid, value, _refQueue));
+      else if (_cacheType == WEAK)
+        _softHash.put(recid, new WeakCacheEntry(recid, value, _refQueue));
+      else
+        _softHash.put(recid, value);
+    }
+
+    return value;
+  }
+
+  public synchronized void close() {
+    checkNotClosed();
+
+    updateCacheEntries();
+    super.close();
+    _softHash = null;
+    _softRefThread.interrupt();
+  }
+
+  public synchronized void rollback() {
+    checkNotClosed();
+
+    // discard all cache entries since we don't know which entries
+    // were part of the transaction
+    synchronized (_softHash) {
+      Iterator<ReferenceCacheEntry> iter = _softHash.valuesIterator();
+      while (iter.hasNext()) {
+        ReferenceCacheEntry e = iter.next();
+        e.clear();
+      }
+      _softHash.clear();
+    }
+
+    super.rollback();
+  }
+
+  protected boolean isCacheEntryDirty(DirtyCacheEntry entry) {
+    return _hashDirties.get(entry._recid) != null;
+  }
+
+  protected void setCacheEntryDirty(DirtyCacheEntry entry, boolean dirty) {
+    if (dirty) {
+      _hashDirties.put(entry._recid, entry);
+    } else {
+      _hashDirties.remove(entry._recid);
+    }
+  }
+
+  interface ReferenceCacheEntry {
+    long getRecid();
+
+    void clear();
+
+    Object get();
+  }
+
+  @SuppressWarnings("unchecked")
+  static final class SoftCacheEntry extends SoftReference implements
+      ReferenceCacheEntry {
+    protected final long _recid;
+
+    public long getRecid() {
+      return _recid;
+    }
+
+    SoftCacheEntry(long recid, Object obj, ReferenceQueue queue) {
+      super(obj, queue);
+      _recid = recid;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  static final class WeakCacheEntry extends WeakReference implements
+      ReferenceCacheEntry {
+    protected final long _recid;
+
+    public long getRecid() {
+      return _recid;
+    }
+
+    WeakCacheEntry(long recid, Object obj, ReferenceQueue queue) {
+      super(obj, queue);
+      _recid = recid;
+    }
+
+  }
+
+  /**
+   * Runs in a separate thread and cleans the SoftCache. The Runnable exits
+   * automatically when the wrapping DBCacheRef is GCed.
+   * 
+   * @author Jan Kotek
+   */
+  static final class SoftRunnable implements Runnable {
+
+    private ReferenceQueue<ReferenceCacheEntry> entryQueue;
+    private WeakReference<DBCacheRef> db2;
+
+    public SoftRunnable(DBCacheRef db,
+        ReferenceQueue<ReferenceCacheEntry> entryQueue) {
+      this.db2 = new WeakReference<DBCacheRef>(db);
+      this.entryQueue = entryQueue;
+    }
+
+    public void run() {
+      while (true)
+        try {
+
+          // collect the next item from the cache;
+          // the 10000 ms limit is to keep periodically checking if the db was GCed
+          ReferenceCacheEntry e = (ReferenceCacheEntry) entryQueue
+              .remove(10000);
+
+          // check if db was GCed, cancel in that case
+          DBCacheRef db = db2.get();
+          if (db == null)
+            return;
+
+          if (e != null) {
+
+            synchronized (db._softHash) {
+              int counter = 0;
+              while (e != null) {
+                db._softHash.remove(e.getRecid());
+                e = (ReferenceCacheEntry) entryQueue.poll();
+                if (debug)
+                  counter++;
+              }
+              if (debug)
+                System.err.println("DBCache: " + counter
+                    + " objects released from ref cache.");
+            }
+          } else {
+            // check memory consumption every 10 seconds
+            db.clearCacheIfLowOnMem();
+
+          }
+
+        } catch (InterruptedException e) {
+          return;
+        } catch (Throwable e) {
+          // this thread must keep spinning,
+          // otherwise SoftCacheEntries would not be disposed
+          e.printStackTrace();
+        }
+    }
+
+  }
+
+  public void clearCache() {
+    if (debug)
+      System.err.println("DBCache: Clear cache");
+
+    synchronized (_softHash) {
+      if (_cacheType != HARD) {
+        Iterator<ReferenceCacheEntry> iter = _softHash.valuesIterator();
+        while (iter.hasNext()) {
+          ReferenceCacheEntry e = iter.next();
+          e.clear();
+        }
+      }
+      _softHash.clear();
+    }
+
+  }
+
+}

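The disposer thread above relies on the standard ReferenceQueue pattern: SoftCacheEntry and WeakCacheEntry register themselves with _refQueue, and once the GC clears a referent the reference object shows up on the queue so SoftRunnable can drop the stale recid from _softHash. A self-contained sketch of that pattern, with illustrative names that are not part of the commit:

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;

// Illustration only: the ReferenceQueue mechanism used by DBCacheRef.
public class RefQueueSketch {

  // analogous to SoftCacheEntry: a soft reference that remembers its recid
  static final class Entry extends SoftReference<Object> {
    final long recid;
    Entry(long recid, Object value, ReferenceQueue<Object> queue) {
      super(value, queue);
      this.recid = recid;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    ReferenceQueue<Object> queue = new ReferenceQueue<Object>();
    Entry entry = new Entry(42L, new byte[1024], queue);

    entry.clear();   // simulate the GC clearing the softly reachable value
    entry.enqueue(); // ...and putting the reference on the queue

    // SoftRunnable performs the same remove(10000) in a loop and then removes
    // the returned recid from its cache map.
    Reference<?> cleared = queue.remove(10000);
    System.out.println("evict recid " + ((Entry) cleared).recid);
  }
}
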
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBMaker.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBMaker.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBMaker.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBMaker.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,365 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOError;
+import java.security.spec.KeySpec;
+
+import javax.crypto.Cipher;
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.IvParameterSpec;
+import javax.crypto.spec.PBEKeySpec;
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * Class used to configure and create a DB. It uses the builder pattern.
+ */
+public final class DBMaker {
+
+  private byte cacheType = DBCache.MRU;
+  private int mruCacheSize = 2048;
+
+  private String location = null;
+
+  private boolean disableTransactions = false;
+  private boolean lockingDisabled = false;
+  private boolean readonly = false;
+  private String password = null;
+  private boolean useAES256Bit = true;
+  private boolean useRandomAccessFile = false;
+  private boolean autoClearRefCacheOnLowMem = true;
+  private boolean closeOnJVMExit = false;
+  private boolean deleteFilesAfterCloseFlag = false;
+
+  private DBMaker() {
+  }
+
+  /**
+   * Creates a new DBMaker and sets the file to load data from.
+   * 
+   * @param file to load data from
+   * @return new DBMaker
+   */
+  public static DBMaker openFile(String file) {
+    DBMaker m = new DBMaker();
+    m.location = file;
+    return m;
+  }
+
+  /**
+   * Creates a new DBMaker which uses an in-memory store. Data will be lost
+   * after the JVM exits.
+   * 
+   * @return new DBMaker
+   */
+  public static DBMaker openMemory() {
+    return new DBMaker();
+  }
+
+  /**
+   * Opens a store in a zip file.
+   * 
+   * @param zip file
+   * @return new DBMaker
+   */
+  public static DBMaker openZip(String zip) {
+    DBMaker m = new DBMaker();
+    m.location = "$$ZIP$$://" + zip;
+    return m;
+  }
+
+  static String isZipFileLocation(String location) {
+    String match = "$$ZIP$$://";
+    if (location.startsWith(match)) {
+      return location.substring(match.length());
+    }
+    return null;
+  }
+
+  /**
+   * Use WeakReference for the cache. This cache does not improve performance much,
+   * but prevents JDBM from creating multiple instances of the same object.
+   * 
+   * @return this builder
+   */
+  public DBMaker enableWeakCache() {
+    cacheType = DBCache.WEAK;
+    return this;
+  }
+
+  /**
+   * Use SoftReference for the cache. This cache greatly improves performance
+   * if you have enough memory. Instances in the cache are garbage collected
+   * when memory gets low.
+   * 
+   * @return this builder
+   */
+  public DBMaker enableSoftCache() {
+    cacheType = DBCache.SOFT;
+    return this;
+  }
+
+  /**
+   * Use hard references for the cache. This greatly improves performance if
+   * there is enough memory. The hard cache has smaller memory overhead than
+   * Soft or Weak, because reference objects and the queue need not be maintained.
+   * 
+   * @return this builder
+   */
+  public DBMaker enableHardCache() {
+    cacheType = DBCache.HARD;
+    return this;
+  }
+
+  /**
+   * Use a 'Most Recently Used' cache with limited size. The oldest instances
+   * are released from the cache when new instances are fetched. This cache is
+   * not cleared by GC and is a good fit for systems with limited memory.
+   * <p/>
+   * Default size for MRU cache is 2048 records.
+   * 
+   * @return this builder
+   */
+  public DBMaker enableMRUCache() {
+    cacheType = DBCache.MRU;
+    return this;
+  }
+
+  /**
+   * 
+   * Sets the 'Most Recently Used' cache size. This cache is activated by
+   * default with size 2048.
+   * 
+   * @param cacheSize number of instances which will be kept in cache.
+   * @return this builder
+   */
+  public DBMaker setMRUCacheSize(int cacheSize) {
+    if (cacheSize < 0)
+      throw new IllegalArgumentException("Cache size is smaller than zero");
+    cacheType = DBCache.MRU;
+    mruCacheSize = cacheSize;
+    return this;
+  }
+
+  /**
+   * If a reference (soft, weak or hard) cache is enabled, GC may not release
+   * references fast enough (or not at all in the case of the hard cache). So
+   * JDBM periodically checks the amount of free heap memory. If free memory
+   * is less than 25% of the maximum heap or below 10 MB, JDBM completely
+   * clears its reference cache to prevent possible memory issues.
+   * <p>
+   * Calling this method disables automatic cache clearing when memory is low,
+   * which of course may lead to OutOfMemoryErrors.
+   * 
+   * @return this builder
+   */
+  public DBMaker disableCacheAutoClear() {
+    this.autoClearRefCacheOnLowMem = false;
+    return this;
+  }
+
+  /**
+   * Enables storage encryption using the AES cipher. JDBM supports both 128
+   * bit and 256 bit encryption if the JRE provides it. There are some
+   * restrictions on AES 256 bit and not all JREs support it by default.
+   * <p/>
+   * Storage can not be read (decrypted) unless the key is provided the next
+   * time it is opened.
+   * 
+   * @param password used to encrypt store
+   * @param useAES256Bit if true strong AES 256 bit encryption is used.
+   *          Otherwise more usual AES 128 bit is used.
+   * @return this builder
+   */
+  public DBMaker enableEncryption(String password, boolean useAES256Bit) {
+    this.password = password;
+    this.useAES256Bit = useAES256Bit;
+    return this;
+  }
+
+  /**
+   * Makes the DB read-only. Update/delete/insert operations will throw an
+   * 'UnsupportedOperationException'.
+   * 
+   * @return this builder
+   */
+  public DBMaker readonly() {
+    readonly = true;
+    return this;
+  }
+
+  /**
+   * Disable cache completely
+   * 
+   * @return this builder
+   */
+  public DBMaker disableCache() {
+    cacheType = DBCache.NONE;
+    return this;
+  }
+
+  /**
+   * Option to disable transactions (to increase performance at the cost of
+   * potential data loss). Transactions are enabled by default.
+   * <p/>
+   * Switches off transactions for the record manager. This means that a) a
+   * transaction log is not kept, and b) writes aren't synced after every
+   * update. Writes are cached in memory and then flushed to disk every N
+   * writes. You may also flush writes manually by calling commit(). This is
+   * useful when batch inserting into a new database.
+   * <p/>
+   * When using this option, the database must be properly closed before JVM
+   * shutdown. Failing to do so may, and most likely will, corrupt the store.
+   * 
+   * @return this builder
+   */
+  public DBMaker disableTransactions() {
+    this.disableTransactions = true;
+    return this;
+  }
+
+  /**
+   * Disable file system based locking (for file systems that do not support
+   * it).
+   * 
+   * Locking is not supported by many remote or distributed file systems, such
+   * as Lustre and NFS. Attempts to perform locks will result in an IOException
+   * with the message "Function not implemented".
+   * 
+   * Disabling locking will avoid this issue, though of course it comes with all
+   * the issues of uncontrolled file access.
+   * 
+   * @return this builder
+   */
+  public DBMaker disableLocking() {
+    this.lockingDisabled = true;
+    return this;
+  }
+
+  /**
+   * By default JDBM uses memory-mapped buffers to read from files, but this
+   * may behave strangely on some platforms. A safe alternative is to use the
+   * old RandomAccessFile rather than a mapped ByteBuffer. This is typically
+   * slower (pages need to be copied into memory on every write).
+   * 
+   * @return this builder
+   */
+  public DBMaker useRandomAccessFile() {
+    this.useRandomAccessFile = true;
+    return this;
+  }
+
+  /**
+   * Registers a shutdown hook to close the database on JVM exit, if it was
+   * not already closed.
+   * 
+   * @return this builder
+   */
+  public DBMaker closeOnExit() {
+    this.closeOnJVMExit = true;
+    return this;
+  }
+
+  /**
+   * Deletes all storage files after the DB is closed.
+   * 
+   * @return this builder
+   */
+  public DBMaker deleteFilesAfterClose() {
+    this.deleteFilesAfterCloseFlag = true;
+    return this;
+  }
+
+  /**
+   * Opens the database with the settings specified earlier in this builder.
+   * 
+   * @return new DB
+   * @throws java.io.IOError if db could not be opened
+   */
+  public DB make() {
+
+    Cipher cipherIn = null;
+    Cipher cipherOut = null;
+    if (password != null)
+      try {
+        // initialize ciphers
+        // this code comes from Stack Overflow
+        // http://stackoverflow.com/questions/992019/java-256bit-aes-encryption/992413#992413
+        byte[] salt = new byte[] { 3, -34, 123, 53, 78, 121, -12, -1, 45, -12,
+            -48, 89, 11, 100, 99, 8 };
+
+        SecretKeyFactory factory = SecretKeyFactory
+            .getInstance("PBKDF2WithHmacSHA1");
+        KeySpec spec = new PBEKeySpec(password.toCharArray(), salt, 1024,
+            useAES256Bit ? 256 : 128);
+        SecretKey tmp = factory.generateSecret(spec);
+        SecretKey secret = new SecretKeySpec(tmp.getEncoded(), "AES");
+
+        String transform = "AES/CBC/NoPadding";
+        IvParameterSpec params = new IvParameterSpec(salt);
+
+        cipherIn = Cipher.getInstance(transform);
+        cipherIn.init(Cipher.ENCRYPT_MODE, secret, params);
+
+        cipherOut = Cipher.getInstance(transform);
+        cipherOut.init(Cipher.DECRYPT_MODE, secret, params);
+
+        // sanity check, try with page size
+        byte[] data = new byte[Storage.PAGE_SIZE];
+        byte[] encData = cipherIn.doFinal(data);
+        if (encData.length != Storage.PAGE_SIZE)
+          throw new Error(
+              "Page size changed after encryption, make sure you use '/NoPadding'");
+        byte[] data2 = cipherOut.doFinal(encData);
+        for (int i = 0; i < data.length; i++) {
+          if (data[i] != data2[i])
+            throw new Error("Encryption provided by JRE does not work");
+        }
+
+      } catch (Exception e) {
+        throw new IOError(e);
+      }
+
+    DBAbstract db = null;
+
+    if (cacheType == DBCache.MRU) {
+      db = new DBCacheMRU(location, readonly, disableTransactions, cipherIn,
+          cipherOut, useRandomAccessFile, deleteFilesAfterCloseFlag,
+          mruCacheSize, lockingDisabled);
+    } else if (cacheType == DBCache.SOFT || cacheType == DBCache.HARD
+        || cacheType == DBCache.WEAK) {
+      db = new DBCacheRef(location, readonly, disableTransactions, cipherIn,
+          cipherOut, useRandomAccessFile, deleteFilesAfterCloseFlag, cacheType,
+          autoClearRefCacheOnLowMem, lockingDisabled);
+    } else if (cacheType == DBCache.NONE) {
+      db = new DBStore(location, readonly, disableTransactions, cipherIn,
+          cipherOut, useRandomAccessFile, deleteFilesAfterCloseFlag,
+          lockingDisabled);
+    } else {
+      throw new IllegalArgumentException("Unknown cache type: " + cacheType);
+    }
+
+    if (closeOnJVMExit) {
+      db.addShutdownHook();
+    }
+
+    return db;
+  }
+
+}

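Taken together with the two cache classes above, DBMaker picks the concrete store in make(): DBCacheMRU for the default MRU cache, DBCacheRef for the soft/weak/hard caches, and plain DBStore when caching is disabled. A hedged usage sketch based only on the builder methods shown above (the file path is made up, and the final close() assumes the DB interface exposes the close used by the shutdown hook):

import org.apache.hama.jdbm.DB;
import org.apache.hama.jdbm.DBMaker;

public class DBMakerUsageSketch {
  public static void main(String[] args) {
    // builds a DBCacheMRU wrapping a DBStore, per the make() logic above
    DB db = DBMaker.openFile("/tmp/example-db")  // hypothetical location
        .setMRUCacheSize(4096)                   // instead of the 2048 default
        .closeOnExit()                           // register the shutdown hook
        .make();
    // ... work with db here ...
    db.close(); // assumed: DB exposes close(), as used by the shutdown hook
  }
}
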
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBStore.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBStore.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBStore.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBStore.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,912 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipOutputStream;
+
+import javax.crypto.Cipher;
+
+/**
+ * This class manages records, which are uninterpreted blobs of data. The set of
+ * operations is simple and straightforward: you communicate with the class
+ * using long "rowids" and byte[] data blocks. Rowids are returned on inserts
+ * and you can stash them away someplace safe to be able to get back to them.
+ * Data blocks can be as long as you wish, and may have lengths different from
+ * the original when updating.
+ * <p/>
+ * Operations are synchronized, so that only one of them will happen
+ * concurrently even if you hammer away from multiple threads. Operations are
+ * made atomic by keeping a transaction log which is recovered after a crash, so
+ * the operations specified by this interface all have ACID properties.
+ * <p/>
+ * You identify a file by just the name. The package attaches <tt>.db</tt> for
+ * the database file, and <tt>.lg</tt> for the transaction log. The transaction
+ * log is synchronized regularly and then restarted, so don't worry if you see
+ * the size going up and down.
+ */
+public class DBStore extends DBAbstract {
+
+  /**
+   * Version of storage. It should be safe to open lower versions, but the
+   * engine should throw an exception when opening newer versions (as they may
+   * contain unsupported features or serialization).
+   */
+  static final long STORE_FORMAT_VERSION = 1L;
+
+  /**
+   * Underlying file for store records.
+   */
+  private PageFile _file;
+
+  /**
+   * Page manager for physical manager.
+   */
+  private PageManager _pageman;
+
+  /**
+   * Physical row identifier manager.
+   */
+  private PhysicalRowIdManager _physMgr;
+
+  /**
+   * Indicates that the store is opened for read-only operations. If true, the
+   * store will throw an UnsupportedOperationException when an
+   * update/insert/delete operation is called.
+   */
+  private final boolean readonly;
+  final boolean transactionsDisabled;
+  private final boolean deleteFilesAfterClose;
+
+  private static final int AUTOCOMMIT_AFTER_N_PAGES = 1024 * 5;
+
+  boolean commitInProgress = false;
+
+  /**
+   * cipher used for decryption, may be null
+   */
+  private Cipher cipherOut;
+  /**
+   * cipher used for encryption, may be null
+   */
+  private Cipher cipherIn;
+  private boolean useRandomAccessFile;
+  private boolean lockingDisabled;
+
+  void checkCanWrite() {
+    if (readonly)
+      throw new UnsupportedOperationException(
+          "Could not write, store is opened as read-only");
+  }
+
+  /**
+   * Logical to physical row identifier manager.
+   */
+  private LogicalRowIdManager _logicMgr;
+
+  /**
+   * Static debugging flag
+   */
+  public static final boolean DEBUG = false;
+
+  static final long PREALOCATE_PHYS_RECID = Short.MIN_VALUE;
+
+  static final Object PREALOCATE_OBJ = new Object();
+
+  private final DataInputOutput buffer = new DataInputOutput();
+  private boolean bufferInUse = false;
+
+  private final String _filename;
+
+  public DBStore(String filename, boolean readonly,
+      boolean transactionDisabled, boolean lockingDisabled) throws IOException {
+    this(filename, readonly, transactionDisabled, null, null, false, false,
+        false);
+  }
+
+  /**
+   * Creates a record manager for the indicated file
+   * 
+   * @throws IOException when the file cannot be opened or is not a valid file
+   *           content-wise.
+   */
+  public DBStore(String filename, boolean readonly,
+      boolean transactionDisabled, Cipher cipherIn, Cipher cipherOut,
+      boolean useRandomAccessFile, boolean deleteFilesAfterClose,
+      boolean lockingDisabled) {
+    _filename = filename;
+    this.readonly = readonly;
+    this.transactionsDisabled = transactionDisabled;
+    this.cipherIn = cipherIn;
+    this.cipherOut = cipherOut;
+    this.useRandomAccessFile = useRandomAccessFile;
+    this.deleteFilesAfterClose = deleteFilesAfterClose;
+    this.lockingDisabled = lockingDisabled;
+    reopen();
+  }
+
+  private void reopen() {
+    try {
+      _file = new PageFile(_filename, readonly, transactionsDisabled, cipherIn,
+          cipherOut, useRandomAccessFile, lockingDisabled);
+      _pageman = new PageManager(_file);
+      _physMgr = new PhysicalRowIdManager(_file, _pageman);
+
+      _logicMgr = new LogicalRowIdManager(_file, _pageman);
+
+      long versionNumber = getRoot(STORE_VERSION_NUMBER_ROOT);
+      if (versionNumber > STORE_FORMAT_VERSION)
+        throw new IOException(
+            "Unsupported version of store. Please update JDBM. Minimal supported ver:"
+                + STORE_FORMAT_VERSION + ", store ver:" + versionNumber);
+      if (!readonly)
+        setRoot(STORE_VERSION_NUMBER_ROOT, STORE_FORMAT_VERSION);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * Closes the record manager.
+   * 
+   * @throws IOException when one of the underlying I/O operations fails.
+   */
+  @Override
+  public synchronized void close() {
+    checkNotClosed();
+    try {
+      super.close();
+      _pageman.close();
+      _file.close();
+      if (deleteFilesAfterClose)
+        _file.storage.deleteAllFiles();
+
+      _pageman = null;
+
+      _file = null;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public boolean isClosed() {
+    return _pageman == null;
+  }
+
+  @Override
+  public synchronized <A> long insert(final A obj,
+      final Serializer<A> serializer, final boolean disableCache)
+      throws IOException {
+    checkNotClosed();
+    checkCanWrite();
+
+    if (needsAutoCommit()) {
+      commit();
+    }
+
+    if (bufferInUse) {
+      // current reusable buffer is in use, have to fall back to creating new
+      // instances
+      DataInputOutput buffer2 = new DataInputOutput();
+      return insert2(obj, serializer, buffer2);
+    }
+
+    try {
+
+      bufferInUse = true;
+      return insert2(obj, serializer, buffer);
+    } finally {
+      bufferInUse = false;
+    }
+
+  }
+
+  boolean needsAutoCommit() {
+    return transactionsDisabled && !commitInProgress
+        && (_file.getDirtyPageCount() >= AUTOCOMMIT_AFTER_N_PAGES);
+  }
+
+  private <A> long insert2(A obj, Serializer<A> serializer, DataInputOutput buf)
+      throws IOException {
+    buf.reset();
+
+    long physRowId;
+    if (obj == PREALOCATE_OBJ) {
+      // if inserted record is PREALOCATE_OBJ , it gets special handling.
+      // it is inserted only into _logicMgr with special value to indicate null
+      // this is used to preallocate recid for lazy inserts in cache
+      physRowId = PREALOCATE_PHYS_RECID;
+    } else {
+      serializer.serialize(buf, obj);
+      if (buf.getPos() > RecordHeader.MAX_RECORD_SIZE) {
+        throw new IllegalArgumentException(
+            "Too big record. JDBM only supports record size up to: "
+                + RecordHeader.MAX_RECORD_SIZE + " bytes. Record size was: "
+                + buf.getPos());
+      }
+      physRowId = _physMgr.insert(buf.getBuf(), 0, buf.getPos());
+    }
+    final long recid = _logicMgr.insert(physRowId);
+
+    if (DEBUG) {
+      System.out.println("BaseRecordManager.insert() recid " + recid
+          + " length " + buf.getPos());
+    }
+
+    return compressRecid(recid);
+  }
+
+  @Override
+  public synchronized void delete(long logRowId) throws IOException {
+
+    checkNotClosed();
+    checkCanWrite();
+    if (logRowId <= 0) {
+      throw new IllegalArgumentException("Argument 'recid' is invalid: "
+          + logRowId);
+    }
+
+    if (needsAutoCommit()) {
+      commit();
+    }
+
+    if (DEBUG) {
+      System.out.println("BaseRecordManager.delete() recid " + logRowId);
+    }
+
+    logRowId = decompressRecid(logRowId);
+
+    long physRowId = _logicMgr.fetch(logRowId);
+    _logicMgr.delete(logRowId);
+    if (physRowId != PREALOCATE_PHYS_RECID) {
+      _physMgr.free(physRowId);
+    }
+  }
+
+  @Override
+  public synchronized <A> void update(long recid, A obj,
+      Serializer<A> serializer) throws IOException {
+    checkNotClosed();
+    checkCanWrite();
+    if (recid <= 0) {
+      throw new IllegalArgumentException("Argument 'recid' is invalid: "
+          + recid);
+    }
+
+    if (needsAutoCommit()) {
+      commit();
+    }
+
+    if (bufferInUse) {
+      // current reusable buffer is in use, have to create new instances
+      DataInputOutput buffer2 = new DataInputOutput();
+      update2(recid, obj, serializer, buffer2);
+      return;
+    }
+
+    try {
+      bufferInUse = true;
+      update2(recid, obj, serializer, buffer);
+    } finally {
+      bufferInUse = false;
+    }
+  }
+
+  private <A> void update2(long logRecid, final A obj,
+      final Serializer<A> serializer, final DataInputOutput buf)
+      throws IOException {
+
+    logRecid = decompressRecid(logRecid);
+
+    long physRecid = _logicMgr.fetch(logRecid);
+    if (physRecid == 0)
+      throw new IOException("Can not update, recid does not exist: " + logRecid);
+    buf.reset();
+    serializer.serialize(buf, obj);
+
+    if (DEBUG) {
+      System.out.println("BaseRecordManager.update() recid " + logRecid
+          + " length " + buf.getPos());
+    }
+
+    long newRecid = physRecid != PREALOCATE_PHYS_RECID ? _physMgr.update(
+        physRecid, buf.getBuf(), 0, buf.getPos()) :
+    // previous record was only virtual and does not actually exist, so make new
+    // insert
+        _physMgr.insert(buf.getBuf(), 0, buf.getPos());
+
+    _logicMgr.update(logRecid, newRecid);
+
+  }
+
+  @Override
+  public synchronized <A> A fetch(final long recid,
+      final Serializer<A> serializer) throws IOException {
+
+    checkNotClosed();
+    if (recid <= 0) {
+      throw new IllegalArgumentException("Argument 'recid' is invalid: "
+          + recid);
+    }
+
+    if (bufferInUse) {
+      // current reusable buffer is in use, have to create new instances
+      DataInputOutput buffer2 = new DataInputOutput();
+      return fetch2(recid, serializer, buffer2);
+    }
+    try {
+      bufferInUse = true;
+      return fetch2(recid, serializer, buffer);
+    } finally {
+      bufferInUse = false;
+    }
+  }
+
+  @Override
+  public synchronized <A> A fetch(long recid, Serializer<A> serializer,
+      boolean disableCache) throws IOException {
+    // we don't have any cache, so we can ignore the disableCache parameter
+    return fetch(recid, serializer);
+  }
+
+  private <A> A fetch2(long recid, final Serializer<A> serializer,
+      final DataInputOutput buf) throws IOException {
+
+    recid = decompressRecid(recid);
+
+    buf.reset();
+    long physLocation = _logicMgr.fetch(recid);
+    if (physLocation == 0) {
+      // throw new IOException("Record not found, recid: "+recid);
+      return null;
+    }
+    if (physLocation == PREALOCATE_PHYS_RECID) {
+      throw new InternalError("cache should prevent this!");
+    }
+
+    _physMgr.fetch(buf, physLocation);
+
+    if (DEBUG) {
+      System.out.println("BaseRecordManager.fetch() recid " + recid
+          + " length " + buf.getPos());
+    }
+    buf.resetForReading();
+    try {
+      return serializer.deserialize(buf); // TODO there should be write limit to
+                                          // throw EOFException
+    } catch (ClassNotFoundException e) {
+      throw new IOError(e);
+    }
+  }
+
+  byte[] fetchRaw(long recid) throws IOException {
+    recid = decompressRecid(recid);
+    long physLocation = _logicMgr.fetch(recid);
+    if (physLocation == 0) {
+      // throw new IOException("Record not found, recid: "+recid);
+      return null;
+    }
+    DataInputOutput i = new DataInputOutput();
+    _physMgr.fetch(i, physLocation);
+    return i.toByteArray();
+  }
+
+  @Override
+  public synchronized long getRoot(final byte id) {
+    checkNotClosed();
+
+    return _pageman.getFileHeader().fileHeaderGetRoot(id);
+  }
+
+  @Override
+  public synchronized void setRoot(final byte id, final long rowid) {
+    checkNotClosed();
+    checkCanWrite();
+
+    _pageman.getFileHeader().fileHeaderSetRoot(id, rowid);
+  }
+
+  @Override
+  public synchronized void commit() {
+    try {
+      commitInProgress = true;
+      checkNotClosed();
+      checkCanWrite();
+      /** flush free phys rows into pages */
+      _physMgr.commit();
+      _logicMgr.commit();
+
+      /** commit pages */
+      _pageman.commit();
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    } finally {
+      commitInProgress = false;
+    }
+  }
+
+  @Override
+  public synchronized void rollback() {
+    if (transactionsDisabled)
+      throw new IllegalAccessError(
+          "Transactions are disabled, can not rollback");
+
+    try {
+      checkNotClosed();
+      _physMgr.rollback();
+      _logicMgr.rollback();
+      _pageman.rollback();
+
+      super.rollback();
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  @Override
+  public void copyToZip(String zipFile) {
+    try {
+      String zip = zipFile;
+      String zip2 = "db";
+      ZipOutputStream z = new ZipOutputStream(new FileOutputStream(zip));
+
+      // copy zero pages
+      {
+        String file = zip2 + 0;
+        z.putNextEntry(new ZipEntry(file));
+        z.write(JDBMUtils.encrypt(cipherIn, _pageman.getHeaderBufData()));
+        z.closeEntry();
+      }
+
+      // iterate over pages and create new file for each
+      for (long pageid = _pageman.getFirst(Magic.TRANSLATION_PAGE); pageid != 0; pageid = _pageman
+          .getNext(pageid)) {
+        PageIo page = _file.get(pageid);
+        String file = zip2 + pageid;
+        z.putNextEntry(new ZipEntry(file));
+        z.write(JDBMUtils.encrypt(cipherIn, page.getData()));
+        z.closeEntry();
+        _file.release(page);
+      }
+      for (long pageid = _pageman.getFirst(Magic.FREELOGIDS_PAGE); pageid != 0; pageid = _pageman
+          .getNext(pageid)) {
+        PageIo page = _file.get(pageid);
+        String file = zip2 + pageid;
+        z.putNextEntry(new ZipEntry(file));
+        z.write(JDBMUtils.encrypt(cipherIn, page.getData()));
+        z.closeEntry();
+        _file.release(page);
+      }
+
+      for (long pageid = _pageman.getFirst(Magic.USED_PAGE); pageid != 0; pageid = _pageman
+          .getNext(pageid)) {
+        PageIo page = _file.get(pageid);
+        String file = zip2 + pageid;
+        z.putNextEntry(new ZipEntry(file));
+        z.write(JDBMUtils.encrypt(cipherIn, page.getData()));
+        z.closeEntry();
+        _file.release(page);
+      }
+      for (long pageid = _pageman.getFirst(Magic.FREEPHYSIDS_PAGE); pageid != 0; pageid = _pageman
+          .getNext(pageid)) {
+        PageIo page = _file.get(pageid);
+        String file = zip2 + pageid;
+        z.putNextEntry(new ZipEntry(file));
+        z.write(JDBMUtils.encrypt(cipherIn, page.getData()));
+        z.closeEntry();
+        _file.release(page);
+      }
+      for (long pageid = _pageman.getFirst(Magic.FREEPHYSIDS_ROOT_PAGE); pageid != 0; pageid = _pageman
+          .getNext(pageid)) {
+        PageIo page = _file.get(pageid);
+        String file = zip2 + pageid;
+        z.putNextEntry(new ZipEntry(file));
+        z.write(JDBMUtils.encrypt(cipherIn, page.getData()));
+        z.closeEntry();
+        _file.release(page);
+      }
+
+      z.close();
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public synchronized void clearCache() {
+    // no cache
+  }
+
+  private long statisticsCountPages(short pageType) throws IOException {
+    long pageCounter = 0;
+
+    for (long pageid = _pageman.getFirst(pageType); pageid != 0; pageid = _pageman
+        .getNext(pageid)) {
+      pageCounter++;
+    }
+
+    return pageCounter;
+
+  }
+
+  @Override
+  public synchronized String calculateStatistics() {
+    checkNotClosed();
+
+    try {
+
+      final StringBuilder b = new StringBuilder();
+
+      // count pages
+      {
+
+        b.append("PAGES:\n");
+        long total = 0;
+        long pages = statisticsCountPages(Magic.USED_PAGE);
+        total += pages;
+        b.append("  " + pages + " used pages with size "
+            + JDBMUtils.formatSpaceUsage(pages * Storage.PAGE_SIZE) + "\n");
+        pages = statisticsCountPages(Magic.TRANSLATION_PAGE);
+        total += pages;
+        b.append("  " + pages + " record translation pages with size "
+            + JDBMUtils.formatSpaceUsage(pages * Storage.PAGE_SIZE) + "\n");
+        pages = statisticsCountPages(Magic.FREE_PAGE);
+        total += pages;
+        b.append("  " + pages + " free (unused) pages with size "
+            + JDBMUtils.formatSpaceUsage(pages * Storage.PAGE_SIZE) + "\n");
+        pages = statisticsCountPages(Magic.FREEPHYSIDS_PAGE);
+        total += pages;
+        b.append("  " + pages + " free (phys) pages with size "
+            + JDBMUtils.formatSpaceUsage(pages * Storage.PAGE_SIZE) + "\n");
+        pages = statisticsCountPages(Magic.FREELOGIDS_PAGE);
+        total += pages;
+        b.append("  " + pages + " free (logical) pages with size "
+            + JDBMUtils.formatSpaceUsage(pages * Storage.PAGE_SIZE) + "\n");
+        b.append("  Total number of pages is " + total + " with size "
+            + JDBMUtils.formatSpaceUsage(total * Storage.PAGE_SIZE) + "\n");
+
+      }
+      {
+        b.append("RECORDS:\n");
+
+        long recordCount = 0;
+        long freeRecordCount = 0;
+        long maximalRecordSize = 0;
+        long maximalAvailSizeDiff = 0;
+        long totalRecordSize = 0;
+        long totalAvailDiff = 0;
+
+        // count records
+        for (long pageid = _pageman.getFirst(Magic.TRANSLATION_PAGE); pageid != 0; pageid = _pageman
+            .getNext(pageid)) {
+          PageIo io = _file.get(pageid);
+
+          for (int i = 0; i < _logicMgr.ELEMS_PER_PAGE; i += 1) {
+            final int pos = Magic.PAGE_HEADER_SIZE + i
+                * Magic.PhysicalRowId_SIZE;
+            final long physLoc = io.pageHeaderGetLocation((short) pos);
+
+            if (physLoc == 0) {
+              freeRecordCount++;
+              continue;
+            }
+
+            if (physLoc == PREALOCATE_PHYS_RECID) {
+              continue;
+            }
+
+            recordCount++;
+
+            // get size
+            PageIo page = _file.get(physLoc >>> Storage.PAGE_SIZE_SHIFT);
+            final short physOffset = (short) (physLoc & Storage.OFFSET_MASK);
+            int availSize = RecordHeader.getAvailableSize(page, physOffset);
+            int currentSize = RecordHeader.getCurrentSize(page, physOffset);
+            _file.release(page);
+
+            maximalAvailSizeDiff = Math.max(maximalAvailSizeDiff, availSize
+                - currentSize);
+            maximalRecordSize = Math.max(maximalRecordSize, currentSize);
+            totalAvailDiff += availSize - currentSize;
+            totalRecordSize += currentSize;
+
+          }
+          _file.release(io);
+        }
+
+        b.append("  Contains " + recordCount + " records and "
+            + freeRecordCount + " free slots.\n");
+        b.append("  Total space occupied by data is "
+            + JDBMUtils.formatSpaceUsage(totalRecordSize) + "\n");
+        b.append("  Average data size in record is "
+            + JDBMUtils.formatSpaceUsage(Math.round(1D * totalRecordSize
+                / recordCount)) + "\n");
+        b.append("  Maximal data size in record is "
+            + JDBMUtils.formatSpaceUsage(maximalRecordSize) + "\n");
+        b.append("  Space wasted in record fragmentation is "
+            + JDBMUtils.formatSpaceUsage(totalAvailDiff) + "\n");
+        b.append("  Maximal space wasted in single record fragmentation is "
+            + JDBMUtils.formatSpaceUsage(maximalAvailSizeDiff) + "\n");
+      }
+
+      return b.toString();
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public synchronized void defrag(boolean sortCollections) {
+
+    try {
+      checkNotClosed();
+      checkCanWrite();
+      commit();
+      final String filename2 = _filename + "_defrag"
+          + System.currentTimeMillis();
+      final String filename1 = _filename;
+      DBStore db2 = new DBStore(filename2, false, true, cipherIn, cipherOut,
+          false, false, false);
+
+      // recreate logical file with original page layout
+      {
+        // find minimal logical pageid (logical pageids are negative)
+        LongHashMap<String> logicalPages = new LongHashMap<String>();
+        long minpageid = 0;
+        for (long pageid = _pageman.getFirst(Magic.TRANSLATION_PAGE); pageid != 0; pageid = _pageman
+            .getNext(pageid)) {
+          minpageid = Math.min(minpageid, pageid);
+          logicalPages.put(pageid, JDBMUtils.EMPTY_STRING);
+        }
+
+        // fill second db with logical pages
+        long pageCounter = 0;
+        for (long pageid = db2._pageman.allocate(Magic.TRANSLATION_PAGE); pageid >= minpageid; pageid = db2._pageman
+            .allocate(Magic.TRANSLATION_PAGE)) {
+          pageCounter++;
+          if (pageCounter % 1000 == 0)
+            db2.commit();
+        }
+
+        logicalPages = null;
+      }
+
+      // reinsert collections so physical records are located near each other
+      // iterate over named object recids, it is sorted with TreeSet
+      if (sortCollections) {
+        long nameRecid = getRoot(NAME_DIRECTORY_ROOT);
+        Collection<Long> recids = new TreeSet<Long>();
+        if (nameRecid != 0) {
+          HTree<String, Long> m = fetch(nameRecid);
+          recids.addAll(m.values());
+        }
+
+        for (Long namedRecid : recids) {
+          Object obj = fetch(namedRecid);
+          if (obj instanceof LinkedList) {
+            LinkedList.defrag(namedRecid, this, db2);
+          } else if (obj instanceof HTree) {
+            HTree.defrag(namedRecid, this, db2);
+          } else if (obj instanceof BTree) {
+            BTree.defrag(namedRecid, this, db2);
+          }
+        }
+      }
+
+      for (long pageid = _pageman.getFirst(Magic.TRANSLATION_PAGE); pageid != 0; pageid = _pageman
+          .getNext(pageid)) {
+        PageIo io = _file.get(pageid);
+
+        for (int i = 0; i < _logicMgr.ELEMS_PER_PAGE; i += 1) {
+          final int pos = Magic.PAGE_HEADER_SIZE + i * Magic.PhysicalRowId_SIZE;
+          if (pos > Short.MAX_VALUE)
+            throw new Error();
+
+          // write to new file
+          final long logicalRowId = ((-pageid) << Storage.PAGE_SIZE_SHIFT)
+              + (long) pos;
+
+          // read from logical location in second db,
+          // check if record was already inserted as part of collections
+          if (db2._pageman.getLast(Magic.TRANSLATION_PAGE) <= pageid
+              && db2._logicMgr.fetch(logicalRowId) != 0) {
+            // yes, this record already exists in second db
+            continue;
+          }
+
+          // get physical location in this db
+          final long physRowId = io.pageHeaderGetLocation((short) pos);
+
+          if (physRowId == 0)
+            continue;
+
+          if (physRowId == PREALOCATE_PHYS_RECID) {
+            db2._logicMgr.forceInsert(logicalRowId, physRowId);
+            continue;
+          }
+
+          // read from physical location at this db
+          DataInputOutput b = new DataInputOutput();
+          _physMgr.fetch(b, physRowId);
+          byte[] bb = b.toByteArray();
+
+          // force insert into other file, without decompressing logical id to
+          // external form
+          long physLoc = db2._physMgr.insert(bb, 0, bb.length);
+          db2._logicMgr.forceInsert(logicalRowId, physLoc);
+
+        }
+        _file.release(io);
+        db2.commit();
+      }
+      for (byte b = 0; b < Magic.FILE_HEADER_NROOTS; b++) {
+        db2.setRoot(b, getRoot(b));
+      }
+
+      db2.close();
+      _pageman.close();
+      _file.close();
+
+      List<File> filesToDelete = new ArrayList<File>();
+      // now rename old files
+      String[] exts = { StorageDiskMapped.IDR, StorageDiskMapped.DBR };
+      for (String ext : exts) {
+        String f1 = filename1 + ext;
+        String f2 = filename2 + "_OLD" + ext;
+
+        // first rename transaction log
+        File f1t = new File(f1 + StorageDisk.transaction_log_file_extension);
+        File f2t = new File(f2 + StorageDisk.transaction_log_file_extension);
+        f1t.renameTo(f2t);
+        filesToDelete.add(f2t);
+
+        // rename data files, iterating until no further numbered file exists
+        for (int i = 0;; i++) {
+          File f1d = new File(f1 + "." + i);
+          if (!f1d.exists())
+            break;
+          File f2d = new File(f2 + "." + i);
+          f1d.renameTo(f2d);
+          filesToDelete.add(f2d);
+        }
+      }
+
+      // rename new files
+      for (String ext : exts) {
+        String f1 = filename2 + ext;
+        String f2 = filename1 + ext;
+
+        // first rename transaction log
+        File f1t = new File(f1 + StorageDisk.transaction_log_file_extension);
+        File f2t = new File(f2 + StorageDisk.transaction_log_file_extension);
+        f1t.renameTo(f2t);
+
+        // rename data files, iterating until no further numbered file exists
+        for (int i = 0;; i++) {
+          File f1d = new File(f1 + "." + i);
+          if (!f1d.exists())
+            break;
+          File f2d = new File(f2 + "." + i);
+          f1d.renameTo(f2d);
+        }
+      }
+
+      for (File d : filesToDelete) {
+        d.delete();
+      }
+
+      reopen();
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /**
+   * Insert data at a forced logicalRowId. Use only for defragmentation!
+   * 
+   * @param logicalRowId
+   * @param data
+   * @throws IOException
+   */
+  void forceInsert(long logicalRowId, byte[] data) throws IOException {
+    logicalRowId = decompressRecid(logicalRowId);
+
+    if (needsAutoCommit()) {
+      commit();
+    }
+
+    long physLoc = _physMgr.insert(data, 0, data.length);
+    _logicMgr.forceInsert(logicalRowId, physLoc);
+  }
+
+  /**
+   * Returns the number of records stored in the database. Used by unit tests.
+   */
+  long countRecords() throws IOException {
+    long counter = 0;
+
+    long page = _pageman.getFirst(Magic.TRANSLATION_PAGE);
+    while (page != 0) {
+      PageIo io = _file.get(page);
+      for (int i = 0; i < _logicMgr.ELEMS_PER_PAGE; i += 1) {
+        int pos = Magic.PAGE_HEADER_SIZE + i * Magic.PhysicalRowId_SIZE;
+        if (pos > Short.MAX_VALUE)
+          throw new Error();
+
+        // get physical location
+        long physRowId = io.pageHeaderGetLocation((short) pos);
+
+        if (physRowId != 0)
+          counter += 1;
+      }
+      _file.release(io);
+      page = _pageman.getNext(page);
+    }
+    return counter;
+  }
+
+  private static int COMPRESS_RECID_PAGE_SHIFT = Integer.MIN_VALUE;
+  static {
+    int shift = 1;
+    while ((1 << shift) < LogicalRowIdManager.ELEMS_PER_PAGE)
+      shift++;
+    COMPRESS_RECID_PAGE_SHIFT = shift;
+  }
+
+  private final static long COMPRESS_RECID_OFFSET_MASK = 0xFFFFFFFFFFFFFFFFL >>> (64 - COMPRESS_RECID_PAGE_SHIFT);
+
+  /**
+   * Compress recid from physical form (block - offset) to (block - slot). This
+   * way resulting number is smaller and can be easier packed with LongPacker
+   */
+  static long compressRecid(final long recid) {
+    final long page = recid >>> Storage.PAGE_SIZE_SHIFT;
+    short offset = (short) (recid & Storage.OFFSET_MASK);
+
+    offset = (short) (offset - Magic.PAGE_HEADER_SIZE);
+    if (offset % Magic.PhysicalRowId_SIZE != 0)
+      throw new InternalError("recid not dividable " + Magic.PhysicalRowId_SIZE);
+    long slot = offset / Magic.PhysicalRowId_SIZE;
+
+    return (page << COMPRESS_RECID_PAGE_SHIFT) + slot;
+
+  }
+
+  static long decompressRecid(final long recid) {
+
+    final long page = recid >>> COMPRESS_RECID_PAGE_SHIFT;
+    final short offset = (short) ((recid & COMPRESS_RECID_OFFSET_MASK)
+        * Magic.PhysicalRowId_SIZE + Magic.PAGE_HEADER_SIZE);
+    return (page << Storage.PAGE_SIZE_SHIFT) + (long) offset;
+  }
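+
+  /*
+   * Round-trip sketch of the two methods above (illustrative only; the actual
+   * constants come from Magic and Storage in this build):
+   *
+   *   long recid  = (page << Storage.PAGE_SIZE_SHIFT)
+   *       + Magic.PAGE_HEADER_SIZE + slot * Magic.PhysicalRowId_SIZE;
+   *   long packed = compressRecid(recid);      // == (page << COMPRESS_RECID_PAGE_SHIFT) + slot
+   *   assert decompressRecid(packed) == recid; // holds while slot < ELEMS_PER_PAGE
+   */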
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DataInputOutput.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DataInputOutput.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DataInputOutput.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DataInputOutput.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Utility class which implements DataInput and DataOutput on top of a byte[]
+ * buffer with minimal overhead.
+ */
+public final class DataInputOutput implements DataInput, DataOutput,
+    ObjectInput, ObjectOutput {
+
+  private int pos = 0;
+  private int count = 0;
+  private byte[] buf;
+
+  public DataInputOutput() {
+    buf = new byte[8];
+  }
+
+  public DataInputOutput(byte[] data) {
+    buf = data;
+    count = data.length;
+  }
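+
+  /*
+   * Typical single-buffer usage (illustrative sketch, not part of the class
+   * contract):
+   *
+   *   DataInputOutput io = new DataInputOutput();
+   *   io.writeInt(42);
+   *   io.writeLong(7L);
+   *   io.resetForReading();          // switch the buffer from writing to reading
+   *   int i  = io.readInt();         // 42
+   *   long l = io.readLong();        // 7
+   */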
+
+  public byte[] getBuf() {
+    return buf;
+  }
+
+  public int getPos() {
+    return pos;
+  }
+
+  public void reset() {
+    pos = 0;
+    count = 0;
+  }
+
+  public void resetForReading() {
+    count = pos;
+    pos = 0;
+  }
+
+  public void reset(byte[] b) {
+    pos = 0;
+    buf = b;
+    count = b.length;
+  }
+
+  public byte[] toByteArray() {
+    byte[] d = new byte[pos];
+    System.arraycopy(buf, 0, d, 0, pos);
+    return d;
+  }
+
+  public int available() {
+    return count - pos;
+  }
+
+  public void readFully(byte[] b) throws IOException {
+    readFully(b, 0, b.length);
+  }
+
+  public void readFully(byte[] b, int off, int len) throws IOException {
+    System.arraycopy(buf, pos, b, off, len);
+    pos += len;
+  }
+
+  public int skipBytes(int n) throws IOException {
+    pos += n;
+    return n;
+  }
+
+  public boolean readBoolean() throws IOException {
+    return buf[pos++] == 1;
+  }
+
+  public byte readByte() throws IOException {
+    return buf[pos++];
+  }
+
+  public int readUnsignedByte() throws IOException {
+    return buf[pos++] & 0xff;
+  }
+
+  public short readShort() throws IOException {
+    return (short) (((short) (buf[pos++] & 0xff) << 8) | ((short) (buf[pos++] & 0xff) << 0));
+
+  }
+
+  public int readUnsignedShort() throws IOException {
+    return (((int) (buf[pos++] & 0xff) << 8) | ((int) (buf[pos++] & 0xff) << 0));
+  }
+
+  public char readChar() throws IOException {
+    return (char) readInt();
+  }
+
+  public int readInt() throws IOException {
+    return (((buf[pos++] & 0xff) << 24) | ((buf[pos++] & 0xff) << 16)
+        | ((buf[pos++] & 0xff) << 8) | ((buf[pos++] & 0xff) << 0));
+
+  }
+
+  public long readLong() throws IOException {
+    return (((long) (buf[pos++] & 0xff) << 56)
+        | ((long) (buf[pos++] & 0xff) << 48)
+        | ((long) (buf[pos++] & 0xff) << 40)
+        | ((long) (buf[pos++] & 0xff) << 32)
+        | ((long) (buf[pos++] & 0xff) << 24)
+        | ((long) (buf[pos++] & 0xff) << 16)
+        | ((long) (buf[pos++] & 0xff) << 8) | ((long) (buf[pos++] & 0xff) << 0));
+
+  }
+
+  public float readFloat() throws IOException {
+    return Float.intBitsToFloat(readInt());
+  }
+
+  public double readDouble() throws IOException {
+    return Double.longBitsToDouble(readLong());
+  }
+
+  public String readLine() throws IOException {
+    return readUTF();
+  }
+
+  public String readUTF() throws IOException {
+    return Serialization.deserializeString(this);
+  }
+
+  /**
+   * Make sure there is enough space in the buffer to write n more bytes.
+   */
+  private void ensureAvail(int n) {
+    if (pos + n >= buf.length) {
+      int newSize = Math.max(pos + n, buf.length * 2);
+      buf = Arrays.copyOf(buf, newSize);
+    }
+  }
+
+  public void write(int b) throws IOException {
+    ensureAvail(1);
+    buf[pos++] = (byte) b;
+  }
+
+  public void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  public void write(byte[] b, int off, int len) throws IOException {
+    ensureAvail(len);
+    System.arraycopy(b, off, buf, pos, len);
+    pos += len;
+  }
+
+  public void writeBoolean(boolean v) throws IOException {
+    ensureAvail(1);
+    buf[pos++] = (byte) (v ? 1 : 0);
+  }
+
+  public void writeByte(int v) throws IOException {
+    ensureAvail(1);
+    buf[pos++] = (byte) (v);
+  }
+
+  public void writeShort(int v) throws IOException {
+    ensureAvail(2);
+    buf[pos++] = (byte) (0xff & (v >> 8));
+    buf[pos++] = (byte) (0xff & (v >> 0));
+
+  }
+
+  public void writeChar(int v) throws IOException {
+    writeInt(v);
+  }
+
+  public void writeInt(int v) throws IOException {
+    ensureAvail(4);
+    buf[pos++] = (byte) (0xff & (v >> 24));
+    buf[pos++] = (byte) (0xff & (v >> 16));
+    buf[pos++] = (byte) (0xff & (v >> 8));
+    buf[pos++] = (byte) (0xff & (v >> 0));
+
+  }
+
+  public void writeLong(long v) throws IOException {
+    ensureAvail(8);
+    buf[pos++] = (byte) (0xff & (v >> 56));
+    buf[pos++] = (byte) (0xff & (v >> 48));
+    buf[pos++] = (byte) (0xff & (v >> 40));
+    buf[pos++] = (byte) (0xff & (v >> 32));
+    buf[pos++] = (byte) (0xff & (v >> 24));
+    buf[pos++] = (byte) (0xff & (v >> 16));
+    buf[pos++] = (byte) (0xff & (v >> 8));
+    buf[pos++] = (byte) (0xff & (v >> 0));
+  }
+
+  public void writeFloat(float v) throws IOException {
+    ensureAvail(4);
+    writeInt(Float.floatToIntBits(v));
+  }
+
+  public void writeDouble(double v) throws IOException {
+    ensureAvail(8);
+    writeLong(Double.doubleToLongBits(v));
+  }
+
+  public void writeBytes(String s) throws IOException {
+    writeUTF(s);
+  }
+
+  public void writeChars(String s) throws IOException {
+    writeUTF(s);
+  }
+
+  public void writeUTF(String s) throws IOException {
+    Serialization.serializeString(this, s);
+  }
+
+  /** helper method to write data directly from PageIo */
+  public void writeFromByteBuffer(ByteBuffer b, int offset, int length) {
+    ensureAvail(length);
+    b.position(offset);
+    b.get(buf, pos, length);
+    pos += length;
+  }
+
+  // temp var used for Externalizable
+  SerialClassInfo serializer;
+  // temp var used for Externalizable
+  Serialization.FastArrayList objectStack;
+
+  public Object readObject() throws ClassNotFoundException, IOException {
+    // is here just to implement ObjectInput
+    // Fake method which reads data from the serializer.
+    // We could probably implement a separate wrapper for this, but I want to
+    // save class space
+    return serializer.deserialize(this, objectStack);
+  }
+
+  public int read() throws IOException {
+    // is here just to implement ObjectInput
+    return readUnsignedByte();
+  }
+
+  public int read(byte[] b) throws IOException {
+    // is here just to implement ObjectInput
+    readFully(b);
+    return b.length;
+  }
+
+  public int read(byte[] b, int off, int len) throws IOException {
+    // is here just to implement ObjectInput
+    readFully(b, off, len);
+    return len;
+  }
+
+  public long skip(long n) throws IOException {
+    // is here just to implement ObjectInput
+    pos += n;
+    return n;
+  }
+
+  public void close() throws IOException {
+    // is here just to implement ObjectInput
+    // do nothing
+  }
+
+  public void writeObject(Object obj) throws IOException {
+    // is here just to implement ObjectOutput
+    serializer.serialize(this, obj, objectStack);
+  }
+
+  public void flush() throws IOException {
+    // is here just to implement ObjectOutput
+    // do nothing
+  }
+
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTree.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTree.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTree.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTree.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,483 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hama.jdbm;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInput;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Persistent HashMap implementation for DB. Implemented as an H*Tree structure.
+ */
+@SuppressWarnings("rawtypes")
+public final class HTree<K, V> extends AbstractMap<K, V> {
+
+  final Serializer SERIALIZER = new Serializer<Object>() {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Object deserialize(DataInput ds2) throws IOException {
+      DataInputOutput ds = (DataInputOutput) ds2;
+      try {
+        int i = ds.readUnsignedByte();
+        if (i == SerializationHeader.HTREE_BUCKET) { // is HashBucket?
+          HTreeBucket ret = new HTreeBucket(HTree.this);
+          if (loadValues)
+            ret.readExternal(ds);
+
+          if (loadValues && ds.available() != 0)
+            throw new InternalError("bytes left: " + ds.available());
+          return ret;
+        } else if (i == SerializationHeader.HTREE_DIRECTORY) {
+          HTreeDirectory ret = new HTreeDirectory(HTree.this);
+          ret.readExternal(ds);
+          if (loadValues && ds.available() != 0)
+            throw new InternalError("bytes left: " + ds.available());
+          return ret;
+        } else {
+          throw new InternalError("Wrong HTree header: " + i);
+        }
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+
+    }
+
+    @Override
+    public void serialize(DataOutput out, Object obj) throws IOException {
+      if (obj instanceof HTreeBucket) {
+        out.write(SerializationHeader.HTREE_BUCKET);
+        HTreeBucket b = (HTreeBucket) obj;
+        b.writeExternal(out);
+      } else {
+        out.write(SerializationHeader.HTREE_DIRECTORY);
+        HTreeDirectory n = (HTreeDirectory) obj;
+        n.writeExternal(out);
+      }
+    }
+  };
+
+  /**
+   * Listeners which are notified about changes in records
+   */
+  protected RecordListener[] recordListeners = new RecordListener[0];
+
+  /**
+   * Serializer used to serialize index keys (optional)
+   */
+  protected Serializer<K> keySerializer;
+
+  /**
+   * Serializer used to serialize index values (optional)
+   */
+  protected Serializer<V> valueSerializer;
+  protected boolean readonly = false;
+  final long rootRecid;
+  DBAbstract db;
+  /** if false, the map contains only keys; used to back a set */
+  boolean hasValues = true;
+
+  /**
+   * Counts structural changes in the tree at runtime. Exists to support
+   * fail-fast behaviour.
+   */
+  int modCount;
+
+  /**
+   * Indicates whether values should be loaded during deserialization; set to
+   * false during defragmentation so only the structure is copied.
+   */
+  private boolean loadValues = true;
+
+  public Serializer<K> getKeySerializer() {
+    return keySerializer;
+  }
+
+  public Serializer<V> getValueSerializer() {
+    return valueSerializer;
+  }
+
+  /**
+   * cache writing buffer, so it does not have to be allocated on each write
+   */
+  AtomicReference<DataInputOutput> writeBufferCache = new AtomicReference<DataInputOutput>();
+
+  /**
+   * Create a persistent hashtable.
+   */
+  @SuppressWarnings("unchecked")
+  public HTree(DBAbstract db, Serializer<K> keySerializer,
+      Serializer<V> valueSerializer, boolean hasValues) throws IOException {
+    this.keySerializer = keySerializer;
+    this.valueSerializer = valueSerializer;
+    this.db = db;
+    this.hasValues = hasValues;
+
+    HTreeDirectory<K, V> root = new HTreeDirectory<K, V>(this, (byte) 0);
+    root.setPersistenceContext(0);
+    this.rootRecid = db.insert(root, this.SERIALIZER, false);
+  }
+
+  /**
+   * Load a persistent hashtable
+   */
+  public HTree(DBAbstract db, long rootRecid, Serializer<K> keySerializer,
+      Serializer<V> valueSerializer, boolean hasValues) throws IOException {
+    this.db = db;
+    this.rootRecid = rootRecid;
+    this.keySerializer = keySerializer;
+    this.valueSerializer = valueSerializer;
+    this.hasValues = hasValues;
+  }
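+
+  /*
+   * Usage sketch (illustrative only; assumes a DBAbstract instance named db is
+   * available, and passes null for the optional key/value serializers):
+   *
+   *   HTree<String, Long> map = new HTree<String, Long>(db, null, null, true);
+   *   map.put("answer", 42L);
+   *   Long v = map.get("answer");    // 42L
+   *   map.remove("answer");
+   */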
+
+  void setPersistenceContext(DBAbstract db) {
+    this.db = db;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public V put(K key, V value) {
+    if (readonly)
+      throw new UnsupportedOperationException("readonly");
+    try {
+      if (key == null || value == null)
+        throw new NullPointerException("Null key or value");
+
+      V oldVal = (V) getRoot().put(key, value);
+      if (oldVal == null) {
+        modCount++;
+
+        // increase size
+        HTreeDirectory root = getRoot();
+        root.size++;
+        db.update(rootRecid, root, SERIALIZER);
+
+        for (RecordListener<K, V> r : recordListeners)
+          r.recordInserted(key, value);
+      } else {
+
+        // notify listeners
+        for (RecordListener<K, V> r : recordListeners)
+          r.recordUpdated(key, oldVal, value);
+      }
+
+      return oldVal;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public V get(Object key) {
+    if (key == null)
+      return null;
+    try {
+      return getRoot().get((K) key);
+    } catch (ClassCastException e) {
+      return null;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public V remove(Object key) {
+    if (readonly)
+      throw new UnsupportedOperationException("readonly");
+
+    try {
+      if (key == null)
+        return null;
+
+      V val = (V) getRoot().remove(key);
+      modCount++;
+
+      if (val != null) {
+        // decrease size
+        HTreeDirectory root = getRoot();
+        root.size--;
+        db.update(rootRecid, root, SERIALIZER);
+
+        for (RecordListener r : recordListeners)
+          r.recordRemoved(key, val);
+      }
+
+      return val;
+    } catch (ClassCastException e) {
+      return null;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    if (key == null)
+      return false;
+    // no need for locking, get is already locked
+    V v = get(key);
+    return v != null;
+  }
+
+  @Override
+  public void clear() {
+    try {
+      Iterator<K> keyIter = keys();
+      while (keyIter.hasNext()) {
+        keyIter.next();
+        keyIter.remove();
+      }
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  /**
+   * Returns an iterator over the keys contained in this HTree.
+   */
+  public Iterator<K> keys() throws IOException {
+    return getRoot().keys();
+  }
+
+  public DBAbstract getRecordManager() {
+    return db;
+  }
+
+  /**
+   * add RecordListener which is notified about record changes
+   * 
+   * @param listener
+   */
+  public void addRecordListener(RecordListener<K, V> listener) {
+    recordListeners = Arrays
+        .copyOf(recordListeners, recordListeners.length + 1);
+    recordListeners[recordListeners.length - 1] = listener;
+  }
+
+  /**
+   * Remove a previously added RecordListener.
+   * 
+   * @param listener
+   */
+  @SuppressWarnings("unchecked")
+  public void removeRecordListener(RecordListener<K, V> listener) {
+    // Arrays.asList returns a fixed-size view, so copy into a modifiable list
+    // before removing the listener
+    List l = new ArrayList(Arrays.asList(recordListeners));
+    l.remove(listener);
+    recordListeners = (RecordListener[]) l.toArray(new RecordListener[0]);
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet() {
+    return _entrySet;
+  }
+
+  private Set<Entry<K, V>> _entrySet = new AbstractSet<Entry<K, V>>() {
+
+    protected Entry<K, V> newEntry(K k, V v) {
+      return new SimpleEntry<K, V>(k, v) {
+        private static final long serialVersionUID = 978651696969194154L;
+
+        @Override
+        public V setValue(V arg0) {
+          // put is already locked
+          HTree.this.put(getKey(), arg0);
+          return super.setValue(arg0);
+        }
+
+      };
+    }
+
+    @Override
+    public boolean add(java.util.Map.Entry<K, V> e) {
+      if (readonly)
+        throw new UnsupportedOperationException("readonly");
+      if (e.getKey() == null)
+        throw new NullPointerException("Can not add null key");
+      if (e.getValue().equals(get(e.getKey())))
+        return false;
+      HTree.this.put(e.getKey(), e.getValue());
+      return true;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean contains(Object o) {
+      if (o instanceof Entry) {
+        Entry<K, V> e = (java.util.Map.Entry<K, V>) o;
+
+        // get is already locked
+        if (e.getKey() != null && HTree.this.get(e.getKey()) != null)
+          return true;
+      }
+      return false;
+    }
+
+    @Override
+    public Iterator<java.util.Map.Entry<K, V>> iterator() {
+      try {
+        final Iterator<K> br = keys();
+        return new Iterator<Entry<K, V>>() {
+
+          @Override
+          public boolean hasNext() {
+            return br.hasNext();
+          }
+
+          @Override
+          public java.util.Map.Entry<K, V> next() {
+            K k = br.next();
+            return newEntry(k, get(k));
+          }
+
+          @Override
+          public void remove() {
+            if (readonly)
+              throw new UnsupportedOperationException("readonly");
+            br.remove();
+          }
+        };
+
+      } catch (IOException e) {
+        throw new IOError(e);
+      }
+
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean remove(Object o) {
+      if (readonly)
+        throw new UnsupportedOperationException("readonly");
+
+      if (o instanceof Entry) {
+        Entry<K, V> e = (java.util.Map.Entry<K, V>) o;
+
+        // check for nulls
+        if (e.getKey() == null || e.getValue() == null)
+          return false;
+        // get old value, must be same as item in entry
+        V v = get(e.getKey());
+        if (v == null || !e.getValue().equals(v))
+          return false;
+        HTree.this.remove(e.getKey());
+        return true;
+      }
+      return false;
+
+    }
+
+    @Override
+    public int size() {
+      try {
+        int counter = 0;
+        Iterator<K> it = keys();
+        while (it.hasNext()) {
+          it.next();
+          counter++;
+        }
+        return counter;
+      } catch (IOException e) {
+        throw new IOError(e);
+      }
+
+    }
+
+  };
+
+  @SuppressWarnings("unchecked")
+  HTreeDirectory<K, V> getRoot() {
+    // assumes that caller already holds read or write lock
+    try {
+      HTreeDirectory<K, V> root = (HTreeDirectory<K, V>) db.fetch(rootRecid,
+          this.SERIALIZER);
+      root.setPersistenceContext(rootRecid);
+      return root;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static HTree deserialize(DataInput is, Serialization ser)
+      throws IOException, ClassNotFoundException {
+    long rootRecid = LongPacker.unpackLong(is);
+    boolean hasValues = is.readBoolean();
+    Serializer keySerializer = (Serializer) ser.deserialize(is);
+    Serializer valueSerializer = (Serializer) ser.deserialize(is);
+
+    return new HTree(ser.db, rootRecid, keySerializer, valueSerializer,
+        hasValues);
+  }
+
+  @SuppressWarnings("unchecked")
+  void serialize(DataOutput out) throws IOException {
+    LongPacker.packLong(out, rootRecid);
+    out.writeBoolean(hasValues);
+    db.defaultSerializer().serialize(out, keySerializer);
+    db.defaultSerializer().serialize(out, valueSerializer);
+  }
+
+  @SuppressWarnings("unchecked")
+  static void defrag(Long recid, DBStore r1, DBStore r2) throws IOException {
+    // TODO should modCount be increased after defrag, revert or commit?
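+    // copy the raw HTree header record into r2, then walk the directory tree
+    // with loadValues disabled so every directory/bucket record is
+    // re-inserted into r2 under its original recid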
+    try {
+      byte[] data = r1.fetchRaw(recid);
+      r2.forceInsert(recid, data);
+      DataInput in = new DataInputStream(new ByteArrayInputStream(data));
+      HTree t = (HTree) r1.defaultSerializer().deserialize(in);
+      t.db = r1;
+      t.loadValues = false;
+
+      HTreeDirectory d = t.getRoot();
+      if (d != null) {
+        r2.forceInsert(t.rootRecid, r1.fetchRaw(t.rootRecid));
+        d.defrag(r1, r2);
+      }
+
+    } catch (ClassNotFoundException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  @Override
+  public int size() {
+    return (int) getRoot().size;
+  }
+
+  public boolean hasValues() {
+    return hasValues;
+  }
+
+}


