hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1388324 - in /hama/trunk: ./ bin/ conf/ core/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/pipes/ graph/ graph/src/main/java/org/apache/hama/graph/ jdbm/ src/assemble/
Date Fri, 21 Sep 2012 04:57:11 GMT
Author: edwardyoon
Date: Fri Sep 21 04:57:10 2012
New Revision: 1388324

URL: http://svn.apache.org/viewvc?rev=1388324&view=rev
Log:
merge to 1387508 revision

Removed:
    hama/trunk/core/src/main/java/org/apache/hama/bsp/WritableComparator.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/WritableSerialization.java
    hama/trunk/core/src/main/java/org/apache/hama/pipes/
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritableSerialization.java
    hama/trunk/jdbm/
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/bin/hama
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/pom.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
    hama/trunk/graph/pom.xml
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/pom.xml
    hama/trunk/src/assemble/bin.xml

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Fri Sep 21 04:57:10 2012
@@ -4,8 +4,6 @@ Release 0.6 (unreleased changes)
 
   NEW FEATURES
 
-   HAMA-601: Hama Streaming (tjungblut)
-
   BUG FIXES
 
    HAMA-635: Number of vertices value is inconsistent among tasks (Yuesheng Hu via tjungblut)
@@ -14,8 +12,7 @@ Release 0.6 (unreleased changes)
    HAMA-608: LocalRunner should honor the configured queues (tjungblut)
 
   IMPROVEMENTS
- 
-   HAMA-642: Make GraphRunner disk based (tjungblut)
+
    HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon & tjungblut) 
    HAMA-557: Implement Checkpointing service in Hama (surajmenon)
    HAMA-587: Synchronization Client should provide API's to store and retrieve information among peers and BSPMaster (surajmenon)
@@ -51,34 +48,34 @@ Release 0.5 - April 10, 2012 
 
   IMPROVEMENTS
    
