hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1335670 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ examples/src/test/java...
Date Tue, 08 May 2012 18:13:41 GMT
Author: tjungblut
Date: Tue May  8 18:13:39 2012
New Revision: 1335670

URL: http://svn.apache.org/viewvc?rev=1335670&view=rev
Log:
[HAMA-556]: Graph package to support stopping the iterations when the node changes are within the tolerance value as in the case of page rank

Added:
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue May  8 18:13:39 2012
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.5 - April 10, 2012 
 
   NEW FEATURES
-
+   
+   HAMA-556: Graph package to support stopping the interations when the node changes are within the tolerance value as in the case of page rank (tjungblut)
    HAMA-508: Add clean plugin (Mikalai Parafeniuk via edwardyoon)
    HAMA-503: Chainable computations for fault tolerance (tjungblut)
    HAMA-517: Add documentation for Graph package (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Tue May  8 18:13:39 2012
@@ -52,7 +52,10 @@ public final class BSPPeerImpl<K1, V1, K
   private static final Log LOG = LogFactory.getLog(BSPPeerImpl.class);
 
   public static enum PeerCounter {
-    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS, IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED, TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT, COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
+    SUPERSTEP_SUM, SUPERSTEPS, TASK_INPUT_RECORDS, TASK_OUTPUT_RECORDS,
+    IO_BYTES_READ, MESSAGE_BYTES_TRANSFERED, MESSAGE_BYTES_RECEIVED,
+    TOTAL_MESSAGES_SENT, TOTAL_MESSAGES_RECEIVED, COMPRESSED_BYTES_SENT,
+    COMPRESSED_BYTES_RECEIVED, TIME_IN_SYNC_MS
   }
 
   private final Configuration conf;

Modified: incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ incubator/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Tue May  8 18:13:39 2012
@@ -109,6 +109,7 @@ public class TestCheckpoint extends Test
 
     try {
       BSPJob job = new BSPJob(conf);
+      job.setOutputFormat(NullOutputFormat.class);
       final BSPPeerProtocol proto = (BSPPeerProtocol) RPC.getProxy(
           BSPPeerProtocol.class, BSPPeerProtocol.versionID,
           new InetSocketAddress("127.0.0.1", port), conf);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Tue May  8 18:13:39 2012
@@ -29,11 +29,14 @@ public class ExampleDriver {
     ProgramDriver pgd = new ProgramDriver();
     try {
       pgd.addClass("pi", PiEstimator.class, "Pi Estimator");
-      pgd.addClass("sssp-text2seq", SSSPTextToSeq.class, "Generates SSSP input from textfile");
+      pgd.addClass("sssp-text2seq", SSSPTextToSeq.class,
+          "Generates SSSP input from textfile");
       pgd.addClass("sssp", SSSP.class, "Single Shortest Path");
+      pgd.addClass("mdstsearch", MindistSearch.class, "Mindist search / Connected Components");
       pgd.addClass("cmb", CombineExample.class, "Combine");
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
-      pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class, "Generates Pagerank input from textfile");
+      pgd.addClass("pagerank-text2seq", PagerankTextToSeq.class,
+          "Generates Pagerank and mindist search input from textfile");
       pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.driver(args);
     } catch (Throwable e) {

Added: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (added)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Tue May  8 18:13:39 2012
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.graph.Edge;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.Vertex;
+
+/**
+ * Finding the mindist vertex in a connected component.
+ */
+public class MindistSearch {
+
+  /*
+   * Make sure that you know that you're comparing text, and not integers!
+   */
+  public static class MindistSearchVertex extends Vertex<Text> {
+
+    @Override
+    public void compute(Iterator<Text> messages) throws IOException {
+      Text currentComponent = getValue();
+      if (getSuperstepCount() == 0L) {
+        // if we have no associated component, pick the lowest in our direct
+        // neighbourhood.
+        if (currentComponent == null) {
+          setValue(new Text(getVertexID()));
+          for (Edge e : edges) {
+            String id = getVertexID();
+            if (id.compareTo(e.getName()) > 0) {
+              setValue(new Text(e.getName()));
+            }
+          }
+          sendMessageToNeighbors(getValue());
+        }
+      } else {
+        boolean updated = false;
+        while (messages.hasNext()) {
+          Text next = messages.next();
+          if (currentComponent.compareTo(next) > 0) {
+            updated = true;
+            setValue(next);
+          }
+        }
+        if (updated) {
+          sendMessageToNeighbors(getValue());
+        }
+      }
+    }
+  }
+
+  public static class MinTextCombiner extends Combiner<Text> {
+
+    @Override
+    public Text combine(Iterable<Text> messages) {
+      Text min = null;
+      for (Text m : messages) {
+        if (min == null || min.compareTo(m) > 0) {
+          min = m;
+        }
+      }
+      return min;
+    }
+
+  }
+
+  private static void printUsage() {
+    System.out
+        .println("Usage: <input> <output> [maximum iterations (default 30)] [tasks]");
+    System.exit(-1);
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    if (args.length < 2)
+      printUsage();
+
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    GraphJob connectedComponentsJob = new GraphJob(conf,
+        MindistSearchVertex.class);
+    connectedComponentsJob.setJobName("Mindist Search");
+
+    connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
+    connectedComponentsJob.setInputPath(new Path(args[0]));
+    connectedComponentsJob.setOutputPath(new Path(args[1]));
+    // set the min text combiner here
+    connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
+
+    // set the defaults
+    connectedComponentsJob.setMaxIteration(30);
+    if (args.length == 4)
+      connectedComponentsJob.setNumBspTask(Integer.parseInt(args[3]));
+    if (args.length >= 3)
+      connectedComponentsJob.setMaxIteration(Integer.parseInt(args[2]));
+
+    connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
+    connectedComponentsJob.setPartitioner(HashPartitioner.class);
+    connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
+    connectedComponentsJob.setOutputKeyClass(Text.class);
+    connectedComponentsJob.setOutputValueClass(Text.class);
+
+    long startTime = System.currentTimeMillis();
+    if (connectedComponentsJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+    }
+  }
+
+}

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Tue May  8 18:13:39 2012
@@ -28,6 +28,7 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
 
@@ -35,33 +36,57 @@ public class PageRank {
 
   public static class PageRankVertex extends Vertex<DoubleWritable> {
 
+    static double DAMPING_FACTOR = 0.85;
+    static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
+
+    int numEdges;
+
+    @Override
+    public void setup(Configuration conf) {
+      String val = conf.get("hama.pagerank.alpha");
+      if (val != null) {
+        DAMPING_FACTOR = Double.parseDouble(val);
+      }
+      val = conf.get("hama.graph.max.convergence.error");
+      if (val != null) {
+        MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
+      }
+      numEdges = this.getOutEdges().size();
+    }
+
     @Override
     public void compute(Iterator<DoubleWritable> messages) throws IOException {
+      // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
         this.setValue(new DoubleWritable(1.0 / (double) this.getNumVertices()));
       }
 
+      // in the first superstep, there are no messages to check
       if (this.getSuperstepCount() >= 1) {
         double sum = 0;
         while (messages.hasNext()) {
           DoubleWritable msg = messages.next();
           sum += msg.get();
         }
-
-        double ALPHA = (1 - 0.85) / (double) this.getNumVertices();
-        this.setValue(new DoubleWritable(ALPHA + (0.85 * sum)));
+        double alpha = (1.0d - DAMPING_FACTOR) / (double) this.getNumVertices();
+        this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
       }
 
-      if (this.getSuperstepCount() < this.getMaxIteration()) {
-        int numEdges = this.getOutEdges().size();
-        sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-            / numEdges));
+      // if we have not reached our global error yet, then proceed.
+      DoubleWritable globalError = getLastAggregatedValue();
+      if (globalError != null && this.getSuperstepCount() > 2
+          && MAXIMUM_CONVERGENCE_ERROR > globalError.get()) {
+        return;
       }
+      // in each superstep we are going to send a new rank to our neighbours
+      sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
+          / numEdges));
     }
   }
 
   private static void printUsage() {
-    System.out.println("Usage: <input> <output> [tasks]");
+    System.out
+        .println("Usage: <input> <output> [damping factor (default 0.85)] [Epsilon (convergence error, default 0.001)] [Max iterations (default 30)] [tasks]");
     System.exit(-1);
   }
 
