hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1340131 [1/2] - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/sync/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/a...
Date Fri, 18 May 2012 15:35:30 GMT
Author: tjungblut
Date: Fri May 18 15:35:28 2012
New Revision: 1340131

URL: http://svn.apache.org/viewvc?rev=1340131&view=rev
Log:
[HAMA-575]: Generify graph package


Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri May 18 15:35:28 2012
@@ -17,7 +17,8 @@ Release 0.5 - April 10, 2012 
   BUG FIXES
 
   IMPROVEMENTS
-    
+
+    HAMA-575: Generify graph package (tjungblut)    
     HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
     HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)
     HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri May 18 15:35:28 2012
@@ -310,6 +310,7 @@ public class BSPJobClient extends Config
       LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
       if (job.getConf().get("bsp.input.partitioner.class") != null) {
         job = partition(job, maxTasks);
+        maxTasks = job.getInt("hama.partition.count", maxTasks);
       }
       job.setNumBspTask(writeSplits(job, submitSplitFile, maxTasks));
       job.set("bsp.job.split.file", submitSplitFile.toString());
@@ -441,7 +442,7 @@ public class BSPJobClient extends Config
           wr.close();
         }
       }
-
+      job.set("hama.partition.count", writers.size() + "");
       job.setInputFormat(SequenceFileInputFormat.class);
       job.setInputPath(partitionedPath);
     }