-    HAMA-593: Improve RPC scalability (Mayank Mishra via tjungblut)
-    HAMA-584: Change Pagerank IO format to human-readable text for easy debug (tjungblut via edwardyoon)
-    HAMA-590: Fix TestSubmitGraphJob tests (tjungblut)
-    HAMA-582: Task's error logs should be displayed on client-end when job is failed (edwardyoon)
-    HAMA-580: Improve input of graph module (tjungblut)
-    HAMA-579: Add multiple aggregators (tjungblut)
-    HAMA-576: Improve sendMessages in Vertex (tjungblut)
-    HAMA-575: Generify graph package (tjungblut)    
-    HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
-    HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)
-    HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
-    HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
-    HAMA-555: Separate bin and src distributions (edwardyoon)
-    HAMA-548: Update 0.23.0-SNAPSHOT to 0.23.1 in pom file of yarn module (edwardyoon)
-    HAMA-545: Include the API and other docs in the Hama release (Suraj Menon via edwardyoon)
-    HAMA-543: Make best effort to start BSP Task on the host where the input split is located. (Suraj Menon via edwardyoon)
-    HAMA-527: Update commons-configuration version (edwardyoon)
-    HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
-    HAMA-485: Fill Counters with useful information (tjungblut)
-    HAMA-497: Switch the trunk to Hadoop 1.0 based (edwardyoon) 
-    HAMA-445: Make configurable checkpointing (Suraj Menon via edwardyoon)
-    HAMA-498: BSPTask should periodically ping its parent (Suraj Menon via edwardyoon)
-    HAMA-513: Move message classes to somewhere from bsp package. (tjungblut)
-    HAMA-484: Counters should be accessible in client (tjungblut)    
-    HAMA-483: Remove old and deprecated BSP API (tjungblut)    
-    HAMA-514: Add maven-gpg-plugin to parent POM file (edwardyoon)
-    HAMA-510: Add sendMessageToNeighbors() to Vertex (tjungblut)
-    HAMA-502: Message API Improvement (edwardyoon)
+   HAMA-593: Improve RPC scalability (Mayank Mishra via tjungblut)
+   HAMA-584: Change Pagerank IO format to human-readable text for easy debug (tjungblut via edwardyoon)
+   HAMA-590: Fix TestSubmitGraphJob tests (tjungblut)
+   HAMA-582: Task's error logs should be displayed on client-end when job is failed (edwardyoon)
+   HAMA-580: Improve input of graph module (tjungblut)
+   HAMA-579: Add multiple aggregators (tjungblut)
+   HAMA-576: Improve sendMessages in Vertex (tjungblut)
+   HAMA-575: Generify graph package (tjungblut)    
+   HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
+   HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)
+   HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
+   HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)
+   HAMA-555: Separate bin and src distributions (edwardyoon)
+   HAMA-548: Update 0.23.0-SNAPSHOT to 0.23.1 in pom file of yarn module (edwardyoon)
+   HAMA-545: Include the API and other docs in the Hama release (Suraj Menon via edwardyoon)
+   HAMA-543: Make best effort to start BSP Task on the host where the input split is located. (Suraj Menon via edwardyoon)
+   HAMA-527: Update commons-configuration version (edwardyoon)
+   HAMA-499: Refactor clearZKNodes() in BSPMaster (Apurv Verma via tjungblut)
+   HAMA-485: Fill Counters with useful information (tjungblut)
+   HAMA-497: Switch the trunk to Hadoop 1.0 based (edwardyoon) 
+   HAMA-445: Make configurable checkpointing (Suraj Menon via edwardyoon)
+   HAMA-498: BSPTask should periodically ping its parent (Suraj Menon via edwardyoon)
+   HAMA-513: Move message classes to somewhere from bsp package. (tjungblut)
+   HAMA-484: Counters should be accessible in client (tjungblut)    
+   HAMA-483: Remove old and deprecated BSP API (tjungblut)    
+   HAMA-514: Add maven-gpg-plugin to parent POM file (edwardyoon)
+   HAMA-510: Add sendMessageToNeighbors() to Vertex (tjungblut)
+   HAMA-502: Message API Improvement (edwardyoon)
   
 Release 0.4 - February 5, 2012
 