@@ -75,14 +100,26 @@ public class PageRank {
     pageJob.setJobName("Pagerank");
 
     pageJob.setVertexClass(PageRankVertex.class);
-    pageJob.setMaxIteration(30);
-
     pageJob.setInputPath(new Path(args[0]));
     pageJob.setOutputPath(new Path(args[1]));
 
-    if (args.length == 3) {
-      pageJob.setNumBspTask(Integer.parseInt(args[2]));
-    }
+    // set the defaults
+    pageJob.setMaxIteration(30);
+    pageJob.set("hama.pagerank.alpha", "0.85");
+    // we need to include a vertex in its adjacency list,
+    // otherwise the pagerank result has a constant loss
+    pageJob.set("hama.graph.self.ref", "true");
+
+    if (args.length == 6)
+      pageJob.setNumBspTask(Integer.parseInt(args[5]));
+    if (args.length >= 5)
+      pageJob.setMaxIteration(Integer.parseInt(args[4]));
+    if (args.length >= 4)
+      pageJob.set("hama.graph.max.convergence.error", args[3]);
+    if (args.length >= 3)
+      pageJob.set("hama.pagerank.alpha", args[2]);
+
+    pageJob.setAggregatorClass(AverageAggregator.class);
 
     pageJob.setInputFormat(SequenceFileInputFormat.class);
     pageJob.setPartitioner(HashPartitioner.class);

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PiEstimator.java Tue May  8 18:13:39 2012
@@ -41,7 +41,8 @@ import org.apache.hama.bsp.TextOutputFor
 import org.apache.hama.bsp.sync.SyncException;
 
 public class PiEstimator {
-  private static Path TMP_OUTPUT = new Path("/tmp/pi-" + System.currentTimeMillis());
+  private static Path TMP_OUTPUT = new Path("/tmp/pi-"
+      + System.currentTimeMillis());
 
   public static class MyEstimator extends
       BSP<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> {
@@ -54,13 +55,11 @@ public class PiEstimator {
         BSPPeer<NullWritable, NullWritable, Text, DoubleWritable, DoubleWritable> peer)
         throws IOException, SyncException, InterruptedException {
 
-      int in = 0, out = 0;
+      int in = 0;
       for (int i = 0; i < iterations; i++) {
         double x = 2.0 * Math.random() - 1.0, y = 2.0 * Math.random() - 1.0;
         if ((Math.sqrt(x * x + y * y) < 1.0)) {
           in++;
-        } else {
-          out++;
         }
       }
 

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/RandBench.java Tue May  8 18:13:39 2012
@@ -78,7 +78,8 @@ public class RandBench {
     public void setup(
         BSPPeer<NullWritable, NullWritable, NullWritable, NullWritable, BytesWritable> peer) {
       this.sizeOfMsg = peer.getConfiguration().getInt(SIZEOFMSG, 1);
-      this.nCommunications = peer.getConfiguration().getInt(N_COMMUNICATIONS, 1);
+      this.nCommunications = peer.getConfiguration()
+          .getInt(N_COMMUNICATIONS, 1);
       this.nSupersteps = peer.getConfiguration().getInt(N_SUPERSTEPS, 1);
     }
   }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Tue May  8 18:13:39 2012
@@ -35,6 +35,7 @@ import org.apache.hama.graph.VertexArray
 import org.apache.hama.graph.VertexWritable;
 
 public class SSSP {
+
   public static final String START_VERTEX = "shortest.paths.start.vertex.name";
 
   public static class ShortestPathVertex extends Vertex<IntWritable> {

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/CombineExampleTest.java Tue May  8 18:13:39 2012
@@ -28,7 +28,7 @@ public class CombineExampleTest {
   @Test
   public void testCorrectCombineExecution() {
     try {
-      CombineExample.main(new String[]{});
+      CombineExample.main(new String[] {});
     } catch (Exception e) {
       fail(e.getLocalizedMessage());
     }

Added: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java (added)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java Tue May  8 18:13:39 2012
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedWriter;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.examples.MindistSearch.MinTextCombiner;
+import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.graph.VertexArrayWritable;
+import org.apache.hama.graph.VertexWritable;
+
+public class MindistSearchTest extends TestCase {
+
+  private static final Map<VertexWritable, VertexArrayWritable> tmp = new HashMap<VertexWritable, VertexArrayWritable>();
+  // mapping of our index of the vertex to the resulting component id
+  private static final String[] resultList = new String[] { "0", "1", "2", "2",
+      "1", "2", "2", "1", "2", "0" };
+  static {
+    String[] pages = new String[] { "0", "1", "2", "3", "4", "5", "6", "7",
+        "8", "9" };
+    String[] lineArray = new String[] { "0", "1;4;7", "2;3;8", "3;5", "4;1",
+        "5;6", "6", "7", "8;3", "9;0" };
+
+    for (int i = 0; i < lineArray.length; i++) {
+      String[] adjacencyStringArray = lineArray[i].split(";");
+      int vertexId = Integer.parseInt(adjacencyStringArray[0]);
+      String name = pages[vertexId];
+      VertexWritable[] arr = new VertexWritable[adjacencyStringArray.length - 1];
+      for (int j = 1; j < adjacencyStringArray.length; j++) {
+        arr[j - 1] = new VertexWritable(
+            pages[Integer.parseInt(adjacencyStringArray[j])]);
+      }
+      VertexArrayWritable wr = new VertexArrayWritable();
+      wr.set(arr);
+      tmp.put(new VertexWritable(name), wr);
+    }
+  }
+  private static String INPUT = "/tmp/pagerank-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/pagerank.txt";
+  private static String TEXT_OUTPUT = INPUT + "pagerank.txt.seq";
+  private static String OUTPUT = "/tmp/pagerank-out";
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  public void testPageRank() throws Exception {
+    generateSeqTestData();
+    try {
+      MindistSearch.main(new String[] { INPUT, OUTPUT });
+
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  public void testMinTextCombiner() throws Exception {
+    MinTextCombiner combiner = new MinTextCombiner();
+    Text a = new Text("1");
+    Text b = new Text("2");
+    Text d = new Text("4");
+    Text c = new Text("3");
+    List<Text> asList = Arrays.asList(new Text[] { a, b, c, d });
+    Text combine = combiner.combine(asList);
+    assertEquals(combine, a);
+  }
+
+  private void verifyResult() throws IOException {
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path(OUTPUT
+        + "/part-00000"), conf);
+    Text key = new Text();
+    Writable value = new Text();
+    while (reader.next(key, value)) {
+      System.out.println(key + " | " + value);
+      assertEquals(resultList[Integer.parseInt(key.toString())],
+          value.toString());
+    }
+  }
+
+  private void generateSeqTestData() throws IOException {
+    SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
+        INPUT), VertexWritable.class, VertexArrayWritable.class);
+    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+      writer.append(e.getKey(), e.getValue());
+    }
+    writer.close();
+  }
+
+  public void testPageRankUtil() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+    generateTestTextData();
+    // <input path> <output path>
+    PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
+    try {
+      MindistSearch.main(new String[] { TEXT_OUTPUT, OUTPUT });
+
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  private void generateTestTextData() throws IOException {
+    BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
+    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+      writer.write(e.getKey() + "\t");
+      for (int i = 0; i < e.getValue().get().length; i++) {
+        VertexWritable writable = (VertexWritable) e.getValue().get()[i];
+        writer.write(writable.getName() + "\t");
+      }
+      writer.write("\n");
+    }
+    writer.close();
+  }
+
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(INPUT)))
+        fs.delete(new Path(INPUT), true);
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+      if (fs.exists(new Path(TEXT_INPUT)))
+        fs.delete(new Path(TEXT_INPUT), true);
+      if (fs.exists(new Path(TEXT_OUTPUT)))
+        fs.delete(new Path(TEXT_OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+}

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue May  8 18:13:39 2012
@@ -89,7 +89,10 @@ public class PageRankTest extends TestCa
   public void testPageRank() throws Exception {
     generateSeqTestData();
     try {
-      PageRank.main(new String[] { INPUT, OUTPUT, "1" });
+      // Usage: <input> <output> [damping factor (default 0.85)] [Epsilon
+      // (convergence error, default 0.001)] [Max iterations (default 30)]
+      // [tasks]
+      PageRank.main(new String[] { INPUT, OUTPUT, "0.85", "0.0001", "-1" });
 
       verifyResult();
     } finally {
@@ -107,7 +110,7 @@ public class PageRankTest extends TestCa
       sum += value.get();
     }
     System.out.println("Sum is: " + sum);
-    assertTrue(sum > 0 && sum < 1d);
+    assertTrue(sum > 0.99d && sum <= 1d);
   }
 
   private void generateSeqTestData() throws IOException {
@@ -125,7 +128,8 @@ public class PageRankTest extends TestCa
     // <input path> <output path>
     PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
     try {
-      PageRank.main(new String[] { TEXT_OUTPUT, OUTPUT, "1" });
+      PageRank
+          .main(new String[] { TEXT_OUTPUT, OUTPUT, "0.85", "0.0001", "-1" });
 
       verifyResult();
     } finally {

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PiEstimatorTest.java Tue May  8 18:13:39 2012
@@ -28,7 +28,7 @@ public class PiEstimatorTest {
   @Test
   public void testCorrectPiExecution() {
     try {
-      PiEstimator.main(new String[]{"10"});
+      PiEstimator.main(new String[] { "10" });
     } catch (Exception e) {
       fail(e.getLocalizedMessage());
     }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/RandBenchTest.java Tue May  8 18:13:39 2012
@@ -28,7 +28,7 @@ public class RandBenchTest {
   @Test
   public void testCorrectRandBenchExecution() {
     try {
-      RandBench.main(new String[]{"10","3","2"});
+      RandBench.main(new String[] { "10", "3", "2" });
     } catch (Exception e) {
       fail(e.getLocalizedMessage());
     }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/PagerankTextToSeqTest.java Tue May  8 18:13:39 2012
@@ -72,7 +72,6 @@ public class PagerankTextToSeqTest exten
     VertexWritable vertex = new VertexWritable();
     VertexArrayWritable vertexArray = new VertexArrayWritable();
 
- 
     while (reader.next(vertex, vertexArray)) {
       int count = 0;
       assertEquals(vertex.getName(), count + "");

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/util/SSSPTextToSeqTest.java Tue May  8 18:13:39 2012
@@ -93,7 +93,8 @@ public class SSSPTextToSeqTest extends T
 
   public void testArgs() throws Exception {
     writeTextFile();
-    SSSPTextToSeq.main(new String[] { TXT_INPUT, SEQ_OUTPUT, DELIMITER, EDGE_DELIMITER });
+    SSSPTextToSeq.main(new String[] { TXT_INPUT, SEQ_OUTPUT, DELIMITER,
+        EDGE_DELIMITER });
     verifyOutput();
   }
 

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java Tue May  8 18:13:39 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * An absolute difference aggregator: it collects values before the compute
+ * and after the compute, then calculates the difference and globally
+ * accumulates them (sums them up).
+ */
+public class AbsDiffAggregator extends AbstractAggregator<DoubleWritable> {
+
+  double absoluteDifference = 0.0d;
+
+  @Override
+  public void aggregate(DoubleWritable oldValue, DoubleWritable newValue) {
+    // make sure it's nullsafe
+    if (oldValue != null) {
+      absoluteDifference += Math.abs(oldValue.get() - newValue.get());
+    }
+  }
+
+  @Override
+  public DoubleWritable getValue() {
+    return new DoubleWritable(absoluteDifference);
+  }
+
+}

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbstractAggregator.java Tue May  8 18:13:39 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Abstract base class of an aggregator. Also defines a sticky operation that
+ * aggregates the last value and the new one of a vertex. <br/>
+ * For tracking cases it increments an internal counter on each call of
+ * aggregate.
+ */
+public abstract class AbstractAggregator<V extends Writable> implements
+    Aggregator<V> {
+
+  private int timesAggregated = 0;
+
+  /**
+   * Used for internal tracking purposes.
+   */
+  void aggregateInternal() {
+    timesAggregated++;
+  }
+
+  /**
+   * Used for internal summing.
+   */
+  void addTimesAggregated(int val) {
+    timesAggregated += val;
+  }
+
+  /**
+   * Observes a value of a vertex after the compute method. This is intended to
+   * be overridden by the user and is just an empty implementation in this class.
+   */
+  @Override
+  public void aggregate(V value) {
+
+  }
+
+  /**
+   * Observes the old value of a vertex and the new value at the same time. This
+   * is intended to be overridden by the user and is just an empty
+   * implementation in this class.
+   */
+  public void aggregate(V oldValue, V newValue) {
+
+  }
+
+  /**
+   * Finalizes the aggregation on a master task. This is intended to be
+   * overridden by the user and is just an empty implementation in this class
+   * (returns null).
+   */
+  public V finalizeAggregation() {
+    return null;
+  }
+
+  /**
+   * Gets the value of the aggregator. This is intended to be overridden by the
+   * user and is just an empty implementation in this class (returns null).
+   */
+  @Override
+  public V getValue() {
+    return null;
+  }
+
+  public IntWritable getTimesAggregated() {
+    return new IntWritable(timesAggregated);
+  }
+
+}

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Aggregator.java Tue May  8 18:13:39 2012
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Aggregators are a mechanism for global communication, monitoring, and data.
+ * Each vertex can provide a value to an aggregator in superstep S, the system
+ * combines those values using a reduction operator, and the resulting value is
+ * made available to all vertices in superstep S + 1. <br/>
+ * The result of an aggregator from the last superstep can be picked up by the
+ * vertex itself via {@link Vertex#getLastAggregatedValue()}.
+ */
+public interface Aggregator<V extends Writable> {
+
+  /**
+   * Observes a new vertex value.
+   */
+  public void aggregate(V value);
+
+  /**
+   * Gets the aggregated value.
+   */
+  public V getValue();
+
+}

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/AverageAggregator.java Tue May  8 18:13:39 2012
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Averages the result of the {@link AbsDiffAggregator}.
+ */
+public class AverageAggregator extends AbsDiffAggregator {
+
+  @Override
+  public DoubleWritable finalizeAggregation() {
+    return new DoubleWritable(getValue().get()
+        / (double) getTimesAggregated().get());
+  }
+
+}

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Tue May  8 18:13:39 2012
@@ -25,7 +25,9 @@ import org.apache.hama.bsp.BSPJob;
 import org.apache.hama.bsp.Combiner;
 
 public class GraphJob extends BSPJob {
+
   public final static String VERTEX_CLASS_ATTR = "hama.graph.vertex.class";
+  public final static String AGGREGATOR_CLASS_ATTR = "hama.graph.aggregator.class";
   public final static String VERTEX_MESSAGE_COMBINER_CLASS_ATTR = "hama.vertex.message.combiner.class";
 
   public GraphJob(HamaConfiguration conf, Class<?> exampleClass)
@@ -37,15 +39,20 @@ public class GraphJob extends BSPJob {
 
   /**
    * Set the Vertex class for the job.
-   * 
-   * @param cls
-   * @throws IllegalStateException
    */
   public void setVertexClass(Class<? extends Vertex<? extends Writable>> cls)
       throws IllegalStateException {
     conf.setClass(VERTEX_CLASS_ATTR, cls, Vertex.class);
   }
 
+  /**
+   * Set the aggregator for the job.
+   */
+  public void setAggregatorClass(
+      Class<? extends Aggregator<? extends Writable>> cls) {
+    conf.setClass(AGGREGATOR_CLASS_ATTR, cls, Aggregator.class);
+  }
+
   @SuppressWarnings("unchecked")
   public Class<? extends Vertex<? extends Writable>> getVertexClass() {
     return (Class<? extends Vertex<? extends Writable>>) conf.getClass(
@@ -58,6 +65,10 @@ public class GraphJob extends BSPJob {
     conf.setClass(VERTEX_MESSAGE_COMBINER_CLASS_ATTR, cls, Combiner.class);
   }
 
+  /**
+   * Sets the maximum number of iterations the algorithm should perform. The
+   * default value of -1 deactivates the limit.
+   */
   public void setMaxIteration(int maxIteration) {
     conf.setInt("hama.graph.max.iteration", maxIteration);
   }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue May  8 18:13:39 2012
@@ -20,6 +20,7 @@ package org.apache.hama.graph;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -39,61 +40,196 @@ import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.sync.SyncException;
 import org.apache.hama.util.KeyValuePair;
 
-@SuppressWarnings("unchecked")
+@SuppressWarnings({ "unchecked", "rawtypes" })
 public class GraphJobRunner extends BSP {
+
   public static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+
+  // make sure that these values don't collide with the vertex names
+  private static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
+  private static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
+  private static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+
+  private static final Text FLAG_MESSAGE_COUNTS = new Text(
+      S_FLAG_MESSAGE_COUNTS);
+  private static final Text FLAG_AGGREGATOR_VALUE = new Text(
+      S_FLAG_AGGREGATOR_VALUE);
+  private static final Text FLAG_AGGREGATOR_INCREMENT = new Text(
+      S_FLAG_AGGREGATOR_INCREMENT);
+
+  private static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+
   private Configuration conf;
   private Combiner<? extends Writable> combiner;
-  private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
 
-  private String FLAG_MESSAGE = "hama.graph.msg.counts";
-  private final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  private Aggregator<Writable> aggregator;
+  private Writable globalAggregatorResult;
+  private IntWritable globalAggregatorIncrement;
+  private boolean isAbstractAggregator;
+
+  // aggregator on the master side
+  private Aggregator<Writable> masterAggregator;
+
+  private Map<String, Vertex> vertices = new HashMap<String, Vertex>();
 
   private String masterTask;
   private boolean updated = true;
   private int globalUpdateCounts = 0;
 
+  private long numberVertices;
+  // -1 is deactivated
+  private int maxIteration = -1;
+  private long iteration;
+
+  // TODO check if our graph is not broken and repair
+  public void setup(BSPPeer peer) throws IOException, SyncException,
+      InterruptedException {
+    this.conf = peer.getConfiguration();
+    // Choose one as a master to collect global updates
+    this.masterTask = peer.getPeerName(0);
+
+    if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
+        Combiner.class)) {
+      LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
+
+      combiner = (Combiner<? extends Writable>) ReflectionUtils.newInstance(
+          conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
+          conf);
+    }
+
+    if (!conf.getClass("hama.graph.aggregator.class", Aggregator.class).equals(
+        Aggregator.class)) {
+      LOG.debug("Aggregator class: " + conf.get(MESSAGE_COMBINER_CLASS));
+
+      aggregator = getNewAggregator();
+      if (aggregator instanceof AbstractAggregator) {
+        isAbstractAggregator = true;
+      }
+      if (isMasterTask(peer)) {
+        masterAggregator = getNewAggregator();
+      }
+    }
+
+    loadVertices(peer);
+    numberVertices = vertices.size() * peer.getNumPeers();
+    // TODO refactor this to a single step
+    for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
+      LinkedList<Writable> msgIterator = new LinkedList<Writable>();
+      Vertex v = e.getValue();
+      msgIterator.add(v.getValue());
+      Writable lastValue = v.getValue();
+      v.compute(msgIterator.iterator());
+      if (aggregator != null) {
+        aggregator.aggregate(v.getValue());
+        if (isAbstractAggregator) {
+          AbstractAggregator intern = ((AbstractAggregator) aggregator);
+          intern.aggregate(lastValue, v.getValue());
+          intern.aggregateInternal();
+        }
+      }
+    }
+    iteration++;
+  }
+
   @Override
   public void bsp(BSPPeer peer) throws IOException, SyncException,
       InterruptedException {
-    int maxIteration = peer.getConfiguration().getInt(
-        "hama.graph.max.iteration", 30);
 
-    int iteration = 0;
-    while (updated && iteration < maxIteration) {
+    maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
+        -1);
+
+    while (updated && !((maxIteration > 0) && iteration > maxIteration)) {
       globalUpdateCounts = 0;
       peer.sync();
 
       // Map <vertexID, messages>
-      Map<String, LinkedList<Writable>> messages = parseMessages(peer);
+      final Map<String, LinkedList<Writable>> messages = parseMessages(peer);
+      if (isMasterTask(peer) && peer.getSuperstepCount() > 1) {
 
-      // exit if there's no update made
-      if (globalUpdateCounts == 0 && peer.getPeerName().equals(masterTask)
-          && peer.getSuperstepCount() > 1) {
         MapWritable updatedCnt = new MapWritable();
-        updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(
-            Integer.MIN_VALUE));
-
+        // exit if there's no update made
+        if (globalUpdateCounts == 0) {
+          updatedCnt.put(FLAG_MESSAGE_COUNTS,
+              new IntWritable(Integer.MIN_VALUE));
+        } else {
+          if (aggregator != null) {
+            Writable lastAggregatedValue = masterAggregator.getValue();
+            if (isAbstractAggregator) {
+              final AbstractAggregator intern = ((AbstractAggregator) aggregator);
+              final Writable finalizeAggregation = intern.finalizeAggregation();
+              if (intern.finalizeAggregation() != null) {
+                lastAggregatedValue = finalizeAggregation;
+              }
+              // this count is usually the number of active vertices in the graph
+              updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
+                  intern.getTimesAggregated());
+            }
+            updatedCnt.put(FLAG_AGGREGATOR_VALUE, lastAggregatedValue);
+          }
+        }
         for (String peerName : peer.getAllPeerNames()) {
           peer.send(peerName, updatedCnt);
         }
       }
+      // if we have an aggregator defined, we must make an additional sync
+      // to have the updated values available on all our peers.
+      if (aggregator != null && peer.getSuperstepCount() > 1) {
+        peer.sync();
+
+        MapWritable updatedValues = (MapWritable) peer.getCurrentMessage();
+        globalAggregatorResult = updatedValues.get(FLAG_AGGREGATOR_VALUE);
+        globalAggregatorIncrement = (IntWritable) updatedValues
+            .get(FLAG_AGGREGATOR_INCREMENT);
+
+        aggregator = getNewAggregator();
+        if (isMasterTask(peer)) {
+          masterAggregator = getNewAggregator();
+        }
+        IntWritable count = (IntWritable) updatedValues
+            .get(FLAG_MESSAGE_COUNTS);
+        if (count != null && count.get() == Integer.MIN_VALUE) {
+          updated = false;
+          break;
+        }
+      }
 
-      // send msgCounts to the master task
-      MapWritable updatedCnt = new MapWritable();
-      updatedCnt.put(new Text(FLAG_MESSAGE), new IntWritable(messages.size()));
-      peer.send(masterTask, updatedCnt);
-
-      for (Map.Entry<String, LinkedList<Writable>> e : messages.entrySet()) {
+      int messagesSize = messages.size();
+      Iterator<Entry<String, LinkedList<Writable>>> iterator = messages
+          .entrySet().iterator();
+      while (iterator.hasNext()) {
+        Entry<String, LinkedList<Writable>> e = iterator.next();
         LinkedList msgs = e.getValue();
         if (combiner != null) {
           Writable combined = combiner.combine(msgs);
           msgs = new LinkedList();
           msgs.add(combined);
         }
+        Vertex vertex = vertices.get(e.getKey());
+        Writable lastValue = vertex.getValue();
+        vertex.compute(msgs.iterator());
+        if (aggregator != null) {
+          aggregator.aggregate(vertex.getValue());
+          if (isAbstractAggregator) {
+            AbstractAggregator intern = ((AbstractAggregator) aggregator);
+            intern.aggregate(lastValue, vertex.getValue());
+            intern.aggregateInternal();
+          }
+        }
+        iterator.remove();
+      }
 
-        vertices.get(e.getKey()).compute(msgs.iterator());
+      // send msgCounts to the master task
+      MapWritable updatedCnt = new MapWritable();
+      updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(messagesSize));
+      // also send aggregated values to the master
+      if (aggregator != null) {
+        updatedCnt.put(FLAG_AGGREGATOR_VALUE, aggregator.getValue());
+        if (isAbstractAggregator) {
+          updatedCnt.put(FLAG_AGGREGATOR_INCREMENT,
+              ((AbstractAggregator) aggregator).getTimesAggregated());
+        }
       }
+      peer.send(masterTask, updatedCnt);
       iteration++;
     }
   }
@@ -103,19 +239,25 @@ public class GraphJobRunner extends BSP 
     MapWritable msg = null;
     Map<String, LinkedList<Writable>> msgMap = new HashMap<String, LinkedList<Writable>>();
     while ((msg = (MapWritable) peer.getCurrentMessage()) != null) {
-
       for (Entry<Writable, Writable> e : msg.entrySet()) {
         String vertexID = ((Text) e.getKey()).toString();
-
-        if (vertexID.toString().equals(FLAG_MESSAGE)) {
+        if (vertexID.equals(S_FLAG_MESSAGE_COUNTS)) {
           if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
             updated = false;
           } else {
             globalUpdateCounts += ((IntWritable) e.getValue()).get();
           }
+        } else if (aggregator != null
+            && vertexID.equals(S_FLAG_AGGREGATOR_VALUE)) {
+          masterAggregator.aggregate(e.getValue());
+        } else if (aggregator != null
+            && vertexID.equals(S_FLAG_AGGREGATOR_INCREMENT)) {
+          if (isAbstractAggregator) {
+            ((AbstractAggregator) masterAggregator)
+                .addTimesAggregated(((IntWritable) e.getValue()).get());
+          }
         } else {
           Writable value = e.getValue();
-
           if (msgMap.containsKey(vertexID)) {
             LinkedList<Writable> msgs = msgMap.get(vertexID);
             msgs.add(value);
@@ -128,48 +270,29 @@ public class GraphJobRunner extends BSP 
         }
       }
     }
-
     return msgMap;
   }
 
-  public void setup(BSPPeer peer) throws IOException, SyncException,
-      InterruptedException {
-    this.conf = peer.getConfiguration();
-    // Choose one as a master to collect global updates
-    this.masterTask = peer.getPeerName(0);
-
-    if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
-        Combiner.class)) {
-      LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
-
-      combiner = (Combiner<? extends Writable>) ReflectionUtils.newInstance(
-          conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
-          conf);
-    }
-
-    loadVertices(peer);
-    long numberVertices = vertices.size() * peer.getNumPeers();
-
-    for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
-      e.getValue().setNumVertices(numberVertices);
-
-      LinkedList<Writable> msgIterator = new LinkedList<Writable>();
-      msgIterator.add(e.getValue().getValue());
-      e.getValue().compute(msgIterator.iterator());
-    }
-  }
-
   private void loadVertices(BSPPeer peer) throws IOException {
     LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
+    boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
     KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;
     while ((next = peer.readNext()) != null) {
       Vertex<? extends Writable> vertex = (Vertex<? extends Writable>) ReflectionUtils
           .newInstance(conf.getClass("hama.graph.vertex.class", Vertex.class),
               conf);
+
       vertex.setVertexID(next.getKey().getName());
       vertex.peer = peer;
+      vertex.runner = this;
 
       VertexWritable[] arr = (VertexWritable[]) next.getValue().toArray();
+      if (selfReference) {
+        VertexWritable[] tmp = new VertexWritable[arr.length + 1];
+        System.arraycopy(arr, 0, tmp, 0, arr.length);
+        tmp[arr.length] = new VertexWritable(vertex.getVertexID());
+        arr = tmp;
+      }
       List<Edge> edges = new ArrayList<Edge>();
       for (VertexWritable e : arr) {
         String target = peer.getPeerName(Math.abs((e.hashCode() % peer
@@ -178,6 +301,7 @@ public class GraphJobRunner extends BSP 
       }
 
       vertex.edges = edges;
+      vertex.setup(conf);
       vertices.put(next.getKey().getName(), vertex);
     }
   }
@@ -188,7 +312,36 @@ public class GraphJobRunner extends BSP 
   public void cleanup(BSPPeer peer) throws IOException {
     for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
       peer.write(new Text(e.getValue().getVertexID()), e.getValue().getValue());
-      LOG.debug(e.getValue().getVertexID() + ", " + e.getValue().getValue());
     }
   }
+
+  private Aggregator<Writable> getNewAggregator() {
+    return (Aggregator<Writable>) ReflectionUtils.newInstance(
+        conf.getClass("hama.graph.aggregator.class", Aggregator.class), conf);
+  }
+
+  private boolean isMasterTask(BSPPeer peer) {
+    return peer.getPeerName().equals(masterTask);
+  }
+
+  public long getNumberVertices() {
+    return numberVertices;
+  }
+
+  public long getNumberIterations() {
+    return iteration;
+  }
+
+  public int getMaxIteration() {
+    return maxIteration;
+  }
+
+  public Writable getLastAggregatedValue() {
+    return globalAggregatorResult;
+  }
+
+  public IntWritable getNumLastAggregatedVertices() {
+    return globalAggregatorIncrement;
+  }
+
 }

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MaxAggregator.java Tue May  8 18:13:39 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.IntWritable;
+
+public class MaxAggregator extends AbstractAggregator<IntWritable> {
+
+  int max = Integer.MIN_VALUE;
+
+  public void aggregate(IntWritable value) {
+    if (value.get() > max) {
+      max = value.get();
+    }
+  }
+
+  public IntWritable getValue() {
+    return new IntWritable(max);
+  }
+
+}

Added: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java?rev=1335670&view=auto
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java (added)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/MinAggregator.java Tue May  8 18:13:39 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.IntWritable;
+
+public class MinAggregator extends AbstractAggregator<IntWritable> {
+
+  int min = Integer.MAX_VALUE;
+
+  public void aggregate(IntWritable value) {
+    if (value.get() < min) {
+      min = value.get();
+    }
+  }
+
+  public IntWritable getValue() {
+    return new IntWritable(min);
+  }
+
+}

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Tue May  8 18:13:39 2012
@@ -21,17 +21,19 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.bsp.BSPPeer;
 
 public abstract class Vertex<M extends Writable> implements VertexInterface<M> {
+
   private M value;
   private String vertexID;
+  protected GraphJobRunner runner;
   protected BSPPeer<?, ?, ?, ?, MapWritable> peer;
   public List<Edge> edges;
-  private long numVertices;
 
   public Configuration getConf() {
     return peer.getConfiguration();
@@ -43,6 +45,10 @@ public abstract class Vertex<M extends W
   }
 
   @Override
+  public void setup(Configuration conf) {
+  }
+
+  @Override
   public void sendMessage(Edge e, M msg) throws IOException {
     MapWritable message = new MapWritable();
     message.put(new Text(e.getName()), msg);
@@ -60,7 +66,7 @@ public abstract class Vertex<M extends W
 
   @Override
   public long getSuperstepCount() {
-    return peer.getSuperstepCount();
+    return runner.getNumberIterations();
   }
 
   @Override
@@ -83,7 +89,24 @@ public abstract class Vertex<M extends W
   }
 
   public int getMaxIteration() {
-    return peer.getConfiguration().getInt("hama.graph.max.iteration", 30);
+    return runner.getMaxIteration();
+  }
+
+  /**
+   * Get the last aggregated value of the defined aggregator, or null if no
+   * aggregator was configured or it did not return a result.
+   */
+  @SuppressWarnings("unchecked")
+  public M getLastAggregatedValue() {
+    return (M) runner.getLastAggregatedValue();
+  }
+
+  /**
+   * Get the number of aggregated vertices in the last superstep, or null if no
+   * aggregator is available.
+   */
+  public IntWritable getNumLastAggregatedVertices() {
+    return runner.getNumLastAggregatedVertices();
   }
 
   public int getNumPeers() {
@@ -91,11 +114,12 @@ public abstract class Vertex<M extends W
   }
 
   public long getNumVertices() {
-    return numVertices;
+    return runner.getNumberVertices();
   }
 
-  public void setNumVertices(long NumVertices) {
-    this.numVertices = NumVertices;
+  @Override
+  public String toString() {
+    return getVertexID() + "=" + getValue();
   }
 
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Tue May  8 18:13:39 2012
@@ -21,30 +21,44 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 
 public interface VertexInterface<MSGTYPE extends Writable> {
 
+  /**
+   * Used to set up a vertex.
+   */
+  public void setup(Configuration conf);
+
   /** @return the unique identification for the vertex. */
   public String getVertexID();
-  /** @return the number of vertices in the input graph. */ 
+
+  /** @return the number of vertices in the input graph. */
   public long getNumVertices();
-  /** The user-defined function */ 
+
+  /** The user-defined function */
   public void compute(Iterator<MSGTYPE> messages) throws IOException;
+
   /** @return a list of outgoing edges of this vertex in the input graph. */
   public List<Edge> getOutEdges();
+
   /** Sends a message to another vertex. */
   public void sendMessage(Edge e, MSGTYPE msg) throws IOException;
+
   /** Sends a message to neighbors */
   public void sendMessageToNeighbors(MSGTYPE msg) throws IOException;
+
   /** @return the superstep number of the current superstep (starting from 0). */
   public long getSuperstepCount();
+
   /**
    * Sets the vertex value
    * 
    * @param value
    */
   public void setValue(MSGTYPE value);
+
   /**
    * Gets the vertex value
    * 

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java?rev=1335670&r1=1335669&r2=1335670&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexWritable.java Tue May  8 18:13:39 2012
@@ -39,7 +39,7 @@ public class VertexWritable implements W
     this.name = name;
     this.weight = 0;
   }
-  
+
   public VertexWritable(int weight, String name) {
     super();
     this.name = name;
@@ -58,7 +58,7 @@ public class VertexWritable implements W
   public String toString() {
     return getName();
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
     this.name = in.readUTF();



Mime
View raw message