hama-commits mailing list archives

From tjungb...@apache.org
Subject svn commit: r1387533 [1/10] - in /hama/trunk: ./ core/ core/src/main/java/org/apache/hama/bsp/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ jdbm/src/ jdbm/src/main/ jdbm/src/main/java/ jdbm/src/main/java/org/ jdbm/src/main/java/org/apache/ j...
Date Wed, 19 Sep 2012 11:52:24 GMT
Author: tjungblut
Date: Wed Sep 19 11:52:20 2012
New Revision: 1387533

URL: http://svn.apache.org/viewvc?rev=1387533&view=rev
Log:
[HAMA-642]: Make GraphRunner disk based

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableComparator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableSerialization.java
    hama/trunk/jdbm/   (with props)
    hama/trunk/jdbm/pom.xml
    hama/trunk/jdbm/src/
    hama/trunk/jdbm/src/main/
    hama/trunk/jdbm/src/main/java/
    hama/trunk/jdbm/src/main/java/org/
    hama/trunk/jdbm/src/main/java/org/apache/
    hama/trunk/jdbm/src/main/java/org/apache/hama/
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectInputStream.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectOutputStream.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTree.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeLazyRecord.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeMap.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeNode.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeSet.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DB.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBAbstract.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCache.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheMRU.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBCacheRef.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBMaker.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DBStore.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/DataInputOutput.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTree.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeBucket.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeDirectory.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/HTreeSet.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/JDBMUtils.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LinkedList.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LogicalRowIdManager.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongHashMap.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/LongPacker.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Magic.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageFile.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageIo.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageManager.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PageTransactionManager.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalFreeRowIdManager.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/PhysicalRowIdManager.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordHeader.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/RecordListener.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerialClassInfo.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serialization.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/SerializationHeader.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Serializer.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/Storage.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDisk.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageDiskMapped.java
    hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/StorageMemory.java
    hama/trunk/jdbm/src/main/resources/
    hama/trunk/jdbm/src/test/
    hama/trunk/jdbm/src/test/java/
    hama/trunk/jdbm/src/test/java/org/
    hama/trunk/jdbm/src/test/java/org/apache/
    hama/trunk/jdbm/src/test/java/org/apache/hama/
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeBench.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeKeyCompressionTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeLeadingValuePackTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeMapNavigable2Test.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeMapNavigableSubMapExclusiveTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeMapNavigableSubMapInclusiveTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeMapNavigableTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeNodeTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/BTreeSetTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/ByteArrayComparator.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/CompactTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/ConcurrentBTreeReadTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/DBCacheMRUTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/DBCacheTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/DBMakerTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/DBTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/DataInputOutputTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/DefragTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/FileHeaderTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/FileLockTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/HTreeBucketTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/HTreeDirectoryTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/HTreeSetTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/HTreeTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/LinkedListTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/LogicalRowIdManagerTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/LongHashMapTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/LongTreeMap.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/MapInterfaceTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/ObjectOutputStreamTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/PageFileTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/PageIoTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/PageManagerTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/PageTransactionManagerTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/PhysicalFreeRowIdManagerTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/PhysicalRowIdManagerTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/RecordHeaderTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/RollbackTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/SerialClassInfoTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/Serialization2Bean.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/Serialization2Test.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/SerializationHeaderTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/SerializationTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/Serialized2DerivedBean.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/StorageDiskMappedTest.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/StreamCorrupted.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestCaseWithTestFile.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestInsertPerf.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestInsertUpdate.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestIssues.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestLargeData.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestLazyRecordsInTree.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestRollback.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/TestStress.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/UtilTT.java
    hama/trunk/jdbm/src/test/java/org/apache/hama/jdbm/UtilsTest.java
    hama/trunk/jdbm/src/test/resources/
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/graph/pom.xml
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/pom.xml
    hama/trunk/src/assemble/bin.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Wed Sep 19 11:52:20 2012
@@ -12,7 +12,8 @@ Release 0.6 (unreleased changes)
    HAMA-608: LocalRunner should honor the configured queues (tjungblut)
 
   IMPROVEMENTS
-
+ 
+   HAMA-642: Make GraphRunner disk based (tjungblut)
    HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon & tjungblut) 
    HAMA-557: Implement Checkpointing service in Hama (surajmenon)
    HAMA-587: Synchronization Client should provide API's to store and retrieve information among peers and BSPMaster (surajmenon)

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Wed Sep 19 11:52:20 2012
@@ -150,6 +150,11 @@
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-jdbm</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 
   <build>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Wed Sep 19 11:52:20 2012
@@ -22,7 +22,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -40,7 +39,7 @@ public class BSPMessageBundle<M extends 
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
-  private HashMap<String, LinkedList<M>> messages = new HashMap<String, LinkedList<M>>();
+  private HashMap<String, ArrayList<M>> messages = new HashMap<String, ArrayList<M>>();
   private HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
 
   public BSPMessageBundle() {
@@ -54,8 +53,7 @@ public class BSPMessageBundle<M extends 
   public void addMessage(M message) {
     String className = message.getClass().getName();
     if (!messages.containsKey(className)) {
-      // use linked list because we're just iterating over them
-      LinkedList<M> list = new LinkedList<M>();
+      ArrayList<M> list = new ArrayList<M>();
       list.add(message);
       messages.put(className, list);
     } else {
@@ -67,7 +65,7 @@ public class BSPMessageBundle<M extends 
     // here we use an arraylist, because we know the size and outside may need
     // random access
     List<M> mergeList = new ArrayList<M>(messages.size());
-    for (LinkedList<M> c : messages.values()) {
+    for (ArrayList<M> c : messages.values()) {
       mergeList.addAll(c);
     }
     return mergeList;
@@ -78,9 +76,9 @@ public class BSPMessageBundle<M extends 
     // writes the k/v mapping size
     out.writeInt(messages.size());
     if (messages.size() > 0) {
-      for (Entry<String, LinkedList<M>> entry : messages.entrySet()) {
+      for (Entry<String, ArrayList<M>> entry : messages.entrySet()) {
         out.writeUTF(entry.getKey());
-        LinkedList<M> messageList = entry.getValue();
+        ArrayList<M> messageList = entry.getValue();
         out.writeInt(messageList.size());
         for (M msg : messageList) {
           msg.write(out);
@@ -93,14 +91,14 @@ public class BSPMessageBundle<M extends 
   @SuppressWarnings("unchecked")
   public void readFields(DataInput in) throws IOException {
     if (messages == null) {
-      messages = new HashMap<String, LinkedList<M>>();
+      messages = new HashMap<String, ArrayList<M>>();
     }
     int numMessages = in.readInt();
     if (numMessages > 0) {
       for (int entries = 0; entries < numMessages; entries++) {
         String className = in.readUTF();
         int size = in.readInt();
-        LinkedList<M> msgList = new LinkedList<M>();
+        ArrayList<M> msgList = new ArrayList<M>();
         messages.put(className, msgList);
 
         Class<M> clazz = null;

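For context, the bundle above groups messages per class name and writes one class header per list; a minimal round-trip sketch of that contract (the getMessages() accessor is assumed, since the hunk only shows its tail, and IntWritable stands in for a real message type):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hama.bsp.BSPMessageBundle;

    public class BundleRoundTrip {
      public static void main(String[] args) throws Exception {
        BSPMessageBundle<IntWritable> bundle = new BSPMessageBundle<IntWritable>();
        bundle.addMessage(new IntWritable(1));
        bundle.addMessage(new IntWritable(2));

        // write(): number of class entries, then per class the UTF name and its messages
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        bundle.write(new DataOutputStream(bytes));

        // readFields(): rebuilds the per-class ArrayLists introduced by this commit
        BSPMessageBundle<IntWritable> copy = new BSPMessageBundle<IntWritable>();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy.getMessages()); // assumed accessor, prints [1, 2]
      }
    }
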
Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Wed Sep 19 11:52:20 2012
@@ -57,6 +57,10 @@ import org.apache.hama.ipc.JobSubmission
  * running BSP's.
  */
 public class LocalBSPRunner implements JobSubmissionProtocol {
+  
+  public static final String BSP_LOCAL_DIR = "bsp.local.dir";
+  public static final String BSP_LOCAL_TASKS_MAXIMUM = "bsp.local.tasks.maximum";
+  
   private static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
 
   private static final String IDENTIFIER = "localrunner";
@@ -82,9 +86,9 @@ public class LocalBSPRunner implements J
     super();
     this.conf = conf;
 
-    maxTasks = conf.getInt("bsp.local.tasks.maximum", 20);
+    maxTasks = conf.getInt(BSP_LOCAL_TASKS_MAXIMUM, 20);
 
-    String path = conf.get("bsp.local.dir");
+    String path = conf.get(BSP_LOCAL_DIR);
     if (path != null && !path.isEmpty()) {
       WORKING_DIR = path;
     }

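The two string literals extracted above become part of the public surface of LocalBSPRunner; a hedged sketch of setting them on a job configuration (plain Configuration setters, values are hypothetical):

    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.bsp.LocalBSPRunner;

    public class LocalRunnerConfig {
      public static void main(String[] args) {
        HamaConfiguration conf = new HamaConfiguration();
        // cap the local runner at 10 concurrent tasks instead of the default 20
        conf.setInt(LocalBSPRunner.BSP_LOCAL_TASKS_MAXIMUM, 10);
        // hypothetical working directory for the local runner
        conf.set(LocalBSPRunner.BSP_LOCAL_DIR, "/tmp/bsp-local");
      }
    }
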
Modified: hama/trunk/graph/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/graph/pom.xml (original)
+++ hama/trunk/graph/pom.xml Wed Sep 19 11:52:20 2012
@@ -43,6 +43,11 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hama</groupId>
+      <artifactId>hama-jdbm</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
   <build>
     <finalName>hama-graph-${project.version}</finalName>

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java Wed Sep 19 11:52:20 2012
@@ -87,7 +87,7 @@ public abstract class AbstractAggregator
   public IntWritable getTimesAggregated() {
     return new IntWritable(timesAggregated);
   }
-  
+
   @Override
   public String toString() {
     return "VAL=" + getValue();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Wed Sep 19 11:52:20 2012
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.Combiner;
@@ -72,7 +73,8 @@ public class GraphJob extends BSPJob {
   /**
    * Set the Vertex ID class for the job.
    */
-  public void setVertexIDClass(Class<? extends Writable> cls)
+  public void setVertexIDClass(
+      @SuppressWarnings("rawtypes") Class<? extends WritableComparable> cls)
       throws IllegalStateException {
     conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
   }
@@ -129,8 +131,8 @@ public class GraphJob extends BSPJob {
   }
 
   @Override
-  public void setPartitioner(@SuppressWarnings("rawtypes")
-  Class<? extends Partitioner> theClass) {
+  public void setPartitioner(
+      @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
     super.setPartitioner(theClass);
     conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
   }

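setVertexIDClass now demands a WritableComparable, which the disk-backed vertex map introduced below needs for ordering its keys; a hypothetical call site (the two-argument GraphJob constructor is assumed from the usual Hama examples):

    import org.apache.hadoop.io.Text;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.graph.GraphJob;

    public class VertexIdSetup {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration();
        GraphJob job = new GraphJob(conf, VertexIdSetup.class);
        // Text is a WritableComparable, so it still compiles; a Writable that
        // is not comparable would now be rejected by the tightened signature
        job.setVertexIDClass(Text.class);
      }
    }
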
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Sep 19 11:52:20 2012
@@ -18,8 +18,10 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,11 +31,15 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
@@ -41,6 +47,8 @@ import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.jdbm.DB;
+import org.apache.hama.jdbm.DBMaker;
 import org.apache.hama.util.KeyValuePair;
 
 /**
@@ -50,8 +58,20 @@ import org.apache.hama.util.KeyValuePair
  * @param <E> the value type of an edge.
  * @param <M> the value type of a vertex.
  */
-public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
-    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
+public final class GraphJobRunner<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage>
+    implements Serializable {
+
+  public static final String HAMA_GRAPH_MULTI_STEP_PARTITIONING_INTERVAL = "hama.graph.partitioning.batch.bytes";
+  public static final String HAMA_GRAPH_SELF_REF = "hama.graph.self.ref";
+  public static final String HAMA_GRAPH_STORAGE_PATH = "hama.graph.storage.path";
+  public static final String HAMA_GRAPH_IN_MEMORY = "hama.graph.in.memory";
+  public static final String HAMA_GRAPH_VERTEX_CLASS = "hama.graph.vertex.class";
+  public static final String HAMA_GRAPH_MAX_ITERATION = "hama.graph.max.iteration";
+  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  public static final String GRAPH_REPAIR = "hama.graph.repair";
+
+  private static final long serialVersionUID = 1L;
 
   private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
 
@@ -61,14 +81,12 @@ public final class GraphJobRunner<V exte
   public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
 
-  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
-  public static final String GRAPH_REPAIR = "hama.graph.repair";
-
-  private Configuration conf;
-  private Combiner<M> combiner;
-  private Partitioner<V, M> partitioner;
+  private transient Configuration conf;
+  private transient Combiner<M> combiner;
+  private transient Partitioner<V, M> partitioner;
 
-  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+  private transient Map<V, Vertex<V, E, M>> vertices;
+  private transient DB db;
 
   private boolean updated = true;
   private int globalUpdateCounts = 0;
@@ -78,14 +96,14 @@ public final class GraphJobRunner<V exte
   private int maxIteration = -1;
   private long iteration;
 
-  private Class<V> vertexIdClass;
-  private Class<M> vertexValueClass;
-  private Class<E> edgeValueClass;
-  private Class<Vertex<V, E, M>> vertexClass;
+  Class<V> vertexIdClass;
+  Class<M> vertexValueClass;
+  Class<E> edgeValueClass;
+  Class<Vertex<V, E, M>> vertexClass;
 
-  private AggregationRunner<V, E, M> aggregationRunner;
+  private transient AggregationRunner<V, E, M> aggregationRunner;
 
-  private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+  private transient BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
   @Override
   public final void setup(
@@ -125,18 +143,32 @@ public final class GraphJobRunner<V exte
       // loop over vertices and do their computation
       doSuperstep(messages, peer);
     }
+
+    write(peer);
   }
 
   /**
    * Just write <ID as Writable, Value as Writable> pair as a result. Note that
    * this will also be executed when failure happened.
    */
+  private void write(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
+
+    Set<V> keySet = vertices.keySet();
+    for (V value : keySet) {
+      Vertex<V, E, M> e = vertices.get(value);
+      peer.write(e.getVertexID(), e.getValue());
+    }
+  }
+
   @Override
   public final void cleanup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
-      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+    // remove the DB files if they exist
+    if (db != null) {
+      db.close();
     }
   }
 
@@ -171,7 +203,9 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
-    for (Vertex<V, E, M> vertex : vertices.values()) {
+    Set<V> keySet = vertices.keySet();
+    for (V key : keySet) {
+      Vertex<V, E, M> vertex = vertices.get(key);
       List<M> msgs = messages.get(vertex.getVertexID());
       // If there are newly received messages, restart.
       if (vertex.isHalted() && msgs != null) {
@@ -207,7 +241,9 @@ public final class GraphJobRunner<V exte
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Vertex<V, E, M> vertex : vertices.values()) {
+    Set<V> keySet = vertices.keySet();
+    for (V key : keySet) {
+      Vertex<V, E, M> vertex = vertices.get(key);
       List<M> singletonList = Collections.singletonList(vertex.getValue());
       M lastValue = vertex.getValue();
       vertex.compute(singletonList.iterator());
@@ -222,8 +258,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
     this.peer = peer;
     this.conf = peer.getConfiguration();
-    maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
-        -1);
+    maxIteration = peer.getConfiguration().getInt(HAMA_GRAPH_MAX_ITERATION, -1);
 
     vertexIdClass = (Class<V>) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR,
         Text.class, Writable.class);
@@ -233,7 +268,7 @@ public final class GraphJobRunner<V exte
         GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
         Writable.class);
     vertexClass = (Class<Vertex<V, E, M>>) conf.getClass(
-        "hama.graph.vertex.class", Vertex.class);
+        HAMA_GRAPH_VERTEX_CLASS, Vertex.class);
 
     // set the classes statically, so we can save memory per message
     GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
@@ -254,6 +289,35 @@ public final class GraphJobRunner<V exte
           conf);
     }
 
+    if (!conf.getBoolean(HAMA_GRAPH_IN_MEMORY, false)) {
+
+      String storagePath = conf.get(HAMA_GRAPH_STORAGE_PATH);
+      if (storagePath == null) {
+        storagePath = "/tmp/graph_storage/";
+      }
+
+      storagePath += peer.getTaskId().toString();
+
+      try {
+        LocalFileSystem local = FileSystem.getLocal(conf);
+        local.mkdirs(new Path(storagePath));
+      } catch (IOException e) {
+        throw new RuntimeException("Could not create \"" + storagePath
+            + "\", nested exception was: ", e);
+      }
+
+      db = DBMaker.openFile(storagePath + "/graph.db").disableLocking()
+          .disableTransactions().deleteFilesAfterClose().useRandomAccessFile()
+          .make();
+
+      Comparator<V> writableComparator = new WritableComparator<V>();
+      vertices = db.createTreeMap("graph-db", writableComparator,
+          new WritableSerialization<V>(vertexIdClass),
+          new VertexWritableSerialization<Vertex<V, E, M>>(vertexClass, this));
+    } else {
+      vertices = new HashMap<V, Vertex<V, E, M>>();
+    }
+
     aggregationRunner = new AggregationRunner<V, E, M>();
     aggregationRunner.setupAggregators(peer);
   }
@@ -282,7 +346,7 @@ public final class GraphJobRunner<V exte
     final int partitioningSteps = partitionMultiSteps(peer, splitSize);
     final long interval = splitSize / partitioningSteps;
 
-    final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
+    final boolean selfReference = conf.getBoolean(HAMA_GRAPH_SELF_REF, false);
 
     /*
      * Several partitioning constants end
@@ -503,7 +567,7 @@ public final class GraphJobRunner<V exte
       }
 
       int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
-          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
+          HAMA_GRAPH_MULTI_STEP_PARTITIONING_INTERVAL, 20000000)) + 1;
 
       for (String peerName : peer.getAllPeerNames()) {
         MapWritable temp = new MapWritable();
@@ -518,7 +582,7 @@ public final class GraphJobRunner<V exte
     for (Entry<Writable, Writable> e : x.entrySet()) {
       multiSteps = ((IntWritable) e.getValue()).get();
     }
-    LOG.info(peer.getPeerName() + ": " + multiSteps);
+    LOG.info(peer.getPeerName() + ": Number of partitioning supersteps: " + multiSteps);
     return multiSteps;
   }
 

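With this change the runner is disk based by default: unless hama.graph.in.memory is true, vertices live in a JDBM tree map under hama.graph.storage.path (falling back to /tmp/graph_storage/ plus the task id). A hedged sketch of driving those keys from a job configuration:

    import org.apache.hama.HamaConfiguration;

    public class GraphStorageConfig {
      public static void main(String[] args) {
        HamaConfiguration conf = new HamaConfiguration();

        // opt back into the old behaviour and keep every vertex on the heap
        conf.setBoolean("hama.graph.in.memory", true);

        // or stay disk based and move the JDBM files off the /tmp default
        conf.setBoolean("hama.graph.in.memory", false);
        conf.set("hama.graph.storage.path", "/data/hama/graph_storage/"); // hypothetical path
      }
    }
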
Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1387533&r1=1387532&r2=1387533&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Wed Sep 19 11:52:20 2012
@@ -17,6 +17,8 @@
  */
 package org.apache.hama.graph;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -24,13 +26,15 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Partitioner;
 
 public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
-    implements VertexInterface<V, E, M> {
+    implements VertexInterface<V, E, M>, Writable {
 
-  GraphJobRunner<?, ?, ?> runner;
+  transient GraphJobRunner<?, ?, ?> runner;
 
   private V vertexID;
   private M value;
@@ -218,6 +222,81 @@ public abstract class Vertex<V extends W
     return true;
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    votedToHalt = in.readBoolean();
+    vertexID = (V) ReflectionUtils.newInstance(runner.vertexIdClass, null);
+    vertexID.readFields(in);
+    if (in.readBoolean()) {
+      value = (M) ReflectionUtils.newInstance(runner.vertexValueClass, null);
+      value.readFields(in);
+    }
+
+    int edges = WritableUtils.readVInt(in);
+    ArrayList<Edge<V, E>> list = new ArrayList<Edge<V, E>>(edges);
+    for (int i = 0; i < edges; i++) {
+      V adjacentId = (V) ReflectionUtils
+          .newInstance(runner.vertexIdClass, null);
+      adjacentId.readFields(in);
+      E edgeValue = null;
+      if (in.readBoolean()) {
+        edgeValue = (E) ReflectionUtils
+            .newInstance(runner.edgeValueClass, null);
+        edgeValue.readFields(in);
+      }
+      list.add(new Edge<V, E>(adjacentId, edgeValue));
+    }
+
+    this.setEdges(list);
+    readInternal(in);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(votedToHalt);
+    V vId = getVertexID();
+    vId.write(out);
+    M val = getValue();
+    serializeNull(out, val);
+
+    List<Edge<V, E>> edges = getEdges();
+    int length = edges == null ? 0 : edges.size();
+    WritableUtils.writeVInt(out, length);
+    for (Edge<V, E> edge : edges) {
+      edge.getDestinationVertexID().write(out);
+      serializeNull(out, edge.getValue());
+    }
+
+    writeInternal(out);
+  }
+
+  /**
+   * A write method to let the user save its own state in the vertex class.
+   */
+  protected void writeInternal(DataOutput out) throws IOException {
+  }
+
+  /**
+   * A read method to let the user restore its own state in the vertex class.
+   */
+  protected void readInternal(DataInput in) throws IOException {
+  }
+
+  /**
+   * Serializes data null-safe by writing a boolean that is only true when the
+   * given writable is not null.
+   */
+  protected static void serializeNull(DataOutput out, Writable writable)
+      throws IOException {
+    if (writable == null) {
+      out.writeBoolean(false);
+    } else {
+      out.writeBoolean(true);
+      writable.write(out);
+    }
+  }
+
   @Override
   public String toString() {
     return getVertexID() + (getValue() != null ? " = " + getValue() : "")

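Vertex is now Writable so instances can be spilled to the JDBM store, and writeInternal/readInternal give user vertices a hook to persist fields the default write()/readFields() does not know about. A minimal sketch of a hypothetical subclass using the hooks (the compute and voteToHalt signatures are assumed from the surrounding graph API):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hama.graph.Vertex;

    public class CountingVertex extends Vertex<Text, NullWritable, DoubleWritable> {

      // extra state that must survive a trip through the disk-backed map
      private int timesComputed;

      @Override
      public void compute(Iterator<DoubleWritable> messages) throws IOException {
        timesComputed++;
        voteToHalt();
      }

      @Override
      protected void writeInternal(DataOutput out) throws IOException {
        out.writeInt(timesComputed);
      }

      @Override
      protected void readInternal(DataInput in) throws IOException {
        timesComputed = in.readInt();
      }
    }
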
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Writable serialization for Hadoop objects with a class cache, so that a
+ * single integer is written instead of the classname (with VInt compression,
+ * so most of the time it just takes a single byte). <br/>
+ * Enhanced with a graph runner instance that is passed on to deserialized
+ * vertices.
+ */
+public final class VertexWritableSerialization<K extends Writable> extends
+    WritableSerialization<K> {
+
+  private static final long serialVersionUID = 1L;
+  @SuppressWarnings("rawtypes")
+  private GraphJobRunner runner;
+
+  public VertexWritableSerialization() {
+  }
+
+  public VertexWritableSerialization(Class<?> writableClazz,
+      @SuppressWarnings("rawtypes") GraphJobRunner runner) {
+    super(writableClazz);
+    Preconditions
+        .checkArgument(
+            Vertex.class.isAssignableFrom(writableClazz),
+            "Class "
+                + writableClazz
+                + " is not assignable from Vertex class! This class only serializes vertices!");
+    this.runner = runner;
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public Writable newInstance() {
+    Writable newInstance = (Writable) ReflectionUtils.newInstance(
+        LOOKUP_LIST.get(writableClassIndex), null);
+    ((Vertex) newInstance).runner = this.runner;
+    return newInstance;
+  }
+}

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableComparator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableComparator.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableComparator.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableComparator.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+/**
+ * Comparator that delegates to the compared objects' own Comparable implementation.
+ */
+public final class WritableComparator<T extends Comparable<T>> implements
+    Comparator<T>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  public int compare(T o1, T o2) {
+    return o1.compareTo(o2);
+  }
+
+}

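The comparator simply delegates to compareTo, so any self-comparable type can key the disk-backed tree map; a one-line check (Integer used purely for illustration):

    import org.apache.hama.graph.WritableComparator;

    public class ComparatorCheck {
      public static void main(String[] args) {
        // delegates to Integer.compareTo, prints a negative number
        System.out.println(new WritableComparator<Integer>().compare(1, 2));
      }
    }
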
Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableSerialization.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableSerialization.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableSerialization.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/WritableSerialization.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.jdbm.Serializer;
+
+/**
+ * Writable serialization for Hadoop objects with a class cache, so that a
+ * single integer is written instead of the classname (with VInt compression,
+ * so most of the time it just takes a single byte).
+ */
+public class WritableSerialization<K extends Writable> implements
+    Serializer<K>, Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  // clazzname as string -> index in the lookuplist
+  protected transient final HashMap<String, Integer> CLAZZ_CACHE = new HashMap<String, Integer>();
+  protected transient final ArrayList<Class<?>> LOOKUP_LIST = new ArrayList<Class<?>>();
+  private transient int lastAssigned = 0;
+
+  protected transient Writable instance;
+  protected transient int writableClassIndex;
+
+  public WritableSerialization() {
+  }
+
+  public WritableSerialization(Class<?> writableClazz) {
+    // the cache is keyed by class name, so look it up by name
+    Integer integer = CLAZZ_CACHE.get(writableClazz.getName());
+    if (integer == null) {
+      integer = lastAssigned++;
+      CLAZZ_CACHE.put(writableClazz.getName(), integer);
+      LOOKUP_LIST.add(writableClazz);
+    }
+    this.writableClassIndex = integer;
+  }
+
+  @Override
+  public void serialize(DataOutput out, K obj) throws IOException {
+    WritableUtils.writeVInt(out, writableClassIndex);
+    obj.write(out);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public K deserialize(DataInput in) throws IOException, ClassNotFoundException {
+    writableClassIndex = WritableUtils.readVInt(in);
+    instance = newInstance();
+    instance.readFields(in);
+    return (K) instance;
+  }
+
+  public Writable newInstance() {
+    return (Writable) ReflectionUtils.newInstance(
+        LOOKUP_LIST.get(writableClassIndex), null);
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((instance == null) ? 0 : instance.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    @SuppressWarnings("rawtypes")
+    WritableSerialization other = (WritableSerialization) obj;
+    if (instance == null) {
+      if (other.instance != null)
+        return false;
+    } else if (!instance.equals(other.instance))
+      return false;
+    return true;
+  }
+
+}

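The serializer writes a VInt class index followed by the Writable's own bytes, which keeps keys and values of the JDBM map compact. A small round-trip sketch over a byte array (hypothetical, Text used as an example key type):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.hadoop.io.Text;
    import org.apache.hama.graph.WritableSerialization;

    public class WritableSerializationRoundTrip {
      public static void main(String[] args) throws Exception {
        WritableSerialization<Text> ser = new WritableSerialization<Text>(Text.class);

        // serialize(): VInt class index first, then Text's own write()
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        ser.serialize(new DataOutputStream(bytes), new Text("vertex-42"));

        // deserialize(): reads the index back and reflectively instantiates Text
        Text copy = ser.deserialize(new DataInputStream(
            new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // vertex-42
      }
    }
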
Propchange: hama/trunk/jdbm/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Sep 19 11:52:20 2012
@@ -0,0 +1,9 @@
+*.iml
+target
+lib
+.classpath
+.project
+.settings
+logs
+docs
+

Added: hama/trunk/jdbm/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/pom.xml?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/pom.xml (added)
+++ hama/trunk/jdbm/pom.xml Wed Sep 19 11:52:20 2012
@@ -0,0 +1,53 @@
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <parent>
+    <groupId>org.apache.hama</groupId>
+    <artifactId>hama-parent</artifactId>
+    <version>0.6.0-SNAPSHOT</version>
+  </parent>
+
+  <modelVersion>4.0.0</modelVersion>
+  <groupId>org.apache.hama</groupId>
+  <artifactId>hama-jdbm</artifactId>
+  <name>jdbm-datastructures</name>
+  <version>0.6.0-SNAPSHOT</version>
+  <packaging>jar</packaging>
+  
+  <dependencies>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  
+  <build>
+    <finalName>hama-jdbm-${project.version}</finalName>
+    <pluginManagement>
+      <plugins>
+        <plugin>
+          <artifactId>maven-surefire-plugin</artifactId>
+        </plugin>
+      </plugins>
+    </pluginManagement>
+  </build>
+
+
+</project>

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectInputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectInputStream.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectInputStream.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectInputStream.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.util.ArrayList;
+
+/**
+ * An alternative to <code>java.io.ObjectInputStream</code> which uses more
+ * efficient serialization
+ */
+public final class AdvancedObjectInputStream extends DataInputStream implements
+    ObjectInput {
+
+  public AdvancedObjectInputStream(InputStream in) {
+    super(in);
+  }
+
+  @Override
+  public Object readObject() throws ClassNotFoundException, IOException {
+    // first read class data
+    ArrayList<SerialClassInfo.ClassInfo> info = SerialClassInfo.serializer
+        .deserialize(this);
+
+    Serialization ser = new Serialization(null, 0, info);
+    return ser.deserialize(this);
+  }
+}

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectOutputStream.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectOutputStream.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectOutputStream.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/AdvancedObjectOutputStream.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutput;
+import java.io.OutputStream;
+import java.util.ArrayList;
+
+import org.apache.hama.jdbm.SerialClassInfo.ClassInfo;
+
+/**
+ * An alternative to <code>java.io.ObjectOutputStream</code> which uses more
+ * efficient serialization
+ */
+public final class AdvancedObjectOutputStream extends DataOutputStream
+    implements ObjectOutput {
+
+  public AdvancedObjectOutputStream(OutputStream out) {
+    super(out);
+  }
+
+  @Override
+  public void writeObject(Object obj) throws IOException {
+    ArrayList<ClassInfo> registered = new ArrayList<ClassInfo>();
+    Serialization ser = new Serialization(null, 0, registered);
+
+    byte[] data = ser.serialize(obj);
+    // write class info first
+    SerialClassInfo.serializer.serialize(this, registered);
+    // and write data
+    write(data);
+  }
+}

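The two streams above replace java.io.ObjectInputStream/ObjectOutputStream with JDBM's own serialization: class metadata is written first, the object data second. A hedged round-trip sketch, assuming a plain String payload is handled by the built-in serializer without a backing DB:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;

    import org.apache.hama.jdbm.AdvancedObjectInputStream;
    import org.apache.hama.jdbm.AdvancedObjectOutputStream;

    public class ObjectStreamRoundTrip {
      public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        AdvancedObjectOutputStream out = new AdvancedObjectOutputStream(bytes);
        out.writeObject("hello jdbm"); // class info first, payload second
        out.close();

        AdvancedObjectInputStream in = new AdvancedObjectInputStream(
            new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(in.readObject()); // hello jdbm
        in.close();
      }
    }
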
Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTree.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTree.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTree.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTree.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,630 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * B+Tree persistent indexing data structure. B+Trees are optimized for
+ * block-based, random I/O storage because they store multiple keys on one tree
+ * node (called <code>BTreeNode</code>). In addition, the leaf nodes directly
+ * contain (inline) small values associated with the keys, allowing a single (or
+ * sequential) disk read of all the values on the node.
+ * <p/>
+ * B+Trees are n-ary, yielding log(N) search cost. They are self-balancing,
+ * preventing search performance degradation when the size of the tree grows.
+ * <p/>
+ * BTree stores its keys sorted. By default JDBM expects keys to implement the
+ * <code>Comparable</code> interface, but the user may supply their own
+ * <code>Comparator</code> at BTree creation time. The Comparator is serialized
+ * and stored as part of the BTree.
+ * <p/>
+ * The B+Tree allows traversing the keys in forward and reverse order using a
+ * TupleBrowser obtained from the browse() methods. But it is better to use
+ * <code>BTreeMap</code> wrapper which implements <code>SortedMap</code>
+ * interface
+ * <p/>
+ * This implementation does not directly support duplicate keys. It is possible
+ * to handle duplicates by grouping values using an ArrayList as value. This
+ * scenario is supported by JDBM serialization so there is no big performance
+ * penalty.
+ * <p/>
+ * There is no limit on key size or value size, but it is recommended to keep
+ * keys as small as possible to reduce disk I/O. If a serialized value exceeds
+ * 32 bytes, it is stored in a separate record and the tree contains only a
+ * recid reference to it. BTree uses delta compression for its keys.
+ * 
+ */
+public final class BTree<K, V> {
+
+  private static final boolean DEBUG = false;
+
+  /**
+   * Default node size (number of entries per node)
+   */
+  public static final int DEFAULT_SIZE = 32; // TODO test optimal size, it has
+                                             // serious impact on sequential
+                                             // write and read
+
+  /**
+   * Record manager used to persist changes in BTreeNodes
+   */
+  protected transient DBAbstract _db;
+
+  /**
+   * This BTree's record ID in the DB.
+   */
+  private transient long _recid;
+
+  /**
+   * Comparator used to index entries (optional)
+   */
+  protected Comparator<K> _comparator;
+
+  /**
+   * Serializer used to serialize index keys (optional)
+   */
+  protected Serializer<K> keySerializer;
+
+  /**
+   * Serializer used to serialize index values (optional)
+   */
+  protected Serializer<V> valueSerializer;
+
+  /**
+   * indicates if values should be loaded during deserialization, set to false
+   * during defragmentation
+   */
+  boolean loadValues = true;
+
+  /** if false map contains only keys, used for set */
+  boolean hasValues = true;
+
+  /**
+   * The number of structural modifications to the tree for fail fast iterators.
+   * This value is just for runtime, it is not persisted
+   */
+  transient int modCount = 0;
+
+  /**
+   * cached instance of an insert result, so we do not have to allocate new
+   * object on each insert
+   */
+  protected BTreeNode.InsertResult<K, V> insertResultReuse; // TODO investigate
+                                                            // performance
+                                                            // impact of
+                                                            // removing this
+
+  public Serializer<K> getKeySerializer() {
+    return keySerializer;
+  }
+
+  public Serializer<V> getValueSerializer() {
+    return valueSerializer;
+  }
+
+  /**
+   * Height of the B+Tree. This is the number of BTreeNodes you have to traverse
+   * to get to a leaf BTreeNode, starting from the root.
+   */
+  private int _height;
+
+  /**
+   * Recid of the root BTreeNode
+   */
+  private transient long _root;
+
+  /**
+   * Total number of entries in the BTree
+   */
+  protected volatile long _entries;
+
+  /**
+   * Serializer used for BTreeNodes of this tree
+   */
+  private transient BTreeNode<K, V> _nodeSerializer = new BTreeNode();
+  {
+    _nodeSerializer._btree = this;
+  }
+
+  /**
+   * Listeners which are notified about changes in records
+   */
+  protected RecordListener[] recordListeners = new RecordListener[0];
+
+  /**
+   * No-argument constructor used by serialization.
+   */
+  public BTree() {
+    // empty
+  }
+
+  /**
+   * Create a new persistent BTree
+   */
+  public static <K extends Comparable, V> BTree<K, V> createInstance(
+      DBAbstract db) throws IOException {
+    return createInstance(db, null, null, null, true);
+  }
+
+  /**
+   * Create a new persistent BTree
+   */
+  public static <K, V> BTree<K, V> createInstance(DBAbstract db,
+      Comparator<K> comparator, Serializer<K> keySerializer,
+      Serializer<V> valueSerializer, boolean hasValues) throws IOException {
+    BTree<K, V> btree;
+
+    if (db == null) {
+      throw new IllegalArgumentException("Argument 'db' is null");
+    }
+
+    btree = new BTree<K, V>();
+    btree._db = db;
+    btree._comparator = comparator;
+    btree.keySerializer = keySerializer;
+    btree.valueSerializer = valueSerializer;
+    btree.hasValues = hasValues;
+    btree._recid = db.insert(btree, btree.getRecordManager()
+        .defaultSerializer(), false);
+
+    return btree;
+  }
+
+  /**
+   * Load a persistent BTree.
+   * 
+   * @param db DB used to store the persistent btree
+   * @param recid Record id of the BTree
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> BTree<K, V> load(DBAbstract db, long recid)
+      throws IOException {
+    BTree<K, V> btree = (BTree<K, V>) db.fetch(recid);
+    btree._recid = recid;
+    btree._db = db;
+    btree._nodeSerializer = new BTreeNode<K, V>();
+    btree._nodeSerializer._btree = btree;
+    return btree;
+  }
+
+  /**
+   * Insert an entry in the BTree.
+   * <p/>
+   * The BTree cannot store duplicate entries. An existing entry can be replaced
+   * using the <code>replace</code> flag. If an entry with the same key already
+   * exists in the BTree, its value is returned.
+   * 
+   * @param key Insert key
+   * @param value Insert value
+   * @param replace Set to true to replace an existing key-value pair.
+   * @return Existing value, if any.
+   */
+  public V insert(final K key, final V value, final boolean replace)
+      throws IOException {
+    if (key == null) {
+      throw new IllegalArgumentException("Argument 'key' is null");
+    }
+    if (value == null) {
+      throw new IllegalArgumentException("Argument 'value' is null");
+    }
+    BTreeNode<K, V> rootNode = getRoot();
+
+    if (rootNode == null) {
+      // BTree is currently empty, create a new root BTreeNode
+      if (DEBUG) {
+        System.out.println("BTree.insert() new root BTreeNode");
+      }
+      rootNode = new BTreeNode<K, V>(this, key, value);
+      _root = rootNode._recid;
+      _height = 1;
+      _entries = 1;
+      _db.update(_recid, this);
+      modCount++;
+      // notify listeners
+      for (RecordListener<K, V> l : recordListeners) {
+        l.recordInserted(key, value);
+      }
+      return null;
+    } else {
+      BTreeNode.InsertResult<K, V> insert = rootNode.insert(_height, key,
+          value, replace);
+      boolean dirty = false;
+      if (insert._overflow != null) {
+        // current root node overflowed, we replace with a new root node
+        if (DEBUG) {
+          System.out
+              .println("BTreeNode.insert() replace root BTreeNode due to overflow");
+        }
+        rootNode = new BTreeNode<K, V>(this, rootNode, insert._overflow);
+        _root = rootNode._recid;
+        _height += 1;
+        dirty = true;
+      }
+      if (insert._existing == null) {
+        _entries++;
+        modCount++;
+        dirty = true;
+      }
+      if (dirty) {
+        _db.update(_recid, this);
+      }
+      // notify listeners
+      for (RecordListener<K, V> l : recordListeners) {
+        if (insert._existing == null)
+          l.recordInserted(key, value);
+        else
+          l.recordUpdated(key, insert._existing, value);
+      }
+
+      // insert might have returned an existing value
+      V ret = insert._existing;
+      // zero out tuple and put it for reuse
+      insert._existing = null;
+      insert._overflow = null;
+      this.insertResultReuse = insert;
+      return ret;
+    }
+  }
+
+  /**
+   * Remove an entry with the given key from the BTree.
+   * 
+   * @param key Removal key
+   * @return Value associated with the key, or null if no entry with given key
+   *         existed in the BTree.
+   */
+  public V remove(K key) throws IOException {
+    if (key == null) {
+      throw new IllegalArgumentException("Argument 'key' is null");
+    }
+    BTreeNode<K, V> rootNode = getRoot();
+    if (rootNode == null) {
+      return null;
+    }
+    boolean dirty = false;
+    BTreeNode.RemoveResult<K, V> remove = rootNode.remove(_height, key);
+    if (remove._underflow && rootNode.isEmpty()) {
+      _height -= 1;
+      dirty = true;
+
+      _db.delete(_root);
+      if (_height == 0) {
+        _root = 0;
+      } else {
+        _root = rootNode.loadLastChildNode()._recid;
+      }
+    }
+    if (remove._value != null) {
+      _entries--;
+      modCount++;
+      dirty = true;
+    }
+    if (dirty) {
+      _db.update(_recid, this);
+    }
+    if (remove._value != null)
+      for (RecordListener<K, V> l : recordListeners)
+        l.recordRemoved(key, remove._value);
+    return remove._value;
+  }
+
+  /**
+   * Find the value associated with the given key.
+   * 
+   * @param key Lookup key.
+   * @return Value associated with the key, or null if not found.
+   */
+  public V get(K key) throws IOException {
+    if (key == null) {
+      throw new IllegalArgumentException("Argument 'key' is null");
+    }
+    BTreeNode<K, V> rootNode = getRoot();
+    if (rootNode == null) {
+      return null;
+    }
+
+    return rootNode.findValue(_height, key);
+  }
+
+  /**
+   * Find the entry for the given key, or the entry immediately following this
+   * key in the ordered BTree.
+   * 
+   * @param key Lookup key.
+   * @return Tuple for the given key or for the next greater key, or null if no
+   *         greater entry was found.
+   */
+  public BTreeTuple<K, V> findGreaterOrEqual(K key) throws IOException {
+    BTreeTuple<K, V> tuple;
+    BTreeTupleBrowser<K, V> browser;
+
+    if (key == null) {
+      // there can't be a key greater than or equal to "null"
+      // because null is considered an infinite key.
+      return null;
+    }
+
+    tuple = new BTreeTuple<K, V>(null, null);
+    browser = browse(key, true);
+    if (browser.getNext(tuple)) {
+      return tuple;
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Get a browser initially positioned at the beginning of the BTree.
+   * <p>
+   * <b> WARNING: If you make structural modifications to the BTree during
+   * browsing, you will get inconsistent browsing results. </b>
+   * 
+   * @return Browser positioned at the beginning of the BTree.
+   */
+  @SuppressWarnings("unchecked")
+  public BTreeTupleBrowser<K, V> browse() throws IOException {
+    BTreeNode<K, V> rootNode = getRoot();
+    if (rootNode == null) {
+      return EMPTY_BROWSER;
+    }
+    return rootNode.findFirst();
+  }
+
+  /**
+   * Get a browser initially positioned just before the given key.
+   * <p>
+   * <b> WARNING: If you make structural modifications to the BTree during
+   * browsing, you will get inconsistent browsing results. </b>
+   * 
+   * @param key Key used to position the browser. If null, the browser will be
+   *          positioned after the last entry of the BTree. (Null is considered
+   *          to be an "infinite" key)
+   * @return Browser positioned just before the given key.
+   */
+  @SuppressWarnings("unchecked")
+  public BTreeTupleBrowser<K, V> browse(final K key, final boolean inclusive)
+      throws IOException {
+    BTreeNode<K, V> rootNode = getRoot();
+    if (rootNode == null) {
+      return EMPTY_BROWSER;
+    }
+    BTreeTupleBrowser<K, V> browser = rootNode.find(_height, key, inclusive);
+    return browser;
+  }
+
+  /**
+   * Return the persistent record identifier of the BTree.
+   */
+  public long getRecid() {
+    return _recid;
+  }
+
+  /**
+   * Return the root BTreeNode, or null if it doesn't exist.
+   */
+  BTreeNode<K, V> getRoot() throws IOException {
+    if (_root == 0) {
+      return null;
+    }
+    BTreeNode<K, V> root = _db.fetch(_root, _nodeSerializer);
+    if (root != null) {
+      root._recid = _root;
+      root._btree = this;
+    }
+    return root;
+  }
+
+  static BTree readExternal(DataInput in, Serialization ser)
+      throws IOException, ClassNotFoundException {
+    BTree tree = new BTree();
+    tree._db = ser.db;
+    tree._height = in.readInt();
+    tree._recid = in.readLong();
+    tree._root = in.readLong();
+    tree._entries = in.readLong();
+    tree.hasValues = in.readBoolean();
+    tree._comparator = (Comparator) ser.deserialize(in);
+    tree.keySerializer = (Serializer) ser.deserialize(in);
+    tree.valueSerializer = (Serializer) ser.deserialize(in);
+    return tree;
+  }
+
+  public void writeExternal(DataOutput out) throws IOException {
+    out.writeInt(_height);
+    out.writeLong(_recid);
+    out.writeLong(_root);
+    out.writeLong(_entries);
+    out.writeBoolean(hasValues);
+    _db.defaultSerializer().serialize(out, _comparator);
+    _db.defaultSerializer().serialize(out, keySerializer);
+    _db.defaultSerializer().serialize(out, valueSerializer);
+  }
+
+  /**
+   * Copies the tree from one db to another, defragmenting it along the way.
+   * 
+   * @param recid record id of the BTree to copy
+   * @param r1 source store
+   * @param r2 destination store
+   * @throws IOException
+   */
+  public static void defrag(long recid, DBStore r1, DBStore r2)
+      throws IOException {
+    try {
+      byte[] data = r1.fetchRaw(recid);
+      r2.forceInsert(recid, data);
+      DataInput in = new DataInputOutput(data);
+      BTree t = (BTree) r1.defaultSerializer().deserialize(in);
+      t.loadValues = false;
+      t._db = r1;
+      t._nodeSerializer = new BTreeNode(t, false);
+
+      BTreeNode p = t.getRoot();
+      if (p != null) {
+        r2.forceInsert(t._root, r1.fetchRaw(t._root));
+        p.defrag(r1, r2);
+      }
+
+    } catch (ClassNotFoundException e) {
+      throw new IOError(e);
+    }
+  }
+
+  /**
+   * Browser returning no element.
+   */
+  private static final BTreeTupleBrowser EMPTY_BROWSER = new BTreeTupleBrowser() {
+
+    public boolean getNext(BTreeTuple tuple) {
+      return false;
+    }
+
+    public boolean getPrevious(BTreeTuple tuple) {
+      return false;
+    }
+
+    public void remove(Object key) {
+      throw new IndexOutOfBoundsException();
+    }
+  };
+
+  /**
+   * Add a RecordListener that is notified about record changes.
+   * 
+   * @param listener
+   */
+  public void addRecordListener(RecordListener<K, V> listener) {
+    recordListeners = Arrays
+        .copyOf(recordListeners, recordListeners.length + 1);
+    recordListeners[recordListeners.length - 1] = listener;
+  }
+
+  /**
+   * Remove a RecordListener so it is no longer notified about record changes.
+   * 
+   * @param listener
+   */
+  public void removeRecordListener(RecordListener<K, V> listener) {
+    // Arrays.asList returns a fixed-size list that does not support remove(),
+    // so copy it into a resizable list before removing the listener
+    List<RecordListener> l = new java.util.ArrayList<RecordListener>(
+        Arrays.asList(recordListeners));
+    l.remove(listener);
+    recordListeners = l.toArray(new RecordListener[l.size()]);
+  }
+
+  public DBAbstract getRecordManager() {
+    return _db;
+  }
+
+  public Comparator<K> getComparator() {
+    return _comparator;
+  }
+
+  /**
+   * Deletes all BTreeNodes in this BTree
+   */
+  public void clear() throws IOException {
+    BTreeNode<K, V> rootNode = getRoot();
+    if (rootNode != null)
+      rootNode.delete();
+    _entries = 0;
+    modCount++;
+  }
+
+  /**
+   * Used for debugging and testing only. Populates the 'out' list with the
+   * recids of all child nodes in the BTree.
+   * 
+   * @param out
+   * @throws IOException
+   */
+  void dumpChildNodeRecIDs(List<Long> out) throws IOException {
+    BTreeNode<K, V> root = getRoot();
+    if (root != null) {
+      out.add(root._recid);
+      root.dumpChildNodeRecIDs(out, _height);
+    }
+  }
+
+  public boolean hasValues() {
+    return hasValues;
+  }
+
+  /**
+   * Browser to traverse a collection of tuples. The browser allows for forward
+   * and reverse order traversal.
+   */
+  static interface BTreeTupleBrowser<K, V> {
+
+    /**
+     * Get the next tuple.
+     * 
+     * @param tuple Tuple into which values are copied.
+     * @return True if values have been copied in tuple, or false if there is no
+     *         next tuple.
+     */
+    boolean getNext(BTree.BTreeTuple<K, V> tuple) throws IOException;
+
+    /**
+     * Get the previous tuple.
+     * 
+     * @param tuple Tuple into which values are copied.
+     * @return True if values have been copied in tuple, or false if there is no
+     *         previous tuple.
+     */
+    boolean getPrevious(BTree.BTreeTuple<K, V> tuple) throws IOException;
+
+    /**
+     * Remove the entry with the given key and increase the browser's
+     * expectedModCount. This method exists to support
+     * 'ConcurrentModificationException' on the Map interface.
+     * 
+     * @param key
+     */
+    void remove(K key) throws IOException;
+
+  }
+
+  /**
+   * Tuple consisting of a key-value pair.
+   */
+  static final class BTreeTuple<K, V> {
+
+    K key;
+
+    V value;
+
+    BTreeTuple() {
+      // empty
+    }
+
+    BTreeTuple(K key, V value) {
+      this.key = key;
+      this.value = value;
+    }
+
+  }
+
+}
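
A minimal usage sketch of the BTree API added above, for reviewers; it is not
part of the patch itself. The class name BTreeUsageSketch and the sample
keys/values are illustrative only, and the sketch assumes a DBAbstract
instance obtained from the DBMaker/DBStore classes added elsewhere in this
commit. It sits in the org.apache.hama.jdbm package so it can use the
package-private BTreeTuple/BTreeTupleBrowser types.

    package org.apache.hama.jdbm;

    import java.io.IOException;

    // Illustration only: basic insert/get/remove and forward traversal.
    public class BTreeUsageSketch {

      static void example(DBAbstract db) throws IOException {
        // create a new persistent BTree with natural key ordering
        BTree<String, Long> tree = BTree.createInstance(db);

        // insert(key, value, replace) returns the previously stored value, if any
        tree.insert("alice", 1L, true);
        tree.insert("bob", 2L, true);

        Long v = tree.get("alice");    // 1
        Long old = tree.remove("bob"); // 2; the entry is gone afterwards

        // forward traversal with the tuple browser
        BTree.BTreeTuple<String, Long> t = new BTree.BTreeTuple<String, Long>();
        BTree.BTreeTupleBrowser<String, Long> browser = tree.browse();
        while (browser.getNext(t)) {
          System.out.println(t.key + " -> " + t.value);
        }
      }
    }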

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeLazyRecord.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeLazyRecord.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeLazyRecord.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeLazyRecord.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOError;
+import java.io.IOException;
+
+/**
+ * A record lazily loaded from the store. This is used in BTree/HTree to store
+ * big records outside of the index tree.
+ */
+public final class BTreeLazyRecord<E> {
+
+  /**
+   * If a value serializes to more bytes than this, it is stored as a separate
+   * record outside of the tree. This value must always be smaller than 250.
+   */
+  static final int MAX_INTREE_RECORD_SIZE = 32;
+  static final int NULL = 255;
+  static final int LAZY_RECORD = 254;
+
+  private E value = null;
+  private DBAbstract db;
+  private Serializer<E> serializer;
+  final long recid;
+
+  BTreeLazyRecord(DBAbstract db, long recid, Serializer<E> serializer) {
+    this.db = db;
+    this.recid = recid;
+    this.serializer = serializer;
+  }
+
+  E get() {
+    if (value != null)
+      return value;
+    try {
+      value = db.fetch(recid, serializer);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+    return value;
+  }
+
+  void delete() {
+    try {
+      db.delete(recid);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+    value = null;
+    serializer = null;
+    db = null;
+  }
+
+  /**
+   * Serializer used to insert already serialized data into the store
+   */
+  @SuppressWarnings("rawtypes")
+  static final Serializer FAKE_SERIALIZER = new Serializer<Object>() {
+
+    @Override
+    public void serialize(DataOutput out, Object obj) throws IOException {
+      byte[] data = (byte[]) obj;
+      out.write(data);
+    }
+
+    @Override
+    public Object deserialize(DataInput in) throws IOException,
+        ClassNotFoundException {
+      throw new UnsupportedOperationException();
+    }
+  };
+
+  static Object fastDeser(DataInputOutput in, Serializer<?> serializer,
+      int expectedSize) throws IOException, ClassNotFoundException {
+    // we should probably copy the data for deserialization into a separate
+    // buffer and pass it to the Serializer, but to make it faster the
+    // Serializer operates directly on top of the buffer, and we check that it
+    // read the correct number of bytes.
+    int origAvail = in.available();
+    if (origAvail == 0)
+      throw new InternalError(); // backed by a byte[] buffer, so there
+                                 // should always be bytes available
+    Object ret = serializer.deserialize(in);
+    // check that the value serializer did not read more bytes; if it did, it
+    // read bytes belonging to the next record
+    int read = origAvail - in.available();
+    if (read > expectedSize)
+      throw new IOException("Serializer read more bytes than the record size.");
+    else if (read != expectedSize) {
+      // the deserializer did not read all bytes, which is unusual but valid.
+      // Skip the remainder to get into the correct position
+      for (int ii = 0; ii < expectedSize - read; ii++)
+        in.readUnsignedByte();
+    }
+    return ret;
+  }
+
+}
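
A minimal sketch (not part of the patch) of how BTreeLazyRecord defers loading
a large value: the record id is assumed to point at a value previously stored
in the DBAbstract with the given serializer, and the class name
LazyRecordSketch is illustrative only. It sits in the org.apache.hama.jdbm
package because the BTreeLazyRecord constructor is package-private.

    package org.apache.hama.jdbm;

    // Illustration only: the value behind 'recid' is fetched on first access.
    public class LazyRecordSketch {

      static byte[] readLarge(DBAbstract db, long recid, Serializer<byte[]> ser) {
        // only the reference is kept here; nothing is read from the store yet
        BTreeLazyRecord<byte[]> lazy = new BTreeLazyRecord<byte[]>(db, recid, ser);

        // the first get() fetches and caches the value; later calls reuse it
        return lazy.get();
      }
    }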

Added: hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeMap.java
URL: http://svn.apache.org/viewvc/hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeMap.java?rev=1387533&view=auto
==============================================================================
--- hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeMap.java (added)
+++ hama/trunk/jdbm/src/main/java/org/apache/hama/jdbm/BTreeMap.java Wed Sep 19 11:52:20 2012
@@ -0,0 +1,609 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.jdbm;
+
+import java.io.IOError;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.AbstractSet;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentNavigableMap;
+
+/**
+ * Wrapper for <code>BTree</code> which implements the
+ * <code>NavigableMap</code> interface
+ * 
+ * @param <K> key type
+ * @param <V> value type
+ */
+public final class BTreeMap<K, V> extends AbstractMap<K, V> implements
+    NavigableMap<K, V> {
+
+  protected BTree<K, V> tree;
+
+  protected final K fromKey;
+
+  protected final K toKey;
+
+  protected final boolean readonly;
+
+  protected NavigableSet<K> keySet2;
+  private final boolean toInclusive;
+  private final boolean fromInclusive;
+
+  public BTreeMap(BTree<K, V> tree, boolean readonly) {
+    this(tree, readonly, null, false, null, false);
+  }
+
+  protected BTreeMap(BTree<K, V> tree, boolean readonly, K fromKey,
+      boolean fromInclusive, K toKey, boolean toInclusive) {
+    this.tree = tree;
+    this.fromKey = fromKey;
+    this.fromInclusive = fromInclusive;
+    this.toKey = toKey;
+    this.toInclusive = toInclusive;
+    this.readonly = readonly;
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet() {
+    return _entrySet;
+  }
+
+  private final Set<java.util.Map.Entry<K, V>> _entrySet = new AbstractSet<Entry<K, V>>() {
+
+    protected Entry<K, V> newEntry(K k, V v) {
+      return new SimpleEntry<K, V>(k, v) {
+        private static final long serialVersionUID = 978651696969194154L;
+
+        @Override
+        public V setValue(V arg0) {
+          BTreeMap.this.put(getKey(), arg0);
+          return super.setValue(arg0);
+        }
+
+      };
+    }
+
+    @Override
+    public boolean add(java.util.Map.Entry<K, V> e) {
+      if (readonly)
+        throw new UnsupportedOperationException("readonly");
+
+      try {
+        if (e.getKey() == null)
+          throw new NullPointerException("Can not add null key");
+        if (!inBounds(e.getKey()))
+          throw new IllegalArgumentException("key outside of bounds");
+        return tree.insert(e.getKey(), e.getValue(), true) == null;
+      } catch (IOException e1) {
+        throw new IOError(e1);
+      }
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean contains(Object o) {
+
+      if (o instanceof Entry) {
+        Entry<K, V> e = (java.util.Map.Entry<K, V>) o;
+        try {
+          if (!inBounds(e.getKey()))
+            return false;
+          if (e.getKey() != null && tree.get(e.getKey()) != null)
+            return true;
+        } catch (IOException e1) {
+          throw new IOError(e1);
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public Iterator<java.util.Map.Entry<K, V>> iterator() {
+      try {
+        final BTree.BTreeTupleBrowser<K, V> br = fromKey == null ? tree
+            .browse() : tree.browse(fromKey, fromInclusive);
+        return new Iterator<Entry<K, V>>() {
+
+          private Entry<K, V> next;
+          private K lastKey;
+
+          void ensureNext() {
+            try {
+              BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+              if (br.getNext(t) && inBounds(t.key))
+                next = newEntry(t.key, t.value);
+              else
+                next = null;
+            } catch (IOException e1) {
+              throw new IOError(e1);
+            }
+          }
+
+          {
+            ensureNext();
+          }
+
+          @Override
+          public boolean hasNext() {
+            return next != null;
+          }
+
+          @Override
+          public java.util.Map.Entry<K, V> next() {
+            if (next == null)
+              throw new NoSuchElementException();
+            Entry<K, V> ret = next;
+            lastKey = ret.getKey();
+            // move to next position
+            ensureNext();
+            return ret;
+          }
+
+          @Override
+          public void remove() {
+            if (readonly)
+              throw new UnsupportedOperationException("readonly");
+
+            if (lastKey == null)
+              throw new IllegalStateException();
+            try {
+              br.remove(lastKey);
+              lastKey = null;
+            } catch (IOException e1) {
+              throw new IOError(e1);
+            }
+
+          }
+        };
+
+      } catch (IOException e) {
+        throw new IOError(e);
+      }
+
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public boolean remove(Object o) {
+      if (readonly)
+        throw new UnsupportedOperationException("readonly");
+
+      if (o instanceof Entry) {
+        Entry<K, V> e = (java.util.Map.Entry<K, V>) o;
+        try {
+          // check for nulls
+          if (e.getKey() == null || e.getValue() == null)
+            return false;
+          if (!inBounds(e.getKey()))
+            throw new IllegalArgumentException("out of bounds");
+          // get old value, must be same as item in entry
+          V v = get(e.getKey());
+          if (v == null || !e.getValue().equals(v))
+            return false;
+          V v2 = tree.remove(e.getKey());
+          return v2 != null;
+        } catch (IOException e1) {
+          throw new IOError(e1);
+        }
+      }
+      return false;
+
+    }
+
+    @Override
+    public int size() {
+      return BTreeMap.this.size();
+    }
+
+    @Override
+    public void clear() {
+      if (fromKey != null || toKey != null)
+        super.clear();
+      else
+        try {
+          tree.clear();
+        } catch (IOException e) {
+          throw new IOError(e);
+        }
+    }
+
+  };
+
+  public boolean inBounds(K e) {
+    if (fromKey == null && toKey == null)
+      return true;
+
+    Comparator comp = comparator();
+    if (comp == null)
+      comp = JDBMUtils.COMPARABLE_COMPARATOR;
+
+    if (fromKey != null) {
+      final int compare = comp.compare(e, fromKey);
+      if (compare < 0)
+        return false;
+      if (!fromInclusive && compare == 0)
+        return false;
+    }
+    if (toKey != null) {
+      final int compare = comp.compare(e, toKey);
+      if (compare > 0)
+        return false;
+      if (!toInclusive && compare == 0)
+        return false;
+    }
+    return true;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public V get(Object key) {
+    try {
+      if (key == null)
+        return null;
+      if (!inBounds((K) key))
+        return null;
+      return tree.get((K) key);
+    } catch (ClassCastException e) {
+      return null;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public V remove(Object key) {
+    if (readonly)
+      throw new UnsupportedOperationException("readonly");
+
+    try {
+      if (key == null || tree.get((K) key) == null)
+        return null;
+      if (!inBounds((K) key))
+        throw new IllegalArgumentException("out of bounds");
+
+      return tree.remove((K) key);
+    } catch (ClassCastException e) {
+      return null;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public V put(K key, V value) {
+    if (readonly)
+      throw new UnsupportedOperationException("readonly");
+
+    try {
+      if (key == null || value == null)
+        throw new NullPointerException("Null key or value");
+      if (!inBounds(key))
+        throw new IllegalArgumentException("out of bounds");
+      return tree.insert(key, value, true);
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public void clear() {
+    entrySet().clear();
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean containsKey(Object key) {
+    if (key == null)
+      return false;
+    try {
+      if (!inBounds((K) key))
+        return false;
+      V v = tree.get((K) key);
+      return v != null;
+    } catch (IOException e) {
+      throw new IOError(e);
+    } catch (ClassCastException e) {
+      return false;
+    }
+  }
+
+  @Override
+  public Comparator<? super K> comparator() {
+    return tree._comparator;
+  }
+
+  @Override
+  public K firstKey() {
+    if (isEmpty())
+      return null;
+    try {
+
+      BTree.BTreeTupleBrowser<K, V> b = fromKey == null ? tree.browse() : tree
+          .browse(fromKey, fromInclusive);
+      BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+      b.getNext(t);
+      return t.key;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public K lastKey() {
+    if (isEmpty())
+      return null;
+    try {
+      BTree.BTreeTupleBrowser<K, V> b = toKey == null ? tree.browse(null, true)
+          : tree.browse(toKey, false);
+      BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+      b.getPrevious(t);
+      if (!toInclusive && toKey != null) {
+        // make sure we won't return the excluded toKey
+        Comparator c = comparator();
+        if (c == null)
+          c = JDBMUtils.COMPARABLE_COMPARATOR;
+        if (c.compare(t.key, toKey) == 0)
+          b.getPrevious(t);
+      }
+      return t.key;
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public NavigableMap<K, V> headMap(K toKey2, boolean inclusive) {
+    K toKey3 = JDBMUtils.min(this.toKey, toKey2, comparator());
+    boolean inclusive2 = toKey3 == toKey ? toInclusive : inclusive;
+    return new BTreeMap<K, V>(tree, readonly, this.fromKey, this.fromInclusive,
+        toKey3, inclusive2);
+  }
+
+  @Override
+  public NavigableMap<K, V> headMap(K toKey) {
+    return headMap(toKey, false);
+  }
+
+  @Override
+  public Entry<K, V> lowerEntry(K key) {
+    K k = lowerKey(key);
+    return k == null ? null : new SimpleEntry<K, V>(k, get(k));
+  }
+
+  @Override
+  public K lowerKey(K key) {
+    if (isEmpty())
+      return null;
+    K key2 = JDBMUtils.min(key, toKey, comparator());
+    try {
+      BTree.BTreeTupleBrowser<K, V> b = tree.browse(key2, true);
+      BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+      b.getPrevious(t);
+
+      return t.key;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+
+  }
+
+  @Override
+  public Entry<K, V> floorEntry(K key) {
+    K k = floorKey(key);
+    return k == null ? null : new SimpleEntry<K, V>(k, get(k));
+
+  }
+
+  @Override
+  public K floorKey(K key) {
+    if (isEmpty())
+      return null;
+
+    K key2 = JDBMUtils.max(key, fromKey, comparator());
+    try {
+      BTree.BTreeTupleBrowser<K, V> b = tree.browse(key2, true);
+      BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+      b.getNext(t);
+      Comparator comp = comparator();
+      if (comp == null)
+        comp = JDBMUtils.COMPARABLE_COMPARATOR;
+      if (comp.compare(t.key, key2) == 0)
+        return t.key;
+
+      b.getPrevious(t);
+      b.getPrevious(t);
+      return t.key;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public Entry<K, V> ceilingEntry(K key) {
+    K k = ceilingKey(key);
+    return k == null ? null : new SimpleEntry<K, V>(k, get(k));
+  }
+
+  @Override
+  public K ceilingKey(K key) {
+    if (isEmpty())
+      return null;
+    K key2 = JDBMUtils.min(key, toKey, comparator());
+
+    try {
+      BTree.BTreeTupleBrowser<K, V> b = tree.browse(key2, true);
+      BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+      b.getNext(t);
+      return t.key;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public Entry<K, V> higherEntry(K key) {
+    K k = higherKey(key);
+    return k == null ? null : new SimpleEntry<K, V>(k, get(k));
+  }
+
+  @Override
+  public K higherKey(K key) {
+    if (isEmpty())
+      return null;
+
+    K key2 = JDBMUtils.max(key, fromKey, comparator());
+
+    try {
+      BTree.BTreeTupleBrowser<K, V> b = tree.browse(key2, false);
+      BTree.BTreeTuple<K, V> t = new BTree.BTreeTuple<K, V>();
+      b.getNext(t);
+      return t.key;
+
+    } catch (IOException e) {
+      throw new IOError(e);
+    }
+  }
+
+  @Override
+  public Entry<K, V> firstEntry() {
+    K k = firstKey();
+    return k == null ? null : new SimpleEntry<K, V>(k, get(k));
+  }
+
+  @Override
+  public Entry<K, V> lastEntry() {
+    K k = lastKey();
+    return k == null ? null : new SimpleEntry<K, V>(k, get(k));
+  }
+
+  @Override
+  public Entry<K, V> pollFirstEntry() {
+    Entry<K, V> first = firstEntry();
+    if (first != null)
+      remove(first.getKey());
+    return first;
+  }
+
+  @Override
+  public Entry<K, V> pollLastEntry() {
+    Entry<K, V> last = lastEntry();
+    if (last != null)
+      remove(last.getKey());
+    return last;
+  }
+
+  @Override
+  public ConcurrentNavigableMap<K, V> descendingMap() {
+    throw new UnsupportedOperationException("not implemented yet");
+    // TODO implement descending (reverse order) map
+  }
+
+  @Override
+  public NavigableSet<K> keySet() {
+    return navigableKeySet();
+  }
+
+  @Override
+  public NavigableSet<K> navigableKeySet() {
+    if (keySet2 == null)
+      keySet2 = new BTreeSet<K>((BTreeMap<K, Object>) this);
+    return keySet2;
+  }
+
+  @Override
+  public NavigableSet<K> descendingKeySet() {
+    return descendingMap().navigableKeySet();
+  }
+
+  @Override
+  public NavigableMap<K, V> tailMap(K fromKey) {
+    return tailMap(fromKey, true);
+  }
+
+  @Override
+  public NavigableMap<K, V> tailMap(K fromKey2, boolean inclusive) {
+    K fromKey3 = JDBMUtils.max(this.fromKey, fromKey2, comparator());
+    boolean inclusive2 = fromKey3 == this.fromKey ? this.fromInclusive
+        : inclusive;
+
+    return new BTreeMap<K, V>(tree, readonly, fromKey3, inclusive2, toKey,
+        toInclusive);
+  }
+
+  @Override
+  public NavigableMap<K, V> subMap(K fromKey, boolean fromInclusive, K toKey,
+      boolean toInclusive) {
+    Comparator comp = comparator();
+    if (comp == null)
+      comp = JDBMUtils.COMPARABLE_COMPARATOR;
+    if (comp.compare(fromKey, toKey) > 0)
+      throw new IllegalArgumentException("fromKey is bigger then toKey");
+    return new BTreeMap<K, V>(tree, readonly, fromKey, fromInclusive, toKey,
+        toInclusive);
+  }
+
+  @Override
+  public NavigableMap<K, V> subMap(K fromKey, K toKey) {
+    return subMap(fromKey, true, toKey, false);
+  }
+
+  public BTree<K, V> getTree() {
+    return tree;
+  }
+
+  public void addRecordListener(RecordListener<K, V> listener) {
+    tree.addRecordListener(listener);
+  }
+
+  public DBAbstract getRecordManager() {
+    return tree.getRecordManager();
+  }
+
+  public void removeRecordListener(RecordListener<K, V> listener) {
+    tree.removeRecordListener(listener);
+  }
+
+  @Override
+  public int size() {
+    if (fromKey == null && toKey == null)
+      return (int) tree._entries; // use fast counter on tree if Map has no
+                                  // bounds
+    else {
+      // have to count the items via the iterator
+      Iterator<K> iter = keySet().iterator();
+      int counter = 0;
+      while (iter.hasNext()) {
+        iter.next();
+        counter++;
+      }
+      return counter;
+    }
+
+  }
+
+}
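
A minimal sketch (not part of the patch) of how BTreeMap exposes a persistent
BTree through the standard NavigableMap API; the class name BTreeMapSketch and
the sample keys are illustrative only, and the DBAbstract is again assumed to
come from the DBMaker/DBStore classes added elsewhere in this commit.

    package org.apache.hama.jdbm;

    import java.io.IOException;
    import java.util.NavigableMap;

    // Illustration only: map operations are backed by the BTree on disk.
    public class BTreeMapSketch {

      static void example(DBAbstract db) throws IOException {
        BTree<String, Integer> tree = BTree.createInstance(db);
        NavigableMap<String, Integer> map =
            new BTreeMap<String, Integer>(tree, false);

        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);

        System.out.println(map.firstKey()); // "a"
        System.out.println(map.get("b"));   // 2

        // range views share the same underlying tree and enforce their bounds
        NavigableMap<String, Integer> head = map.headMap("c", false);
        System.out.println(head.containsKey("a")); // true
        System.out.println(head.containsKey("c")); // false, "c" is excluded
      }
    }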