Modified: hama/trunk/bin/hama
URL: http://svn.apache.org/viewvc/hama/trunk/bin/hama?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/bin/hama (original)
+++ hama/trunk/bin/hama Fri Sep 21 04:57:10 2012
@@ -60,7 +60,6 @@ if [ $# = 0 ]; then
   echo "  zookeeper            run a Zookeeper server"
   echo "  job                  manipulate BSP jobs"
   echo "  jar <jar>            run a jar file"
-  echo "  pipes	               run a pipe job"
   echo " or"
   echo "  CLASSNAME            run the class named CLASSNAME"
   echo "Most commands print help when invoked w/o parameters."
@@ -161,8 +160,6 @@ elif [ "$COMMAND" = "zookeeper" ] ; then
   CLASS='org.apache.hama.ZooKeeperRunner'
 elif [ "$COMMAND" = "job" ] ; then
   CLASS='org.apache.hama.bsp.BSPJobClient'
-elif [ "$COMMAND" = "pipes" ] ; then
-  CLASS='org.apache.hama.pipes.Submitter'
 elif [ "$COMMAND" = "jar" ] ; then
   CLASS=org.apache.hama.util.RunJar
   BSP_OPTS="$BSP_OPTS"

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Fri Sep 21 04:57:10 2012
@@ -140,6 +140,7 @@
     <value>10000</value>
     <description>The default timeout period for checking groom server health.</description>
   </property>
+
   <property>
     <name>hama.messenger.max.cached.connections</name>
     <value>100</value>
@@ -149,20 +150,7 @@
     but it trades more memory.
     </description>
   </property>
-  <property>
-    <name>hama.graph.in.memory</name>
-    <value>false</value>
-    <description>true if the graph should completely stored in memory (in a map), 
-    default is false, so it will be stored on disk under the configured "hama.graph.storage.path".
-    </description>
-  </property>
-  <property>
-    <name>hama.graph.storage.path</name>
-    <value>/tmp/graph_storage/</value>
-    <description>The default place where the graph (vertices, edges etc) is stored on each node's hdd.
-    </description>
-  </property>
-
+  
   <!--
   Beginning of properties that are directly mapped from ZooKeeper's zoo.cfg.
   All properties with an "hama.zookeeper.property." prefix are converted for

Modified: hama/trunk/core/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/core/pom.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/pom.xml (original)
+++ hama/trunk/core/pom.xml Fri Sep 21 04:57:10 2012
@@ -150,11 +150,6 @@
       <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hama</groupId>
-      <artifactId>hama-jdbm</artifactId>
-      <version>${project.version}</version>
-    </dependency>
   </dependencies>
 
   <build>

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPMessageBundle.java Fri Sep 21 04:57:10 2012
@@ -22,6 +22,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
@@ -39,7 +40,7 @@ public class BSPMessageBundle<M extends 
 
   public static final Log LOG = LogFactory.getLog(BSPMessageBundle.class);
 
-  private HashMap<String, ArrayList<M>> messages = new HashMap<String, ArrayList<M>>();
+  private HashMap<String, LinkedList<M>> messages = new HashMap<String, LinkedList<M>>();
   private HashMap<String, Class<M>> classCache = new HashMap<String, Class<M>>();
 
   public BSPMessageBundle() {
@@ -53,7 +54,8 @@ public class BSPMessageBundle<M extends 
   public void addMessage(M message) {
     String className = message.getClass().getName();
     if (!messages.containsKey(className)) {
-      ArrayList<M> list = new ArrayList<M>();
+      // use linked list because we're just iterating over them
+      LinkedList<M> list = new LinkedList<M>();
       list.add(message);
       messages.put(className, list);
     } else {
@@ -65,7 +67,7 @@ public class BSPMessageBundle<M extends 
     // here we use an arraylist, because we know the size and outside may need
     // random access
     List<M> mergeList = new ArrayList<M>(messages.size());
-    for (ArrayList<M> c : messages.values()) {
+    for (LinkedList<M> c : messages.values()) {
       mergeList.addAll(c);
     }
     return mergeList;
@@ -76,9 +78,9 @@ public class BSPMessageBundle<M extends 
     // writes the k/v mapping size
     out.writeInt(messages.size());
     if (messages.size() > 0) {
-      for (Entry<String, ArrayList<M>> entry : messages.entrySet()) {
+      for (Entry<String, LinkedList<M>> entry : messages.entrySet()) {
         out.writeUTF(entry.getKey());
-        ArrayList<M> messageList = entry.getValue();
+        LinkedList<M> messageList = entry.getValue();
         out.writeInt(messageList.size());
         for (M msg : messageList) {
           msg.write(out);
@@ -91,14 +93,14 @@ public class BSPMessageBundle<M extends 
   @SuppressWarnings("unchecked")
   public void readFields(DataInput in) throws IOException {
     if (messages == null) {
-      messages = new HashMap<String, ArrayList<M>>();
+      messages = new HashMap<String, LinkedList<M>>();
     }
     int numMessages = in.readInt();
     if (numMessages > 0) {
       for (int entries = 0; entries < numMessages; entries++) {
         String className = in.readUTF();
         int size = in.readInt();
-        ArrayList<M> msgList = new ArrayList<M>();
+        LinkedList<M> msgList = new LinkedList<M>();
         messages.put(className, msgList);
 
         Class<M> clazz = null;

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Fri Sep 21 04:57:10 2012
@@ -20,7 +20,6 @@ package org.apache.hama.bsp;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.Map.Entry;
 
@@ -277,7 +276,6 @@ public final class BSPPeerImpl<K1, V1, K
         }
       }
     }
-    LOG.info("Moving to local cache files: " + files.toString() +" INITIALLY IT WAS: " + Arrays.toString(DistributedCache.getCacheFiles(conf)));
     if (files.length() > 0) {
       DistributedCache.addLocalFiles(conf, files.toString());
     }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/LocalBSPRunner.java Fri Sep 21 04:57:10 2012
@@ -57,10 +57,6 @@ import org.apache.hama.ipc.JobSubmission
  * running BSP's.
  */
 public class LocalBSPRunner implements JobSubmissionProtocol {
-  
-  public static final String BSP_LOCAL_DIR = "bsp.local.dir";
-  public static final String BSP_LOCAL_TASKS_MAXIMUM = "bsp.local.tasks.maximum";
-  
   private static final Log LOG = LogFactory.getLog(LocalBSPRunner.class);
 
   private static final String IDENTIFIER = "localrunner";
@@ -86,9 +82,9 @@ public class LocalBSPRunner implements J
     super();
     this.conf = conf;
 
-    maxTasks = conf.getInt(BSP_LOCAL_TASKS_MAXIMUM, 20);
+    maxTasks = conf.getInt("bsp.local.tasks.maximum", 20);
 
-    String path = conf.get(BSP_LOCAL_DIR);
+    String path = conf.get("bsp.local.dir");
     if (path != null && !path.isEmpty()) {
       WORKING_DIR = path;
     }

Modified: hama/trunk/graph/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/graph/pom.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/pom.xml (original)
+++ hama/trunk/graph/pom.xml Fri Sep 21 04:57:10 2012
@@ -43,11 +43,6 @@
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.hama</groupId>
-      <artifactId>hama-jdbm</artifactId>
-      <version>${project.version}</version>
-    </dependency>
   </dependencies>
   <build>
     <finalName>hama-graph-${project.version}</finalName>

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java Fri Sep 21 04:57:10 2012
@@ -87,7 +87,7 @@ public abstract class AbstractAggregator
   public IntWritable getTimesAggregated() {
     return new IntWritable(timesAggregated);
   }
-
+  
   @Override
   public String toString() {
     return "VAL=" + getValue();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Fri Sep 21 04:57:10 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.Combiner;
@@ -73,8 +72,7 @@ public class GraphJob extends BSPJob {
   /**
    * Set the Vertex ID class for the job.
    */
-  public void setVertexIDClass(
-      @SuppressWarnings("rawtypes") Class<? extends WritableComparable> cls)
+  public void setVertexIDClass(Class<? extends Writable> cls)
       throws IllegalStateException {
     conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
   }
@@ -131,8 +129,8 @@ public class GraphJob extends BSPJob {
   }
 
   @Override
-  public void setPartitioner(
-      @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
+  public void setPartitioner(@SuppressWarnings("rawtypes")
+  Class<? extends Partitioner> theClass) {
     super.setPartitioner(theClass);
     conf.setBoolean(VERTEX_GRAPH_RUNTIME_PARTIONING, true);
   }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri Sep 21 04:57:10 2012
@@ -18,10 +18,8 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -31,26 +29,18 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
-import org.apache.hama.bsp.WritableComparator;
-import org.apache.hama.bsp.WritableSerialization;
 import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.jdbm.DB;
-import org.apache.hama.jdbm.DBMaker;
 import org.apache.hama.util.KeyValuePair;
 
 /**
@@ -60,20 +50,8 @@ import org.apache.hama.util.KeyValuePair
  * @param <E> the value type of an edge.
  * @param <M> the value type of a vertex.
  */
-public final class GraphJobRunner<V extends WritableComparable<V>, E extends Writable, M extends Writable>
-    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage>
-    implements Serializable {
-
-  public static final String HAMA_GRAPH_MULTI_STEP_PARTITIONING_INTERVAL = "hama.graph.partitioning.batch.bytes";
-  public static final String HAMA_GRAPH_SELF_REF = "hama.graph.self.ref";
-  public static final String HAMA_GRAPH_STORAGE_PATH = "hama.graph.storage.path";
-  public static final String HAMA_GRAPH_IN_MEMORY = "hama.graph.in.memory";
-  public static final String HAMA_GRAPH_VERTEX_CLASS = "hama.graph.vertex.class";
-  public static final String HAMA_GRAPH_MAX_ITERATION = "hama.graph.max.iteration";
-  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
-  public static final String GRAPH_REPAIR = "hama.graph.repair";
-
-  private static final long serialVersionUID = 1L;
+public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
+    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
 
   private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
 
@@ -83,12 +61,14 @@ public final class GraphJobRunner<V exte
   public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
   public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
 
-  private transient Configuration conf;
-  private transient Combiner<M> combiner;
-  private transient Partitioner<V, M> partitioner;
+  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  public static final String GRAPH_REPAIR = "hama.graph.repair";
+
+  private Configuration conf;
+  private Combiner<M> combiner;
+  private Partitioner<V, M> partitioner;
 
-  private transient Map<V, Vertex<V, E, M>> vertices;
-  private transient DB db;
+  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
 
   private boolean updated = true;
   private int globalUpdateCounts = 0;
@@ -98,14 +78,14 @@ public final class GraphJobRunner<V exte
   private int maxIteration = -1;
   private long iteration;
 
-  Class<V> vertexIdClass;
-  Class<M> vertexValueClass;
-  Class<E> edgeValueClass;
-  Class<Vertex<V, E, M>> vertexClass;
+  private Class<V> vertexIdClass;
+  private Class<M> vertexValueClass;
+  private Class<E> edgeValueClass;
+  private Class<Vertex<V, E, M>> vertexClass;
 
-  private transient AggregationRunner<V, E, M> aggregationRunner;
+  private AggregationRunner<V, E, M> aggregationRunner;
 
-  private transient BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
+  private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
   @Override
   public final void setup(
@@ -145,32 +125,18 @@ public final class GraphJobRunner<V exte
       // loop over vertices and do their computation
       doSuperstep(messages, peer);
     }
-
-    write(peer);
   }
 
   /**
    * Just write <ID as Writable, Value as Writable> pair as a result. Note that
    * this will also be executed when failure happened.
    */
-  private void write(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
-      throws IOException {
-
-    Set<V> keySet = vertices.keySet();
-    for (V value : keySet) {
-      Vertex<V, E, M> e = vertices.get(value);
-      peer.write(e.getVertexID(), e.getValue());
-    }
-  }
-
   @Override
   public final void cleanup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    // remove the DB files if they exist
-    if (db != null) {
-      db.close();
+    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
+      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
     }
   }
 
@@ -205,9 +171,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
-    Set<V> keySet = vertices.keySet();
-    for (V key : keySet) {
-      Vertex<V, E, M> vertex = vertices.get(key);
+    for (Vertex<V, E, M> vertex : vertices.values()) {
       List<M> msgs = messages.get(vertex.getVertexID());
       // If there are newly received messages, restart.
       if (vertex.isHalted() && msgs != null) {
@@ -243,9 +207,7 @@ public final class GraphJobRunner<V exte
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    Set<V> keySet = vertices.keySet();
-    for (V key : keySet) {
-      Vertex<V, E, M> vertex = vertices.get(key);
+    for (Vertex<V, E, M> vertex : vertices.values()) {
       List<M> singletonList = Collections.singletonList(vertex.getValue());
       M lastValue = vertex.getValue();
       vertex.compute(singletonList.iterator());
@@ -260,7 +222,8 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
     this.peer = peer;
     this.conf = peer.getConfiguration();
-    maxIteration = peer.getConfiguration().getInt(HAMA_GRAPH_MAX_ITERATION, -1);
+    maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
+        -1);
 
     vertexIdClass = (Class<V>) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR,
         Text.class, Writable.class);
@@ -270,7 +233,7 @@ public final class GraphJobRunner<V exte
         GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
         Writable.class);
     vertexClass = (Class<Vertex<V, E, M>>) conf.getClass(
-        HAMA_GRAPH_VERTEX_CLASS, Vertex.class);
+        "hama.graph.vertex.class", Vertex.class);
 
     // set the classes statically, so we can save memory per message
     GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
@@ -291,35 +254,6 @@ public final class GraphJobRunner<V exte
           conf);
     }
 
-    if (!conf.getBoolean(HAMA_GRAPH_IN_MEMORY, false)) {
-
-      String storagePath = conf.get(HAMA_GRAPH_STORAGE_PATH);
-      if (storagePath == null) {
-        storagePath = "/tmp/graph_storage/";
-      }
-
-      storagePath += peer.getTaskId().toString();
-
-      try {
-        LocalFileSystem local = FileSystem.getLocal(conf);
-        local.mkdirs(new Path(storagePath));
-      } catch (IOException e) {
-        throw new RuntimeException("Could not create \"" + storagePath
-            + "\", nested exception was: ", e);
-      }
-
-      db = DBMaker.openFile(storagePath + "/graph.db").disableLocking()
-          .disableTransactions().deleteFilesAfterClose().useRandomAccessFile()
-          .make();
-
-      Comparator<V> writableComparator = new WritableComparator<V>();
-      vertices = db.createTreeMap("graph-db", writableComparator,
-          new WritableSerialization<V>(vertexIdClass, peer.getConfiguration()),
-          new VertexWritableSerialization<Vertex<V, E, M>>(vertexClass, this));
-    } else {
-      vertices = new HashMap<V, Vertex<V, E, M>>();
-    }
-
     aggregationRunner = new AggregationRunner<V, E, M>();
     aggregationRunner.setupAggregators(peer);
   }
@@ -348,7 +282,7 @@ public final class GraphJobRunner<V exte
     final int partitioningSteps = partitionMultiSteps(peer, splitSize);
     final long interval = splitSize / partitioningSteps;
 
-    final boolean selfReference = conf.getBoolean(HAMA_GRAPH_SELF_REF, false);
+    final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
     /*
      * Several partitioning constants end
@@ -569,7 +503,7 @@ public final class GraphJobRunner<V exte
       }
 
       int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
-          HAMA_GRAPH_MULTI_STEP_PARTITIONING_INTERVAL, 20000000)) + 1;
+          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
 
       for (String peerName : peer.getAllPeerNames()) {
         MapWritable temp = new MapWritable();
@@ -584,7 +518,7 @@ public final class GraphJobRunner<V exte
     for (Entry<Writable, Writable> e : x.entrySet()) {
       multiSteps = ((IntWritable) e.getValue()).get();
     }
-    LOG.info(peer.getPeerName() + ": Number of partitioning supersteps: " + multiSteps);
+    LOG.info(peer.getPeerName() + ": " + multiSteps);
     return multiSteps;
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Fri Sep 21 04:57:10 2012
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.graph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,15 +24,13 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Partitioner;
 
 public abstract class Vertex<V extends Writable, E extends Writable, M extends Writable>
-    implements VertexInterface<V, E, M>, Writable {
+    implements VertexInterface<V, E, M> {
 
-  transient GraphJobRunner<?, ?, ?> runner;
+  GraphJobRunner<?, ?, ?> runner;
 
   private V vertexID;
   private M value;
@@ -222,81 +218,6 @@ public abstract class Vertex<V extends W
     return true;
   }
 
-  @SuppressWarnings("unchecked")
-  @Override
-  public void readFields(DataInput in) throws IOException {
-    votedToHalt = in.readBoolean();
-    vertexID = (V) ReflectionUtils.newInstance(runner.vertexIdClass, null);
-    vertexID.readFields(in);
-    if (in.readBoolean()) {
-      value = (M) ReflectionUtils.newInstance(runner.vertexValueClass, null);
-      value.readFields(in);
-    }
-
-    int edges = WritableUtils.readVInt(in);
-    ArrayList<Edge<V, E>> list = new ArrayList<Edge<V, E>>(edges);
-    for (int i = 0; i < edges; i++) {
-      V adjacentId = (V) ReflectionUtils
-          .newInstance(runner.vertexIdClass, null);
-      adjacentId.readFields(in);
-      E edgeValue = null;
-      if (in.readBoolean()) {
-        edgeValue = (E) ReflectionUtils
-            .newInstance(runner.edgeValueClass, null);
-        edgeValue.readFields(in);
-      }
-      list.add(new Edge<V, E>(adjacentId, edgeValue));
-    }
-
-    this.setEdges(list);
-    readInternal(in);
-  }
-
-  @Override
-  public void write(DataOutput out) throws IOException {
-    out.writeBoolean(votedToHalt);
-    V vId = getVertexID();
-    vId.write(out);
-    M val = getValue();
-    serializeNull(out, val);
-
-    List<Edge<V, E>> edges = getEdges();
-    int length = edges == null ? 0 : edges.size();
-    WritableUtils.writeVInt(out, length);
-    for (Edge<V, E> edge : edges) {
-      edge.getDestinationVertexID().write(out);
-      serializeNull(out, edge.getValue());
-    }
-
-    writeInternal(out);
-  }
-
-  /**
-   * A write method to let the user save its own state in the vertex class.
-   */
-  protected void writeInternal(DataOutput out) throws IOException {
-  }
-
-  /**
-   * A read method to let the user save its own state in the vertex class.
-   */
-  protected void readInternal(DataInput out) throws IOException {
-  }
-
-  /**
-   * Serializes data null-safe by writing a boolean that is only true when the
-   * given writable is not null.
-   */
-  protected static void serializeNull(DataOutput out, Writable writable)
-      throws IOException {
-    if (writable == null) {
-      out.writeBoolean(false);
-    } else {
-      out.writeBoolean(true);
-      writable.write(out);
-    }
-  }
-
   @Override
   public String toString() {
     return getVertexID() + (getValue() != null ? " = " + getValue() : "")

Modified: hama/trunk/pom.xml
URL: http://svn.apache.org/viewvc/hama/trunk/pom.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/pom.xml (original)
+++ hama/trunk/pom.xml Fri Sep 21 04:57:10 2012
@@ -248,7 +248,6 @@
     <module>examples</module>
     <module>yarn</module>
     <module>ml</module>
-    <module>jdbm</module>
     <module>dist</module>
   </modules>
 
@@ -322,8 +321,6 @@
             <exclude>.classpath/**</exclude>
             <exclude>.project</exclude>
             <exclude>**/*.asc</exclude>
-	    <exclude>**/target/*</exclude>
-            <exclude>**/bin/**</exclude>
             <exclude>**/logs/**</exclude>
             <exclude>**/docs/**</exclude>
             <exclude>CHANGES.txt</exclude>

Modified: hama/trunk/src/assemble/bin.xml
URL: http://svn.apache.org/viewvc/hama/trunk/src/assemble/bin.xml?rev=1388324&r1=1388323&r2=1388324&view=diff
==============================================================================
--- hama/trunk/src/assemble/bin.xml (original)
+++ hama/trunk/src/assemble/bin.xml Fri Sep 21 04:57:10 2012
@@ -74,19 +74,6 @@
       </excludes>
       <outputDirectory>../hama-${project.version}/</outputDirectory>
     </fileSet>
-    <fileSet>
-      <directory>../jdbm/target</directory>
-      <includes>
-        <include>hama-*.jar</include>
-      </includes>
-      <excludes>
-        <exclude>*sources.jar</exclude>
-        <exclude>*tests.jar</exclude>
-        <exclude>*javadoc.jar</exclude>
-      </excludes>
-      <outputDirectory>../hama-${project.version}/</outputDirectory>
-    </fileSet>
-
  
     <fileSet>
       <directory>../</directory>



Mime
View raw message