@@ -597,7 +598,10 @@ public class BSPJobClient extends Config
 
     if (job.isSuccessful()) {
       LOG.info("The total number of supersteps: " + info.getSuperstepCount());
-      info.getStatus().getCounter().incrCounter(BSPPeerImpl.PeerCounter.SUPERSTEPS, info.getSuperstepCount());
+      info.getStatus()
+          .getCounter()
+          .incrCounter(BSPPeerImpl.PeerCounter.SUPERSTEPS,
+              info.getSuperstepCount());
       info.getStatus().getCounter().log(LOG);
     } else {
       LOG.info("Job failed.");

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/SyncServiceFactory.java Fri May 18 15:35:28 2012
@@ -26,36 +26,27 @@ public class SyncServiceFactory {
 
   /**
    * Returns a sync client via reflection based on what was configured.
-   * 
-   * @param conf
-   * @return
    */
   public static SyncClient getSyncClient(Configuration conf)
       throws ClassNotFoundException {
-    return (SyncClient) ReflectionUtils.newInstance(conf.getClassByName(conf
-        .get(SYNC_CLIENT_CLASS,
-            org.apache.hama.bsp.sync.ZooKeeperSyncClientImpl.class
-                .getCanonicalName())), conf);
+    return (SyncClient) ReflectionUtils
+        .newInstance(conf.getClassByName(conf.get(SYNC_CLIENT_CLASS,
+            ZooKeeperSyncClientImpl.class.getName())), conf);
   }
 
   /**
    * Returns a sync server via reflection based on what was configured.
-   * 
-   * @param conf
-   * @return
    */
   public static SyncServer getSyncServer(Configuration conf)
       throws ClassNotFoundException {
     return (SyncServer) ReflectionUtils.newInstance(conf.getClassByName(conf
         .get(SYNC_SERVER_CLASS,
-            org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl.class.getCanonicalName())), conf);
+            org.apache.hama.bsp.sync.ZooKeeperSyncServerImpl.class
+                .getCanonicalName())), conf);
   }
 
   /**
    * Returns a sync server runner via reflection based on what was configured.
-   * 
-   * @param conf
-   * @return
    */
   public static SyncServerRunner getSyncServerRunner(Configuration conf) {
     return new SyncServerRunner(conf);

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/sync/ZooKeeperSyncClientImpl.java Fri May 18 15:35:28 2012
@@ -25,8 +25,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Collections;
 import java.util.List;
-import java.util.TreeMap;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,8 +39,8 @@ import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.data.Stat;
 
 /**
@@ -48,7 +48,7 @@ import org.apache.zookeeper.data.Stat;
  * 
  */
 public class ZooKeeperSyncClientImpl implements SyncClient, Watcher {
-
+  
   /*
    * TODO maybe extract an abstract class and let the subclasses implement
    * enter-/leaveBarrier so we can have multiple implementations, just like
@@ -82,6 +82,7 @@ public class ZooKeeperSyncClientImpl imp
         .getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
 
     peerAddress = new InetSocketAddress(bindAddress, bindPort);
+    LOG.info("Start connecting to Zookeeper! At " + peerAddress);
     numBSPTasks = conf.getInt("bsp.peers.num", 1);
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/BSPNetUtils.java Fri May 18 15:35:28 2012
@@ -59,7 +59,8 @@ public class BSPNetUtils {
    * 
    * @return a free port.
    */
-  public static int getFreePort(int startPort) {
+  public static int getFreePort(int pStartPort) {
+    int startPort = pStartPort;
     while (!AvailablePortFinder.available(startPort)) {
       startPort++;
     }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/Bytes.java Fri May 18 15:35:28 2012
@@ -100,10 +100,12 @@ public class Bytes {
       super();
     }
 
+    @Override
     public int compare(byte[] left, byte[] right) {
       return compareTo(left, right);
     }
 
+    @Override
     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
       return compareTo(b1, s1, l1, b2, s2, l2);
     }

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Fri May 18 15:35:28 2012
@@ -47,6 +47,7 @@ public class TestCheckpoint extends Test
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public void testCheckpoint() throws Exception {
     Configuration config = new HamaConfiguration();
+    config.set(SyncServiceFactory.SYNC_CLIENT_CLASS, LocalBSPRunner.LocalSyncClient.class.getName());
     FileSystem dfs = FileSystem.get(config);
 
     BSPPeerImpl bspTask = new BSPPeerImpl(config, dfs);
@@ -84,7 +85,6 @@ public class TestCheckpoint extends Test
   public void testCheckpointInterval() throws Exception {
 
     HamaConfiguration conf = new HamaConfiguration();
-
     conf.setClass(SyncServiceFactory.SYNC_CLIENT_CLASS,
         LocalBSPRunner.LocalSyncClient.class, SyncClient.class);
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/CombineExample.java Fri May 18 15:35:28 2012
@@ -118,7 +118,7 @@ public class CombineExample {
     if (bsp.waitForCompletion(true)) {
       printOutput(conf);
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Fri May 18 15:35:28 2012
@@ -32,7 +32,8 @@ public class ExampleDriver {
       pgd.addClass("sssp-text2seq", SSSPTextToSeq.class,
           "Generates SSSP input from textfile");
       pgd.addClass("sssp", SSSP.class, "Single Shortest Path");
-      pgd.addClass("mdstsearch", MindistSearch.class, "Mindist search / Connected Components");
+      pgd.addClass("mdstsearch", MindistSearch.class,
+          "Mindist search / Connected Components");
       pgd.addClass("cmb", CombineExample.class, "Combine");
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
       pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class,

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Fri May 18 15:35:28 2012
@@ -22,6 +22,7 @@ import java.util.Iterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
@@ -32,7 +33,7 @@ import org.apache.hama.graph.Vertex;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
-public class InlinkCount extends Vertex<IntWritable> {
+public class InlinkCount extends Vertex<Text, IntWritable, NullWritable> {
 
   @Override
   public void compute(Iterator<IntWritable> messages) throws IOException {
@@ -66,6 +67,10 @@ public class InlinkCount extends Vertex<
     inlinkJob.setInputFormat(SequenceFileInputFormat.class);
     inlinkJob.setInputKeyClass(VertexWritable.class);
     inlinkJob.setInputValueClass(VertexArrayWritable.class);
+    
+    inlinkJob.setVertexIDClass(Text.class);
+    inlinkJob.setVertexValueClass(IntWritable.class);
+    inlinkJob.setEdgeValueClass(NullWritable.class);
 
     inlinkJob.setPartitioner(HashPartitioner.class);
     inlinkJob.setOutputFormat(SequenceFileOutputFormat.class);
@@ -75,7 +80,7 @@ public class InlinkCount extends Vertex<
     long startTime = System.currentTimeMillis();
     if (inlinkJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Fri May 18 15:35:28 2012
@@ -22,6 +22,7 @@ import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.Combiner;
@@ -40,7 +41,8 @@ public class MindistSearch {
   /*
    * Make sure that you know that you're comparing text, and not integers!
    */
-  public static class MindistSearchVertex extends Vertex<Text> {
+  public static class MindistSearchVertex extends
+      Vertex<Text, Text, NullWritable> {
 
     @Override
     public void compute(Iterator<Text> messages) throws IOException {
@@ -50,10 +52,10 @@ public class MindistSearch {
         // neighbourhood.
         if (currentComponent == null) {
           setValue(new Text(getVertexID()));
-          for (Edge e : edges) {
-            String id = getVertexID();
-            if (id.compareTo(e.getName()) > 0) {
-              setValue(new Text(e.getName()));
+          for (Edge<Text, NullWritable> e : edges) {
+            Text id = getVertexID();
+            if (id.compareTo(e.getDestinationVertexID()) > 0) {
+              setValue(e.getDestinationVertexID());
             }
           }
           sendMessageToNeighbors(getValue());
@@ -118,6 +120,10 @@ public class MindistSearch {
     if (args.length >= 3)
       connectedComponentsJob.setMaxIteration(Integer.parseInt(args[2]));
 
+    connectedComponentsJob.setVertexIDClass(Text.class);
+    connectedComponentsJob.setVertexValueClass(Text.class);
+    connectedComponentsJob.setEdgeValueClass(NullWritable.class);
+
     connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
     connectedComponentsJob.setPartitioner(HashPartitioner.class);
     connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
@@ -127,7 +133,7 @@ public class MindistSearch {
     long startTime = System.currentTimeMillis();
     if (connectedComponentsJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Fri May 18 15:35:28 2012
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.HashPartitioner;
@@ -34,7 +35,8 @@ import org.apache.hama.graph.Vertex;
 
 public class PageRank {
 
-  public static class PageRankVertex extends Vertex<DoubleWritable> {
+  public static class PageRankVertex extends
+      Vertex<Text, DoubleWritable, NullWritable> {
 
     static double DAMPING_FACTOR = 0.85;
     static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
@@ -58,7 +60,7 @@ public class PageRank {
     public void compute(Iterator<DoubleWritable> messages) throws IOException {
       // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
-        this.setValue(new DoubleWritable(1.0 / (double) this.getNumVertices()));
+        this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
       }
 
       // in the first superstep, there are no messages to check
@@ -68,7 +70,7 @@ public class PageRank {
           DoubleWritable msg = messages.next();
           sum += msg.get();
         }
-        double alpha = (1.0d - DAMPING_FACTOR) / (double) this.getNumVertices();
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
         this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
       }
 
@@ -120,6 +122,10 @@ public class PageRank {
       pageJob.set("hama.pagerank.alpha", args[2]);
 
     pageJob.setAggregatorClass(AverageAggregator.class);
+    
+    pageJob.setVertexIDClass(Text.class);
+    pageJob.setVertexValueClass(DoubleWritable.class);
+    pageJob.setEdgeValueClass(NullWritable.class);
 
     pageJob.setInputFormat(SequenceFileInputFormat.class);
     pageJob.setPartitioner(HashPartitioner.class);
@@ -130,7 +136,7 @@ public class PageRank {
     long startTime = System.currentTimeMillis();
     if (pageJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Fri May 18 15:35:28 2012
@@ -63,7 +63,7 @@ public class PiEstimator {
         }
       }
 
-      double data = 4.0 * (double) in / (double) iterations;
+      double data = 4.0 * in / iterations;
 
       peer.send(masterTask, new DoubleWritable(data));
       peer.sync();
@@ -77,6 +77,7 @@ public class PiEstimator {
       this.masterTask = peer.getPeerName(peer.getNumPeers() / 2);
     }
 
+    @Override
     public void cleanup(
         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
         throws IOException {
@@ -138,7 +139,7 @@ public class PiEstimator {
     if (bsp.waitForCompletion(true)) {
       printOutput(conf);
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Fri May 18 15:35:28 2012
@@ -112,7 +112,7 @@ public class RandBench {
     long startTime = System.currentTimeMillis();
     if (bsp.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Fri May 18 15:35:28 2012
@@ -38,14 +38,15 @@ public class SSSP {
 
   public static final String START_VERTEX = "shortest.paths.start.vertex.name";
 
-  public static class ShortestPathVertex extends Vertex<IntWritable> {
+  public static class ShortestPathVertex extends
+      Vertex<Text, IntWritable, IntWritable> {
 
     public ShortestPathVertex() {
       this.setValue(new IntWritable(Integer.MAX_VALUE));
     }
 
     public boolean isStartVertex() {
-      String startVertex = getConf().get(START_VERTEX);
+      Text startVertex = new Text(getConf().get(START_VERTEX));
       return (this.getVertexID().equals(startVertex)) ? true : false;
     }
 
@@ -62,8 +63,8 @@ public class SSSP {
 
       if (minDist < this.getValue().get()) {
         this.setValue(new IntWritable(minDist));
-        for (Edge e : this.getOutEdges()) {
-          sendMessage(e, new IntWritable(minDist + e.getCost()));
+        for (Edge<Text, IntWritable> e : this.getOutEdges()) {
+          sendMessage(e, new IntWritable(minDist + e.getValue().get()));
         }
       }
     }
@@ -122,11 +123,16 @@ public class SSSP {
     ssspJob.setOutputValueClass(IntWritable.class);
     // Iterate until all the nodes have been reached.
     ssspJob.setMaxIteration(Integer.MAX_VALUE);
+    
+    ssspJob.setVertexIDClass(Text.class);
+    ssspJob.setVertexValueClass(IntWritable.class);
+    ssspJob.setEdgeValueClass(IntWritable.class);
+    
 
     long startTime = System.currentTimeMillis();
     if (ssspJob.waitForCompletion(true)) {
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SuperstepPiEstimator.java Fri May 18 15:35:28 2012
@@ -75,7 +75,7 @@ public class SuperstepPiEstimator {
         }
       }
 
-      double data = 4.0 * (double) in / (double) iterations;
+      double data = 4.0 * in / iterations;
       DoubleMessage estimate = new DoubleMessage(peer.getPeerName(), data);
 
       peer.send(masterTask, estimate);
@@ -159,7 +159,7 @@ public class SuperstepPiEstimator {
     if (bsp.waitForCompletion(true)) {
       printOutput(conf);
       System.out.println("Job Finished in "
-          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + (System.currentTimeMillis() - startTime) / 1000.0
           + " seconds");
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/PagerankTextToSeq.java Fri May 18 15:35:28 2012
@@ -20,7 +20,9 @@ package org.apache.hama.examples.util;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.SequenceFile.Writer;
+import org.apache.hadoop.io.Text;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 import org.apache.hama.util.KeyValuePair;
@@ -40,6 +42,7 @@ import org.apache.hama.util.KeyValuePair
  *    bin/hama -jar examples.jar pagerank-text2seq /tmp/in /tmp/out ";"
  * </pre>
  */
+@SuppressWarnings("rawtypes")
 public class PagerankTextToSeq extends TextToSequenceFile {
 
   public PagerankTextToSeq(Path inPath, Path outPath, String delimiter)
@@ -47,6 +50,7 @@ public class PagerankTextToSeq extends T
     super(inPath, outPath, delimiter);
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   protected KeyValuePair<VertexWritable, VertexArrayWritable> processLine(
       String line) throws IOException {
@@ -54,7 +58,8 @@ public class PagerankTextToSeq extends T
     VertexWritable key = new VertexWritable(split[0]);
     VertexWritable[] v = new VertexWritable[split.length - 1];
     for (int i = 1; i < split.length; i++) {
-      v[i - 1] = new VertexWritable(split[i]);
+      v[i - 1] = new VertexWritable(new DoubleWritable(0.0),
+          new Text(split[i]), Text.class, DoubleWritable.class);
     }
     VertexArrayWritable value = new VertexArrayWritable();
     value.set(v);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextToSequenceFile.java Fri May 18 15:35:28 2012
@@ -38,6 +38,7 @@ import org.apache.hama.util.KeyValuePair
  * Abstract base class for turning a text graph into a sequence file. It offers
  * help for multiple inputs in a directory.
  */
+@SuppressWarnings("rawtypes")
 public abstract class TextToSequenceFile {
 
   protected static final Log LOG = LogFactory.getLog(TextToSequenceFile.class);
@@ -57,6 +58,7 @@ public abstract class TextToSequenceFile
     this.delimiter = delimiter;
 
     this.conf = new Configuration();
+    VertexWritable.CONFIGURATION = conf;
     this.sourceFs = inPath.getFileSystem(conf);
     this.destFs = outPath.getFileSystem(conf);
   }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Fri May 18 15:35:28 2012
@@ -24,12 +24,16 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -47,7 +51,7 @@ import org.apache.hama.graph.VertexWrita
 
 public class MindistSearchTest extends TestCase {
 
-  private static final Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
+  private static final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>();
   // mapping of our index of the vertex to the resulting component id
   private static final String[] resultList = new String[] { "0", "1", "2", "2",
       "1", "2", "2", "1", "2", "0" };
@@ -61,14 +65,17 @@ public class MindistSearchTest extends T
       String[] adjacencyStringArray = lineArray[i].split(";");
       int vertexId = Integer.parseInt(adjacencyStringArray[0]);
       String name = pages[vertexId];
-      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      @SuppressWarnings("unchecked")
+      VertexWritable<Text, IntWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
       for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable(
-            pages[Integer.parseInt(adjacencyStringArray[j])]);
+        arr[j - 1] = new VertexWritable<Text, IntWritable>(new IntWritable(0),
+            new Text(pages[Integer.parseInt(adjacencyStringArray[j])]),
+            Text.class, IntWritable.class);
       }
       VertexArrayWritable wr = new VertexArrayWritable();
       wr.set(arr);
-      tmp.put(new VertexWritable(name), wr);
+      tmp.put(
+          new VertexWritable<Text, IntWritable>(new Text(name), Text.class), wr);
     }
   }
   private static String INPUT = "/tmp/pagerank-tmp.seq";
@@ -107,22 +114,30 @@ public class MindistSearchTest extends T
   }
 
   private void verifyResult() throws IOException {
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
-        + "/part-00000"), conf);
-    Text key = new Text();
-    Writable value = new Text();
-    while (reader.next(key, value)) {
-      System.out.println(key + " | " + value);
-      assertEquals(resultList[Integer.parseInt(key.toString())],
-          value.toString());
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    int itemsRead = 0;
+    for (FileStatus fts : globStatus) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(),
+          conf);
+      Text key = new Text();
+      Writable value = new Text();
+      while (reader.next(key, value)) {
+        System.out.println(key + " | " + value);
+        assertEquals(resultList[Integer.parseInt(key.toString())],
+            value.toString());
+        itemsRead++;
+      }
     }
+    assertEquals(resultList.length, itemsRead);
   }
 
-  private void generateSeqTestData(Map<VertexWritable, VertexArrayWritable> map)
+  private void generateSeqTestData(
+      Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> tmp)
       throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
         INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : map.entrySet()) {
+    for (Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : tmp
+        .entrySet()) {
       writer.append(e.getKey(), e.getValue());
     }
     writer.close();
@@ -143,11 +158,10 @@ public class MindistSearchTest extends T
 
   public void testRepairFunctionality() throws Exception {
     // make a copy to be safe with parallel test executions
-    final Map<VertexWritable, VertexArrayWritable> map = new HashMap<VertexWritable, VertexArrayWritable>(
-        tmp);
+    final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>(tmp);
     // removing 7 should resulting in creating it and getting the same result as
     // usual
-    map.remove(new VertexWritable("7"));
+    map.remove(new VertexWritable<Text, IntWritable>("7"));
     generateSeqTestData(map);
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
@@ -169,6 +183,10 @@ public class MindistSearchTest extends T
       connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
       connectedComponentsJob.setOutputKeyClass(Text.class);
       connectedComponentsJob.setOutputValueClass(Text.class);
+      
+      connectedComponentsJob.setVertexIDClass(Text.class);
+      connectedComponentsJob.setVertexValueClass(Text.class);
+      connectedComponentsJob.setEdgeValueClass(NullWritable.class);
 
       if (connectedComponentsJob.waitForCompletion(true)) {
         verifyResult();
@@ -180,13 +198,16 @@ public class MindistSearchTest extends T
     }
   }
 
-  private void generateTestTextData() throws IOException {
+  private static void generateTestTextData() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+    for (Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : tmp
+        .entrySet()) {
       writer.write(e.getKey() + "\t");
       for (int i = 0; i < e.getValue().get().length; i++) {
-        VertexWritable writable = (VertexWritable) e.getValue().get()[i];
-        writer.write(writable.getName() + "\t");
+        @SuppressWarnings("unchecked")
+        VertexWritable<Text, IntWritable> writable = (VertexWritable<Text, IntWritable>) e
+            .getValue().get()[i];
+        writer.write(writable.getVertexId() + "\t");
       }
       writer.write("\n");
     }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Fri May 18 15:35:28 2012
@@ -22,13 +22,16 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
@@ -36,13 +39,13 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.examples.PageRank.PageRankVertex;
-import org.apache.hama.examples.util.PagerankTextToSeq;
 import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
+@SuppressWarnings("unchecked")
 public class PageRankTest extends TestCase {
   /**
    * The graph looks like this (adjacency list, [] contains outlinks):<br/>
@@ -54,8 +57,10 @@ public class PageRankTest extends TestCa
    * nasa.gov [yahoo.com, stackoverflow.com]<br/>
    * youtube.com [google.com, yahoo.com]<br/>
    */
-  private static final Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
+  private static final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>();
   static {
+    Configuration conf = new HamaConfiguration();
+    VertexWritable.CONFIGURATION = conf;
     // our first entry is null, because our indices in hama 3.0 pre calculated
     // example starts at 1.
     // FIXME This is really ugly.
@@ -70,14 +75,16 @@ public class PageRankTest extends TestCa
       String[] adjacencyStringArray = lineArray[i].split(";");
       int vertexId = Integer.parseInt(adjacencyStringArray[0]);
       String name = pages[vertexId];
-      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      VertexWritable<Text, DoubleWritable>[] arr = new VertexWritable[adjacencyStringArray.length - 1];
       for (int j = 1; j < adjacencyStringArray.length; j++) {
-        arr[j - 1] = new VertexWritable(
-            pages[Integer.parseInt(adjacencyStringArray[j])]);
+        arr[j - 1] = new VertexWritable<Text, DoubleWritable>(
+            new DoubleWritable(0.0d), new Text(
+                pages[Integer.parseInt(adjacencyStringArray[j])]), Text.class,
+            DoubleWritable.class);
       }
       VertexArrayWritable wr = new VertexArrayWritable();
       wr.set(arr);
-      tmp.put(new VertexWritable(name), wr);
+      tmp.put(new VertexWritable<Text, DoubleWritable>(name), wr);
     }
   }
   private static String INPUT = "/tmp/pagerank-tmp.seq";
@@ -93,65 +100,42 @@ public class PageRankTest extends TestCa
     fs = FileSystem.get(conf);
   }
 
-  public void testPageRank() throws Exception {
-    generateSeqTestData(tmp);
-    try {
-      // Usage: <input> <output> [damping factor (default 0.85)] [Epsilon
-      // (convergence error, default 0.001)] [Max iterations (default 30)]
-      // [tasks]
-      PageRank.main(new String[] { INPUT, OUTPUT, "0.85", "0.0001", "-1" });
-
-      verifyResult();
-    } finally {
-      deleteTempDirs();
-    }
-  }
-
   private void verifyResult() throws IOException {
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
-        + "/part-00000"), conf);
-    Text key = new Text();
-    DoubleWritable value = new DoubleWritable();
     double sum = 0.0;
-    while (reader.next(key, value)) {
-      sum += value.get();
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    for (FileStatus fts : globStatus) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(),
+          conf);
+      Text key = new Text();
+      DoubleWritable value = new DoubleWritable();
+
+      while (reader.next(key, value)) {
+        sum += value.get();
+      }
     }
     System.out.println("Sum is: " + sum);
-    assertTrue(sum > 0.99d && sum <= 1d);
+    assertTrue(sum > 0.99d && sum <= 1.1d);
   }
 
-  private void generateSeqTestData(Map<VertexWritable, VertexArrayWritable> tmp)
+  private void generateSeqTestData(
+      Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> tmp)
       throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
         INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+    for (Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
+        .entrySet()) {
       writer.append(e.getKey(), e.getValue());
     }
     writer.close();
   }
 
-  public void testPageRankUtil() throws IOException, InterruptedException,
-      ClassNotFoundException, InstantiationException, IllegalAccessException {
-    generateTestTextData();
-    // <input path> <output path>
-    PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
-    try {
-      PageRank
-          .main(new String[] { TEXT_OUTPUT, OUTPUT, "0.85", "0.0001", "-1" });
-
-      verifyResult();
-    } finally {
-      deleteTempDirs();
-    }
-  }
-
   public void testRepairFunctionality() throws Exception {
     // make a copy to be safe with parallel test executions
-    final Map<VertexWritable, VertexArrayWritable> map = new HashMap<VertexWritable, VertexArrayWritable>(
+    final Map<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> map = new HashMap<VertexWritable<Text, DoubleWritable>, VertexArrayWritable>(
         tmp);
     // removing google should resulting in creating it and getting the same
     // result as usual
-    map.remove(new VertexWritable("google.com"));
+    map.remove(new VertexWritable<Text, DoubleWritable>("google.com"));
     generateSeqTestData(map);
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
@@ -178,21 +162,29 @@ public class PageRankTest extends TestCa
       pageJob.setOutputKeyClass(Text.class);
       pageJob.setOutputValueClass(DoubleWritable.class);
 
+      pageJob.setVertexIDClass(Text.class);
+      pageJob.setVertexValueClass(DoubleWritable.class);
+      pageJob.setEdgeValueClass(NullWritable.class);
+
       if (!pageJob.waitForCompletion(true)) {
         fail("Job did not complete normally!");
       }
+      verifyResult();
     } finally {
       deleteTempDirs();
     }
   }
 
-  private void generateTestTextData() throws IOException {
+  @SuppressWarnings("unused")
+  private static void generateTestTextData() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+    for (Map.Entry<VertexWritable<Text, DoubleWritable>, VertexArrayWritable> e : tmp
+        .entrySet()) {
       writer.write(e.getKey() + "\t");
       for (int i = 0; i < e.getValue().get().length; i++) {
-        VertexWritable writable = (VertexWritable) e.getValue().get()[i];
-        writer.write(writable.getName() + "\t");
+        VertexWritable<Text, DoubleWritable> writable = (VertexWritable<Text, DoubleWritable>) e
+            .getValue().get()[i];
+        writer.write(writable.getVertexId() + "\t");
       }
       writer.write("\n");
     }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java Fri May 18 15:35:28 2012
@@ -26,13 +26,13 @@ import java.util.Map;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.examples.util.SSSPTextToSeq;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
@@ -40,88 +40,91 @@ import org.apache.hama.graph.VertexWrita
  * Testcase for {@link ShortestPaths}
  */
 
+@SuppressWarnings("unchecked")
 public class SSSPTest extends TestCase {
 
-  private static final Map<VertexWritable, VertexArrayWritable> testData = new HashMap<VertexWritable, VertexArrayWritable>();
+  private static final Map<VertexWritable<Text, IntWritable>, VertexArrayWritable> testData = new HashMap<VertexWritable<Text, IntWritable>, VertexArrayWritable>();
 
   static {
+    Configuration conf = new Configuration();
+    VertexWritable.CONFIGURATION = conf;
     String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
         "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
         "Muenchen" };
 
     for (String city : cities) {
       if (city.equals("Frankfurt")) {
-        VertexWritable[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable(85, "Mannheim");
-        textArr[1] = new VertexWritable(173, "Kassel");
-        textArr[2] = new VertexWritable(217, "Wuerzburg");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
+        textArr[0] = new VertexWritable<Text, IntWritable>(85, "Mannheim");
+        textArr[1] = new VertexWritable<Text, IntWritable>(173, "Kassel");
+        textArr[2] = new VertexWritable<Text, IntWritable>(217, "Wuerzburg");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Stuttgart")) {
-        VertexWritable[] textArr = new VertexWritable[1];
-        textArr[0] = new VertexWritable(183, "Nuernberg");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[1];
+        textArr[0] = new VertexWritable<Text, IntWritable>(183, "Nuernberg");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Kassel")) {
-        VertexWritable[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable(502, "Muenchen");
-        textArr[1] = new VertexWritable(173, "Frankfurt");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
+        textArr[0] = new VertexWritable<Text, IntWritable>(502, "Muenchen");
+        textArr[1] = new VertexWritable<Text, IntWritable>(173, "Frankfurt");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Erfurt")) {
-        VertexWritable[] textArr = new VertexWritable[1];
-        textArr[0] = new VertexWritable(186, "Wuerzburg");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[1];
+        textArr[0] = new VertexWritable<Text, IntWritable>(186, "Wuerzburg");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Wuerzburg")) {
-        VertexWritable[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable(217, "Frankfurt");
-        textArr[1] = new VertexWritable(186, "Erfurt");
-        textArr[2] = new VertexWritable(103, "Nuernberg");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
+        textArr[0] = new VertexWritable<Text, IntWritable>(217, "Frankfurt");
+        textArr[1] = new VertexWritable<Text, IntWritable>(186, "Erfurt");
+        textArr[2] = new VertexWritable<Text, IntWritable>(103, "Nuernberg");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Mannheim")) {
-        VertexWritable[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable(80, "Karlsruhe");
-        textArr[1] = new VertexWritable(85, "Frankfurt");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
+        textArr[0] = new VertexWritable<Text, IntWritable>(80, "Karlsruhe");
+        textArr[1] = new VertexWritable<Text, IntWritable>(85, "Frankfurt");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Karlsruhe")) {
-        VertexWritable[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable(250, "Augsburg");
-        textArr[1] = new VertexWritable(80, "Mannheim");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
+        textArr[0] = new VertexWritable<Text, IntWritable>(250, "Augsburg");
+        textArr[1] = new VertexWritable<Text, IntWritable>(80, "Mannheim");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Augsburg")) {
-        VertexWritable[] textArr = new VertexWritable[2];
-        textArr[0] = new VertexWritable(250, "Karlsruhe");
-        textArr[1] = new VertexWritable(84, "Muenchen");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[2];
+        textArr[0] = new VertexWritable<Text, IntWritable>(250, "Karlsruhe");
+        textArr[1] = new VertexWritable<Text, IntWritable>(84, "Muenchen");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Nuernberg")) {
-        VertexWritable[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable(183, "Stuttgart");
-        textArr[1] = new VertexWritable(167, "Muenchen");
-        textArr[2] = new VertexWritable(103, "Wuerzburg");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
+        textArr[0] = new VertexWritable<Text, IntWritable>(183, "Stuttgart");
+        textArr[1] = new VertexWritable<Text, IntWritable>(167, "Muenchen");
+        textArr[2] = new VertexWritable<Text, IntWritable>(103, "Wuerzburg");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       } else if (city.equals("Muenchen")) {
-        VertexWritable[] textArr = new VertexWritable[3];
-        textArr[0] = new VertexWritable(167, "Nuernberg");
-        textArr[1] = new VertexWritable(502, "Kassel");
-        textArr[2] = new VertexWritable(84, "Augsburg");
+        VertexWritable<Text, IntWritable>[] textArr = new VertexWritable[3];
+        textArr[0] = new VertexWritable<Text, IntWritable>(167, "Nuernberg");
+        textArr[1] = new VertexWritable<Text, IntWritable>(502, "Kassel");
+        textArr[2] = new VertexWritable<Text, IntWritable>(84, "Augsburg");
         VertexArrayWritable arr = new VertexArrayWritable();
         arr.set(textArr);
-        testData.put(new VertexWritable(0, city), arr);
+        testData.put(new VertexWritable<Text, IntWritable>(0, city), arr);
       }
     }
   }
@@ -152,20 +155,6 @@ public class SSSPTest extends TestCase {
     }
   }
 
-  public void testShortestPathsUtil() throws IOException, InterruptedException,
-      ClassNotFoundException, InstantiationException, IllegalAccessException {
-    generateTestTextData();
-    // <input path> <output path>
-    SSSPTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
-    try {
-      SSSP.main(new String[] { "Frankfurt", TEXT_OUTPUT, OUTPUT });
-
-      verifyResult();
-    } finally {
-      deleteTempDirs();
-    }
-  }
-
   private void verifyResult() throws IOException {
     Map<String, Integer> rs = new HashMap<String, Integer>();
     rs.put("Erfurt", 403);
@@ -179,31 +168,41 @@ public class SSSPTest extends TestCase {
     rs.put("Wuerzburg", 217);
     rs.put("Karlsruhe", 165);
 
-    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
-        + "/part-00000"), conf);
-    Text key = new Text();
-    IntWritable value = new IntWritable();
-    while (reader.next(key, value)) {
-      assertEquals(value.get(), (int) rs.get(key.toString()));
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    for (FileStatus fts : globStatus) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, fts.getPath(),
+          conf);
+      Text key = new Text();
+      IntWritable value = new IntWritable();
+      while (reader.next(key, value)) {
+        assertEquals(value.get(), (int) rs.get(key.toString()));
+      }
     }
   }
 
   private void generateTestSequenceFileData() throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
         INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : testData.entrySet()) {
+    for (Map.Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : testData
+        .entrySet()) {
       writer.append(e.getKey(), e.getValue());
     }
     writer.close();
   }
 
-  private void generateTestTextData() throws IOException {
+  @SuppressWarnings("unused")
+  private static void generateTestTextData() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : testData.entrySet()) {
-      writer.write(e.getKey().getName() + "\t");
+    for (Map.Entry<VertexWritable<Text, IntWritable>, VertexArrayWritable> e : testData
+        .entrySet()) {
+      writer.write(e.getKey().getVertexId() + "\t");
       for (int i = 0; i < e.getValue().get().length; i++) {
-        writer.write(((VertexWritable) e.getValue().get()[i]).getName() + ":"
-            + ((VertexWritable) e.getValue().get()[i]).getWeight() + "\t");
+        writer
+            .write(((VertexWritable<Text, IntWritable>) e.getValue().get()[i])
+                .getVertexId()
+                + ":"
+                + ((VertexWritable<Text, IntWritable>) e.getValue().get()[i])
+                    .getVertexValue() + "\t");
       }
       writer.write("\n");
     }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java Fri May 18 15:35:28 2012
@@ -27,7 +27,9 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.graph.VertexArrayWritable;
@@ -55,7 +57,7 @@ public class PagerankTextToSeqTest exten
     }
   }
 
-  private void writeTextFile() throws IOException {
+  private static void writeTextFile() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TXT_INPUT));
     for (int lines = 0; lines < 10; lines++) {
       for (int cols = 0; cols < 5; cols++) {
@@ -66,20 +68,22 @@ public class PagerankTextToSeqTest exten
     writer.close();
   }
 
+  @SuppressWarnings("unchecked")
   private void verifyOutput() throws IOException {
     SequenceFile.Reader reader = new SequenceFile.Reader(fs,
         new Path(SEQ_INPUT), conf);
-    VertexWritable vertex = new VertexWritable();
+    VertexWritable<Text, DoubleWritable> vertex = new VertexWritable<Text, DoubleWritable>();
     VertexArrayWritable vertexArray = new VertexArrayWritable();
 
     while (reader.next(vertex, vertexArray)) {
       int count = 0;
-      assertEquals(vertex.getName(), count + "");
+      assertEquals(vertex.getVertexId().toString(), count + "");
       Writable[] writables = vertexArray.get();
       assertEquals(writables.length, 4);
       for (int i = 0; i < 4; i++) {
         count++;
-        assertEquals(((VertexWritable) writables[i]).getName(), count + "");
+        assertEquals(((VertexWritable<Text, DoubleWritable>) writables[i])
+            .getVertexId().toString(), count + "");
       }
     }
     reader.close();

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java Fri May 18 15:35:28 2012
@@ -27,7 +27,9 @@ import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.graph.VertexArrayWritable;
@@ -56,7 +58,7 @@ public class SSSPTextToSeqTest extends T
     }
   }
 
-  private void writeTextFile() throws IOException {
+  private static void writeTextFile() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TXT_INPUT));
     for (int lines = 0; lines < 10; lines++) {
       writer.append(lines + DELIMITER);
@@ -68,22 +70,25 @@ public class SSSPTextToSeqTest extends T
     writer.close();
   }
 
+  @SuppressWarnings("unchecked")
   private void verifyOutput() throws IOException {
     SequenceFile.Reader reader = new SequenceFile.Reader(fs,
         new Path(SEQ_INPUT), conf);
-    VertexWritable vertex = new VertexWritable();
+    VertexWritable<Text, IntWritable> vertex = new VertexWritable<Text, IntWritable>();
     VertexArrayWritable vertexArray = new VertexArrayWritable();
 
     int lines = 0;
     while (reader.next(vertex, vertexArray)) {
       int count = 0;
-      assertEquals(vertex.getName(), lines + "");
-      assertEquals(vertex.getWeight(), 0);
+      assertEquals(vertex.getVertexId().toString(), lines + "");
+      assertEquals(vertex.getVertexValue().get(), 0);
       Writable[] writables = vertexArray.get();
       assertEquals(writables.length, 5);
       for (int i = 0; i < 5; i++) {
-        assertEquals(((VertexWritable) writables[i]).getName(), count + "");
-        assertEquals(((VertexWritable) writables[i]).getWeight(), lines);
+        assertEquals(((VertexWritable<Text, IntWritable>) writables[i])
+            .getVertexId().toString(), count + "");
+        assertEquals(((VertexWritable<Text, IntWritable>) writables[i])
+            .getVertexValue().get(), lines);
         count++;
       }
       lines++;

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java Fri May 18 15:35:28 2012
@@ -26,8 +26,7 @@ public class AverageAggregator extends A
 
   @Override
   public DoubleWritable finalizeAggregation() {
-    return new DoubleWritable(getValue().get()
-        / (double) getTimesAggregated().get());
+    return new DoubleWritable(getValue().get() / getTimesAggregated().get());
   }
 
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Fri May 18 15:35:28 2012
@@ -17,34 +17,45 @@
  */
 package org.apache.hama.graph;
 
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+
 /**
  * The edge class
  */
-public class Edge {
-  private String sourceVertexID;
-  private String destVertexID;
-  private int cost;
-
-  public Edge(String sourceVertexID, String destVertexID, int cost) {
-    this.sourceVertexID = sourceVertexID;
-    this.destVertexID = destVertexID;
-    this.cost = cost;
+public class Edge<VERTEX_ID, EDGE_VALUE_TYPE extends Writable> {
+
+  private VERTEX_ID destinationVertexID;
+  // actually the destination peer address
+  private String destinationPeerName;
+  private EDGE_VALUE_TYPE cost;
+
+  public Edge(VERTEX_ID destinationVertexID, String destinationPeerName,
+      EDGE_VALUE_TYPE cost) {
+    this.destinationVertexID = destinationVertexID;
+    this.destinationPeerName = destinationPeerName;
+    if (cost instanceof NullWritable) {
+      this.cost = null;
+    } else {
+      this.cost = cost;
+    }
   }
 
-  public String getName() {
-    return sourceVertexID;
+  public VERTEX_ID getDestinationVertexID() {
+    return destinationVertexID;
   }
 
-  public String getDestVertexID() {
-    return destVertexID;
+  public String getDestinationPeerName() {
+    return destinationPeerName;
   }
 
-  public int getCost() {
+  public EDGE_VALUE_TYPE getValue() {
     return cost;
   }
 
+  @Override
   public String toString() {
-    return this.getName() + " -> " + this.getDestVertexID() + ":"
-        + this.getCost();
+    return this.destinationVertexID + ":" + this.getValue() + " (resides on "
+        + destinationPeerName + ")";
   }
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Fri May 18 15:35:28 2012
@@ -19,6 +19,8 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSPJob;
@@ -27,36 +29,76 @@ import org.apache.hama.bsp.Combiner;
 public class GraphJob extends BSPJob {
 
   public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
+  public final static String VERTEX_ID_CLASS_ATTR = "hama.graph.vertex.id.class";
+  public final static String VERTEX_VALUE_CLASS_ATTR = "hama.graph.vertex.value.class";
+  public final static String VERTEX_EDGE_VALUE_CLASS_ATTR = "hama.graph.vertex.edge.value.class";
+
   public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
   public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
 
+  /**
+   * Creates a new Graph Job with the given configuration and an exampleClass.
+   * The exampleClass is used to determine the user's jar to distribute in the
+   * cluster. This constructor sets the vertex id class to {@link Text}, the
+   * vertex value class to {@link IntWritable} and the edge value class to
+   * {@link IntWritable}.
+   */
   public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
       throws IOException {
     super(conf);
+    VertexWritable.CONFIGURATION = conf;
     this.setBspClass(GraphJobRunner.class);
     this.setJarByClass(exampleClass);
+    this.setVertexIDClass(Text.class);
+    this.setVertexValueClass(IntWritable.class);
+    this.setEdgeValueClass(IntWritable.class);
   }
 
   /**
    * Set the Vertex class for the job.
    */
-  public void setVertexClass(Class<? extends Vertex<? extends Writable>> cls)
+  public void setVertexClass(
+      Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> cls)
       throws IllegalStateException {
     conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
   }
 
   /**
+   * Set the Vertex ID class for the job.
+   */
+  public void setVertexIDClass(Class<? extends Writable> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_ID_CLASS_ATTR, cls, Writable.class);
+  }
+
+  /**
+   * Set the Vertex value class for the job.
+   */
+  public void setVertexValueClass(Class<? extends Writable> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_VALUE_CLASS_ATTR, cls, Writable.class);
+  }
+
+  /**
+   * Set the Edge value class for the job.
+   */
+  public void setEdgeValueClass(Class<? extends Writable> cls)
+      throws IllegalStateException {
+    conf.setClass(VERTEX_EDGE_VALUE_CLASS_ATTR, cls, Writable.class);
+  }
+
+  /**
    * Set the aggregator for the job.
    */
-  public void setAggregatorClass(
-      Class<? extends Aggregator<? extends Writable>> cls) {
+  public void setAggregatorClass(@SuppressWarnings("rawtypes")
+  Class<? extends Aggregator> cls) {
     conf.setClass(AGGREGATOR_CLASS_ATTR, cls, Aggregator.class);
   }
 
   @SuppressWarnings("unchecked")
-  public Class<? extends Vertex<? extends Writable>> getVertexClass() {
-    return (Class<? extends Vertex<? extends Writable>>) conf.getClass(
-        VERTEX_CLASS_ATTR, Vertex.class);
+  public Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>> getVertexClass() {
+    return (Class<? extends Vertex<? extends Writable, ? extends Writable, ? extends Writable>>) conf
+        .getClass(VERTEX_CLASS_ATTR, Vertex.class);
   }
 
   @Override

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Fri May 18 15:35:28 2012
@@ -42,10 +42,18 @@ import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class GraphJobRunner extends BSP {
+/**
+ * Fully generic graph job runner.
+ * 
+ * @param <VERTEX_ID> the id type of a vertex.
+ * @param <VERTEX_VALUE> the value type of a vertex.
+ * @param <EDGE_VALUE_TYPE> the value type of an edge.
+ */
+public class GraphJobRunner<VERTEX_ID extends Writable, VERTEX_VALUE extends Writable, EDGE_VALUE_TYPE extends Writable>
+    extends
+    BSP<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> {
 
-  public static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+  private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
 
   // make sure that these values don't collide with the vertex names
   private static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
@@ -63,17 +71,17 @@ public class GraphJobRunner extends BSP 
   public static final String GRAPH_REPAIR = "hama.graph.repair";
 
   private Configuration conf;
-  private Combiner<? extends Writable> combiner;
+  private Combiner<VERTEX_VALUE> combiner;
 
-  private Aggregator<Writable> aggregator;
+  private Aggregator<VERTEX_VALUE> aggregator;
   private Writable globalAggregatorResult;
   private IntWritable globalAggregatorIncrement;
   private boolean isAbstractAggregator;
 
   // aggregator on the master side
-  private Aggregator<Writable> masterAggregator;
+  private Aggregator<VERTEX_VALUE> masterAggregator;
 
-  private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
+  private Map<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> vertices = new HashMap<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>>();
 
   private String masterTask;
   private boolean updated = true;
@@ -84,19 +92,36 @@ public class GraphJobRunner extends BSP 
   private int maxIteration = -1;
   private long iteration;
 
-  public void setup(BSPPeer peer) throws IOException, SyncException,
-      InterruptedException {
+  // aimed to be accessed by vertex writables to serialize stuff
+  Class<VERTEX_ID> vertexIdClass;
+  Class<VERTEX_VALUE> vertexValueClass;
+  Class<EDGE_VALUE_TYPE> edgeValueClass;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void setup(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      throws IOException, SyncException, InterruptedException {
     this.conf = peer.getConfiguration();
+    VertexWritable.CONFIGURATION = conf;
     // Choose one as a master to collect global updates
     this.masterTask = peer.getPeerName(0);
 
+    vertexIdClass = (Class<VERTEX_ID>) conf.getClass(
+        GraphJob.VERTEX_ID_CLASS_ATTR, Text.class, Writable.class);
+    vertexValueClass = (Class<VERTEX_VALUE>) conf.getClass(
+        GraphJob.VERTEX_VALUE_CLASS_ATTR, IntWritable.class, Writable.class);
+    edgeValueClass = (Class<EDGE_VALUE_TYPE>) conf.getClass(
+        GraphJob.VERTEX_EDGE_VALUE_CLASS_ATTR, IntWritable.class,
+        Writable.class);
+
     boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
 
     if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
         Combiner.class)) {
       LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
 
-      combiner = (Combiner<? extends Writable>) ReflectionUtils.newInstance(
+      combiner = (Combiner<VERTEX_VALUE>) ReflectionUtils.newInstance(
           conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
           conf);
     }
@@ -117,16 +142,17 @@ public class GraphJobRunner extends BSP 
     loadVertices(peer, repairNeeded);
     numberVertices = vertices.size() * peer.getNumPeers();
     // TODO refactor this to a single step
-    for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
-      LinkedList<Writable> msgIterator = new LinkedList<Writable>();
-      Vertex v = e.getValue();
+    for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
+        .entrySet()) {
+      LinkedList<VERTEX_VALUE> msgIterator = new LinkedList<VERTEX_VALUE>();
+      Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> v = e.getValue();
       msgIterator.add(v.getValue());
-      Writable lastValue = v.getValue();
+      VERTEX_VALUE lastValue = v.getValue();
       v.compute(msgIterator.iterator());
       if (aggregator != null) {
         aggregator.aggregate(v.getValue());
         if (isAbstractAggregator) {
-          AbstractAggregator intern = ((AbstractAggregator) aggregator);
+          AbstractAggregator<VERTEX_VALUE> intern = ((AbstractAggregator<VERTEX_VALUE>) aggregator);
           intern.aggregate(lastValue, v.getValue());
           intern.aggregateInternal();
         }
@@ -136,8 +162,9 @@ public class GraphJobRunner extends BSP 
   }
 
   @Override
-  public void bsp(BSPPeer peer) throws IOException, SyncException,
-      InterruptedException {
+  public void bsp(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      throws IOException, SyncException, InterruptedException {
 
     maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
         -1);
@@ -147,7 +174,7 @@ public class GraphJobRunner extends BSP 
       peer.sync();
 
       // Map <vertexID, messages>
-      final Map<String, LinkedList<Writable>> messages = parseMessages(peer);
+      final Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> messages = parseMessages(peer);
       // use iterations here, since repair can skew the number of supersteps
       if (isMasterTask(peer) && iteration > 1) {
         MapWritable updatedCnt = new MapWritable();
@@ -159,7 +186,7 @@ public class GraphJobRunner extends BSP 
           if (aggregator != null) {
             Writable lastAggregatedValue = masterAggregator.getValue();
             if (isAbstractAggregator) {
-              final AbstractAggregator intern = ((AbstractAggregator) aggregator);
+              final AbstractAggregator<VERTEX_VALUE> intern = ((AbstractAggregator<VERTEX_VALUE>) aggregator);
               final Writable finalizeAggregation = intern.finalizeAggregation();
               if (intern.finalizeAggregation() != null) {
                 lastAggregatedValue = finalizeAggregation;
@@ -198,23 +225,24 @@ public class GraphJobRunner extends BSP 
       }
 
       int messagesSize = messages.size();
-      Iterator<Entry<String, LinkedList<Writable>>> iterator = messages
+      Iterator<Entry<VERTEX_ID, LinkedList<VERTEX_VALUE>>> iterator = messages
           .entrySet().iterator();
       while (iterator.hasNext()) {
-        Entry<String, LinkedList<Writable>> e = iterator.next();
-        LinkedList msgs = e.getValue();
+        Entry<VERTEX_ID, LinkedList<VERTEX_VALUE>> e = iterator.next();
+        LinkedList<VERTEX_VALUE> msgs = e.getValue();
         if (combiner != null) {
-          Writable combined = combiner.combine(msgs);
-          msgs = new LinkedList();
+          VERTEX_VALUE combined = combiner.combine(msgs);
+          msgs = new LinkedList<VERTEX_VALUE>();
           msgs.add(combined);
         }
-        Vertex vertex = vertices.get(e.getKey());
-        Writable lastValue = vertex.getValue();
+        Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = vertices
+            .get(e.getKey());
+        VERTEX_VALUE lastValue = vertex.getValue();
         vertex.compute(msgs.iterator());
         if (aggregator != null) {
           aggregator.aggregate(vertex.getValue());
           if (isAbstractAggregator) {
-            AbstractAggregator intern = ((AbstractAggregator) aggregator);
+            AbstractAggregator<VERTEX_VALUE> intern = ((AbstractAggregator<VERTEX_VALUE>) aggregator);
             intern.aggregate(lastValue, vertex.getValue());
             intern.aggregateInternal();
           }
@@ -230,7 +258,8 @@ public class GraphJobRunner extends BSP 
         updatedCnt.put(FLAG_AGGREGATOR_VALUE, aggregator.getValue());
         if (isAbstractAggregator) {
           updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
-              ((AbstractAggregator) aggregator).getTimesAggregated());
+              ((AbstractAggregator<VERTEX_VALUE>) aggregator)
+                  .getTimesAggregated());
         }
       }
       peer.send(masterTask, updatedCnt);
@@ -238,36 +267,37 @@ public class GraphJobRunner extends BSP 
     }
   }
 
-  private Map<String, LinkedList<Writable>> parseMessages(BSPPeer peer)
+  @SuppressWarnings("unchecked")
+  private Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> parseMessages(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
       throws IOException {
     MapWritable msg = null;
-    Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
+    Map<VERTEX_ID, LinkedList<VERTEX_VALUE>> msgMap = new HashMap<VERTEX_ID, LinkedList<VERTEX_VALUE>>();
     while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
       for (Entry<Writable, Writable> e : msg.entrySet()) {
-        String vertexID = ((Text) e.getKey()).toString();
-        if (vertexID.equals(S_FLAG_MESSAGE_COUNTS)) {
+        VERTEX_ID vertexID = (VERTEX_ID) e.getKey();
+        if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
           if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
             updated = false;
           } else {
             globalUpdateCounts += ((IntWritable) e.getValue()).get();
           }
+        } else if (aggregator != null && FLAG_AGGREGATOR_VALUE.equals(vertexID)) {
+          masterAggregator.aggregate((VERTEX_VALUE) e.getValue());
         } else if (aggregator != null
-            && vertexID.equals(S_FLAG_AGGREGATOR_VALUE)) {
-          masterAggregator.aggregate(e.getValue());
-        } else if (aggregator != null
-            && vertexID.equals(S_FLAG_AGGREGATOR_INCREMENT)) {
+            && FLAG_AGGREGATOR_INCREMENT.equals(vertexID)) {
           if (isAbstractAggregator) {
-            ((AbstractAggregator) masterAggregator)
+            ((AbstractAggregator<VERTEX_VALUE>) masterAggregator)
                 .addTimesAggregated(((IntWritable) e.getValue()).get());
           }
         } else {
-          Writable value = e.getValue();
+          VERTEX_VALUE value = (VERTEX_VALUE) e.getValue();
           if (msgMap.containsKey(vertexID)) {
-            LinkedList<Writable> msgs = msgMap.get(vertexID);
+            LinkedList<VERTEX_VALUE> msgs = msgMap.get(vertexID);
             msgs.add(value);
             msgMap.put(vertexID, msgs);
           } else {
-            LinkedList<Writable> msgs = new LinkedList<Writable>();
+            LinkedList<VERTEX_VALUE> msgs = new LinkedList<VERTEX_VALUE>();
             msgs.add(value);
             msgMap.put(vertexID, msgs);
           }
@@ -277,37 +307,43 @@ public class GraphJobRunner extends BSP 
     return msgMap;
   }
 
-  private void loadVertices(BSPPeer peer, boolean repairNeeded)
-      throws IOException {
+  @SuppressWarnings("unchecked")
+  private void loadVertices(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer,
+      boolean repairNeeded) throws IOException {
     LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
-    KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;
+    KeyValuePair<? extends VertexWritable<VERTEX_ID, VERTEX_VALUE>, ? extends VertexArrayWritable> next = null;
     while ((next = peer.readNext()) != null) {
-      Vertex<? extends Writable> vertex = (Vertex<? extends Writable>) ReflectionUtils
+      Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
           .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class),
               conf);
 
-      vertex.setVertexID(next.getKey().getName());
+      vertex.setVertexID(next.getKey().getVertexId());
       vertex.peer = peer;
       vertex.runner = this;
 
-      VertexWritable[] arr = (VertexWritable[]) next.getValue().toArray();
+      VertexWritable<VERTEX_ID, VERTEX_VALUE>[] arr = (VertexWritable[]) next
+          .getValue().toArray();
       if (selfReference) {
-        VertexWritable[] tmp = new VertexWritable[arr.length + 1];
+        VertexWritable<VERTEX_ID, VERTEX_VALUE>[] tmp = new VertexWritable[arr.length + 1];
         System.arraycopy(arr, 0, tmp, 0, arr.length);
-        tmp[arr.length] = new VertexWritable(vertex.getVertexID());
+        tmp[arr.length] = new VertexWritable<VERTEX_ID, VERTEX_VALUE>(
+            vertex.getValue(), vertex.getVertexID(), vertexIdClass,
+            vertexValueClass);
         arr = tmp;
       }
-      List<Edge> edges = new ArrayList<Edge>();
-      for (VertexWritable e : arr) {
+      List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> edges = new ArrayList<Edge<VERTEX_ID, EDGE_VALUE_TYPE>>();
+      for (VertexWritable<VERTEX_ID, VERTEX_VALUE> e : arr) {
         String target = peer.getPeerName(Math.abs((e.hashCode() % peer
             .getAllPeerNames().length)));
-        edges.add(new Edge(e.getName(), target, e.getWeight()));
+        edges.add(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(e.getVertexId(), target,
+            (EDGE_VALUE_TYPE) e.getVertexValue()));
       }
 
       vertex.edges = edges;
       vertex.setup(conf);
-      vertices.put(next.getKey().getName(), vertex);
+      vertices.put(next.getKey().getVertexId(), vertex);
     }
 
     /*
@@ -319,11 +355,12 @@ public class GraphJobRunner extends BSP 
      */
     if (repairNeeded) {
       LOG.debug("Starting repair of this graph!");
-      final Collection<Vertex> entries = vertices.values();
-      for (Vertex entry : entries) {
-        List<Edge> outEdges = entry.getOutEdges();
-        for (Edge e : outEdges) {
-          peer.send(e.getDestVertexID(), new Text(e.getName()));
+      final Collection<Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> entries = vertices
+          .values();
+      for (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> entry : entries) {
+        List<Edge<VERTEX_ID, EDGE_VALUE_TYPE>> outEdges = entry.getOutEdges();
+        for (Edge<VERTEX_ID, EDGE_VALUE_TYPE> e : outEdges) {
+          peer.send(e.getDestinationPeerName(), e.getDestinationVertexID());
         }
       }
       try {
@@ -332,26 +369,26 @@ public class GraphJobRunner extends BSP 
         // we can't really recover from that, so fail this task
         throw new RuntimeException(e);
       }
-      Text vertexName = null;
-      while ((vertexName = (Text) peer.getCurrentMessage()) != null) {
-        String vName = vertexName.toString();
-        if (!vertices.containsKey(vName)) {
-          Vertex<? extends Writable> vertex = (Vertex<? extends Writable>) ReflectionUtils
+      VERTEX_ID vertexName = null;
+      while ((vertexName = (VERTEX_ID) peer.getCurrentMessage()) != null) {
+        if (!vertices.containsKey(vertexName)) {
+          Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE> vertex = (Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>) ReflectionUtils
               .newInstance(
                   conf.getClass("hama.graph.vertex.class", Vertex.class), conf);
           vertex.peer = peer;
-          vertex.setVertexID(vName);
+          vertex.setVertexID(vertexName);
           vertex.runner = this;
           if (selfReference) {
             String target = peer.getPeerName(Math.abs((vertex.hashCode() % peer
                 .getAllPeerNames().length)));
             vertex.edges = Collections
-                .singletonList(new Edge(vertex.getVertexID(), target, 0));
+                .singletonList(new Edge<VERTEX_ID, EDGE_VALUE_TYPE>(vertex
+                    .getVertexID(), target, null));
           } else {
             vertex.edges = Collections.emptyList();
           }
           vertex.setup(conf);
-          vertices.put(vName, vertex);
+          vertices.put(vertexName, vertex);
         }
       }
     }
@@ -359,20 +396,26 @@ public class GraphJobRunner extends BSP 
   }
 
   /**
-   * Just write <new Text(vertexID), (Writable) value> pair as a result
+   * Just write <ID as Writable, Value as Writable> pair as a result
    */
-  public void cleanup(BSPPeer peer) throws IOException {
-    for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
-      peer.write(new Text(e.getValue().getVertexID()), e.getValue().getValue());
+  @Override
+  public void cleanup(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer)
+      throws IOException {
+    for (Entry<VERTEX_ID, Vertex<VERTEX_ID, VERTEX_VALUE, EDGE_VALUE_TYPE>> e : vertices
+        .entrySet()) {
+      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
     }
   }
 
-  private Aggregator<Writable> getNewAggregator() {
-    return (Aggregator<Writable>) ReflectionUtils.newInstance(
+  @SuppressWarnings("unchecked")
+  private Aggregator<VERTEX_VALUE> getNewAggregator() {
+    return (Aggregator<VERTEX_VALUE>) ReflectionUtils.newInstance(
         conf.getClass("hama.graph.aggregator.class", Aggregator.class), conf);
   }
 
-  private boolean isMasterTask(BSPPeer peer) {
+  private boolean isMasterTask(
+      BSPPeer<VertexWritable<VERTEX_ID, VERTEX_VALUE>, VertexArrayWritable, Writable, Writable, Writable> peer) {
     return peer.getPeerName().equals(masterTask);
   }
 

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java?rev=1340131&r1=1340130&r2=1340131&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java Fri May 18 15:35:28 2012
@@ -23,12 +23,14 @@ public class MaxAggregator extends Abstr
 
   int max = Integer.MIN_VALUE;
 
+  @Override
   public void aggregate(IntWritable value) {
     if (value.get() > max) {
       max = value.get();
     }
   }
 
+  @Override
   public IntWritable getValue() {
     return new IntWritable(max);
   }



Mime
View raw message