hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1449666 [1/2] - in /hama/trunk: ./ conf/ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/bsp/ft/ core/src/main/java/org/apache/hama/util/ core/src/test/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama...
Date Mon, 25 Feb 2013 11:40:14 GMT
Author: tjungblut
Date: Mon Feb 25 11:40:13 2013
New Revision: 1449666

URL: http://svn.apache.org/r1449666
Log:
[HAMA-704]: Optimization of memory usage during message processing

Added:
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexMessageIterable.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VerticesInfo.java
      - copied, changed from r1449611, hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAbsDiffAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestAverageAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestDiskVerticesInfo.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestGraphJobMessage.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestMinMaxAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSumAggregator.java
Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/IVerticesInfo.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/conf/hama-default.xml
    hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
    hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
    hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Mon Feb 25 11:40:13 2013
@@ -19,6 +19,7 @@ Release 0.7 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-704:  Optimization of memory usage during message processing (tjungblut)
    HAMA-735: Tighten the graph API (tjungblut)
    HAMA-714: Align format consistency between examples and generators (edwardyoon)
    HAMA-531: Reimplementation of partitioner (edwardyoon)

Modified: hama/trunk/conf/hama-default.xml
URL: http://svn.apache.org/viewvc/hama/trunk/conf/hama-default.xml?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/conf/hama-default.xml (original)
+++ hama/trunk/conf/hama-default.xml Mon Feb 25 11:40:13 2013
@@ -92,6 +92,11 @@
     <description>Temporary directory on the local message buffer on disk.</description>
   </property>
   <property>
+    <name>hama.disk.vertices.path</name>
+    <value>${hama.tmp.dir}/graph/</value>
+    <description>Disk directory for graph data.</description>
+  </property>
+  <property>
     <name>bsp.child.java.opts</name>
     <value>-Xmx1024m</value>
     <description>Java opts for the groom server child processes.  

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/SimpleTaskScheduler.java Mon Feb 25 11:40:13 2013
@@ -357,8 +357,7 @@ class SimpleTaskScheduler extends TaskSc
         final Act act = new Act(new ZKCollector(zk, "jvm", "Jvm metrics.",
             jvmPath), new CollectorHandler() {
           @Override
-          public void handle(@SuppressWarnings("rawtypes")
-          Future future) {
+          public void handle(@SuppressWarnings("rawtypes") Future future) {
             try {
               MetricsRecord record = (MetricsRecord) future.get();
               if (null != record) {

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/AsyncRcvdMsgCheckpointImpl.java Mon Feb 25 11:40:13 2013
@@ -280,8 +280,7 @@ public class AsyncRcvdMsgCheckpointImpl<
 
   @Override
   public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
-      @SuppressWarnings("rawtypes")
-      BSPPeer bspPeer, PeerSyncClient syncClient,
+      @SuppressWarnings("rawtypes") BSPPeer bspPeer, PeerSyncClient syncClient,
       InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
       long superstep, Configuration conf, MessageManager<M> messenger)
       throws Exception {
@@ -327,8 +326,9 @@ public class AsyncRcvdMsgCheckpointImpl<
     volatile private FSDataOutputStream checkpointStream;
     volatile private long checkpointMessageCount;
 
-    public void initialize(BSPJob job, @SuppressWarnings("rawtypes")
-    BSPPeer bspPeer, PeerSyncClient syncClient, InetSocketAddress peerAddress,
+    public void initialize(BSPJob job,
+        @SuppressWarnings("rawtypes") BSPPeer bspPeer,
+        PeerSyncClient syncClient, InetSocketAddress peerAddress,
         TaskAttemptID taskAttemptId, long superstep, Configuration conf,
         MessageManager<M> messenger) throws IOException {
 

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/ft/BSPFaultTolerantService.java Mon Feb 25 11:40:13 2013
@@ -81,8 +81,7 @@ public interface BSPFaultTolerantService
    *         <code>FaultTolerantPeerService</code>
    */
   public FaultTolerantPeerService<M> constructPeerFaultTolerance(BSPJob job,
-      @SuppressWarnings("rawtypes")
-      BSPPeer bspPeer, PeerSyncClient syncClient,
+      @SuppressWarnings("rawtypes") BSPPeer bspPeer, PeerSyncClient syncClient,
       InetSocketAddress peerAddress, TaskAttemptID taskAttemptId,
       long superstep, Configuration conf, MessageManager<M> messenger)
       throws Exception;

Modified: hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/util/ReflectionUtils.java Mon Feb 25 11:40:13 2013
@@ -49,6 +49,7 @@ public class ReflectionUtils {
       Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass);
       if (null == meth) {
         meth = theClass.getDeclaredConstructor(new Class[0]);
+        meth.setAccessible(true);
         CONSTRUCTOR_CACHE.put(theClass, meth);
       }
       result = meth.newInstance();

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Mon Feb 25 11:40:13 2013
@@ -442,8 +442,8 @@ public class TestCheckpoint extends Test
   }
 
   private static void checkSuperstepMsgCount(PeerSyncClient syncClient,
-      @SuppressWarnings("rawtypes")
-      BSPPeer bspTask, BSPJob job, long step, long count) {
+      @SuppressWarnings("rawtypes") BSPPeer bspTask, BSPJob job, long step,
+      long count) {
 
     ArrayWritable writableVal = new ArrayWritable(LongWritable.class);
 

Modified: hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/monitor/TestFederator.java Mon Feb 25 11:40:13 2013
@@ -71,8 +71,7 @@ public class TestFederator extends TestC
     final Act act = new Act(new DummyCollector(expected),
         new CollectorHandler() {
           @Override
-          public void handle(@SuppressWarnings("rawtypes")
-          Future future) {
+          public void handle(@SuppressWarnings("rawtypes") Future future) {
             try {
               finalResult.set(((Integer) future.get()).intValue());
               LOG.info("Value after submitted: " + finalResult);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/BipartiteMatching.java Mon Feb 25 11:40:13 2013
@@ -17,8 +17,6 @@
  */
 package org.apache.hama.examples;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -59,22 +57,15 @@ public final class BipartiteMatching {
     private final static Text LEFT = new Text("L");
     private final static Text RIGHT = new Text("R");
 
-    // Needed because Vertex value and message sent have same types.
-    private TextPair reusableMessage;
-    private Random random;
-
     @Override
     public void setup(Configuration conf) {
       this.getPeer().getNumCurrentMessages();
-      reusableMessage = new TextPair(new Text(getVertexID()), new Text("1"))
-          .setNames("SourceVertex", "Vestige");
-      random = new Random(Long.parseLong(getConf().get(SEED_CONFIGURATION_KEY,
-          System.currentTimeMillis() + "")));
     }
 
     @Override
-    public void compute(Iterator<TextPair> messages) throws IOException {
-
+    public void compute(Iterable<TextPair> msgs) throws IOException {
+      Random random = new Random(Long.parseLong(getConf().get(
+          SEED_CONFIGURATION_KEY)));
       if (isMatched()) {
         voteToHalt();
         return;
@@ -90,8 +81,8 @@ public final class BipartiteMatching {
         case 1:
           if (Objects.equal(getComponent(), RIGHT)) {
             List<TextPair> buffer = new ArrayList<TextPair>();
-            while (messages.hasNext()) {
-              buffer.add(messages.next());
+            for (TextPair next : msgs) {
+              buffer.add(new TextPair(next.getFirst(), next.getSecond()));
             }
             if (buffer.size() > 0) {
               TextPair luckyMsg = buffer.get(RandomUtils.nextInt(random,
@@ -106,9 +97,8 @@ public final class BipartiteMatching {
         case 2:
           if (Objects.equal(getComponent(), LEFT)) {
             List<TextPair> buffer = new ArrayList<TextPair>();
-
-            while (messages.hasNext()) {
-              buffer.add(messages.next());
+            for (TextPair next : msgs) {
+              buffer.add(new TextPair(next.getFirst(), next.getSecond()));
             }
             if (buffer.size() > 0) {
               TextPair luckyMsg = buffer.get(RandomUtils.nextInt(random,
@@ -123,8 +113,10 @@ public final class BipartiteMatching {
 
         case 3:
           if (Objects.equal(getComponent(), RIGHT)) {
+            Iterator<TextPair> messages = msgs.iterator();
             if (messages.hasNext()) {
-              Text sourceVertex = getSourceVertex(messages.next());
+              TextPair next = messages.next();
+              Text sourceVertex = getSourceVertex(next);
               setMatchVertex(sourceVertex);
             }
           }
@@ -147,7 +139,7 @@ public final class BipartiteMatching {
     }
 
     private TextPair getNewMessage() {
-      return reusableMessage;
+      return new TextPair(new Text(getVertexID()), new Text("1"));
     }
 
     /**
@@ -161,26 +153,6 @@ public final class BipartiteMatching {
       return !getValue().getFirst().equals(UNMATCHED);
     }
 
-    @Override
-    public void readState(DataInput in) throws IOException {
-      if (in.readBoolean()) {
-        reusableMessage = new TextPair();
-        reusableMessage.readFields(in);
-      }
-
-    }
-
-    @Override
-    public void writeState(DataOutput out) throws IOException {
-      if (reusableMessage == null) {
-        out.writeBoolean(false);
-      } else {
-        out.writeBoolean(true);
-        reusableMessage.write(out);
-      }
-
-    }
-
   }
 
   /**
@@ -204,8 +176,7 @@ public final class BipartiteMatching {
       String[] selfArray = tokenArray[0].trim().split(" ");
 
       vertex.setVertexID(new Text(selfArray[0]));
-      vertex.setValue(new TextPair(UNMATCHED, new Text(selfArray[1])).setNames(
-          "MatchVertex", "Component"));
+      vertex.setValue(new TextPair(UNMATCHED, new Text(selfArray[1])));
       // initially a node is unmatched, which is denoted by U.
 
       for (String adjNode : adjArray) {

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/InlinkCount.java Mon Feb 25 11:40:13 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
@@ -37,14 +36,13 @@ import com.google.common.base.Optional;
 public class InlinkCount extends Vertex<Text, NullWritable, IntWritable> {
 
   @Override
-  public void compute(Iterator<IntWritable> messages) throws IOException {
+  public void compute(Iterable<IntWritable> messages) throws IOException {
 
     if (getSuperstepCount() == 0L) {
       setValue(new IntWritable(0));
       sendMessageToNeighbors(new IntWritable(1));
     } else {
-      while (messages.hasNext()) {
-        IntWritable msg = messages.next();
+      for (IntWritable msg : messages) {
         this.setValue(new IntWritable(this.getValue().get() + msg.get()));
       }
       voteToHalt();

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/MindistSearch.java Mon Feb 25 11:40:13 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +48,7 @@ public class MindistSearch {
       Vertex<Text, NullWritable, Text> {
 
     @Override
-    public void compute(Iterator<Text> messages) throws IOException {
+    public void compute(Iterable<Text> messages) throws IOException {
       Text currentComponent = getValue();
       if (getSuperstepCount() == 0L) {
         // if we have no associated component, pick the lowest in our direct
@@ -66,8 +65,7 @@ public class MindistSearch {
         }
       } else {
         boolean updated = false;
-        while (messages.hasNext()) {
-          Text next = messages.next();
+        for (Text next : messages) {
           if (currentComponent != null && next != null) {
             if (currentComponent.compareTo(next) > 0) {
               updated = true;

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Mon Feb 25 11:40:13 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.examples;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -31,7 +30,6 @@ import org.apache.hama.bsp.HashPartition
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.TextArrayWritable;
 import org.apache.hama.bsp.TextOutputFormat;
-import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
@@ -49,8 +47,6 @@ public class PageRank {
     static double DAMPING_FACTOR = 0.85;
     static double MAXIMUM_CONVERGENCE_ERROR = 0.001;
 
-    int numEdges;
-
     @Override
     public void setup(Configuration conf) {
       String val = conf.get("hama.pagerank.alpha");
@@ -61,30 +57,20 @@ public class PageRank {
       if (val != null) {
         MAXIMUM_CONVERGENCE_ERROR = Double.parseDouble(val);
       }
-      numEdges = this.getEdges().size();
     }
 
     @Override
-    public void compute(Iterator<DoubleWritable> messages) throws IOException {
+    public void compute(Iterable<DoubleWritable> messages) throws IOException {
       // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
         this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
       } else if (this.getSuperstepCount() >= 1) {
-        DoubleWritable danglingNodeContribution = getLastAggregatedValue(1);
         double sum = 0;
-        while (messages.hasNext()) {
-          DoubleWritable msg = messages.next();
+        for (DoubleWritable msg : messages) {
           sum += msg.get();
         }
-        if (danglingNodeContribution == null) {
-          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-          this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
-        } else {
-          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-          this.setValue(new DoubleWritable(alpha
-              + (DAMPING_FACTOR * (sum + danglingNodeContribution.get()
-                  / this.getNumVertices()))));
-        }
+        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+        this.setValue(new DoubleWritable(alpha + (sum * DAMPING_FACTOR)));
       }
 
       // if we have not reached our global error yet, then proceed.
@@ -94,34 +80,10 @@ public class PageRank {
         voteToHalt();
         return;
       }
+
       // in each superstep we are going to send a new rank to our neighbours
       sendMessageToNeighbors(new DoubleWritable(this.getValue().get()
-          / numEdges));
-    }
-
-  }
-
-  public static class DanglingNodeAggregator
-      extends
-      AbstractAggregator<DoubleWritable, Vertex<Text, NullWritable, DoubleWritable>> {
-
-    double danglingNodeSum;
-
-    @Override
-    public void aggregate(Vertex<Text, NullWritable, DoubleWritable> vertex,
-        DoubleWritable value) {
-      if (vertex != null) {
-        if (vertex.getEdges().size() == 0) {
-          danglingNodeSum += value.get();
-        }
-      } else {
-        danglingNodeSum += value.get();
-      }
-    }
-
-    @Override
-    public DoubleWritable getValue() {
-      return new DoubleWritable(danglingNodeSum);
+          / this.getEdges().size()));
     }
 
   }
@@ -142,7 +104,6 @@ public class PageRank {
     }
   }
 
-  @SuppressWarnings("unchecked")
   public static GraphJob createJob(String[] args, HamaConfiguration conf)
       throws IOException {
     GraphJob pageJob = new GraphJob(conf, PageRank.class);
@@ -155,15 +116,17 @@ public class PageRank {
     // set the defaults
     pageJob.setMaxIteration(30);
     pageJob.set("hama.pagerank.alpha", "0.85");
+    // reference vertices to itself, because we don't have a dangling node
+    // contribution here
+    pageJob.set("hama.graph.self.ref", "true");
     pageJob.set("hama.graph.max.convergence.error", "0.001");
 
     if (args.length == 3) {
       pageJob.setNumBspTask(Integer.parseInt(args[2]));
     }
 
-    // error, dangling node probability sum
-    pageJob.setAggregatorClass(AverageAggregator.class,
-        DanglingNodeAggregator.class);
+    // error
+    pageJob.setAggregatorClass(AverageAggregator.class);
 
     // Vertex reader
     pageJob.setVertexInputReaderClass(PagerankSeqReader.class);

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/SSSP.java Mon Feb 25 11:40:13 2013
@@ -20,6 +20,7 @@ package org.apache.hama.examples;
 import java.io.IOException;
 import java.util.Iterator;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -41,7 +42,8 @@ public class SSSP {
   public static class ShortestPathVertex extends
       Vertex<Text, IntWritable, IntWritable> {
 
-    public ShortestPathVertex() {
+    @Override
+    public void setup(Configuration conf) {
       this.setValue(new IntWritable(Integer.MAX_VALUE));
     }
 
@@ -51,11 +53,10 @@ public class SSSP {
     }
 
     @Override
-    public void compute(Iterator<IntWritable> messages) throws IOException {
+    public void compute(Iterable<IntWritable> messages) throws IOException {
       int minDist = isStartVertex() ? 0 : Integer.MAX_VALUE;
 
-      while (messages.hasNext()) {
-        IntWritable msg = messages.next();
+      for (IntWritable msg : messages) {
         if (msg.get() < minDist) {
           minDist = msg.get();
         }

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/util/TextPair.java Mon Feb 25 11:40:13 2013
@@ -24,8 +24,6 @@ import java.io.IOException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 
-import com.google.common.base.Objects;
-
 /**
  * TextPair class for use in BipartiteMatching algorithm.
  * 
@@ -35,9 +33,6 @@ public final class TextPair implements W
   Text first;
   Text second;
 
-  String nameFirst = "First";
-  String nameSecond = "Second";
-
   public TextPair() {
     first = new Text();
     second = new Text();
@@ -48,15 +43,6 @@ public final class TextPair implements W
     this.second = second;
   }
 
-  /**
-   * Sets the names of the attributes
-   */
-  public TextPair setNames(String nameFirst, String nameSecond) {
-    this.nameFirst = nameFirst;
-    this.nameSecond = nameSecond;
-    return this;
-  }
-
   public Text getFirst() {
     return first;
   }
@@ -75,8 +61,6 @@ public final class TextPair implements W
 
   @Override
   public void write(DataOutput out) throws IOException {
-    (new Text(nameFirst)).write(out);
-    (new Text(nameSecond)).write(out);
     first.write(out);
     second.write(out);
   }
@@ -84,20 +68,13 @@ public final class TextPair implements W
   @Override
   public void readFields(DataInput in) throws IOException {
 
-    Text t1 = new Text();
-    Text t2 = new Text();
-    t1.readFields(in);
-    t2.readFields(in);
-    nameFirst = t1.toString();
-    nameSecond = t2.toString();
     first.readFields(in);
     second.readFields(in);
   }
 
   @Override
   public String toString() {
-    return Objects.toStringHelper(this).add(nameFirst, getFirst())
-        .add(nameSecond, getSecond()).toString();
+    return first + " " + second;
   }
 
 }

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/BipartiteMatchingTest.java Mon Feb 25 11:40:13 2013
@@ -46,15 +46,13 @@ public class BipartiteMatchingTest exten
 
   private final static String DELIMETER = "\t";
 
-  @SuppressWarnings("serial")
-  private Map<String, String> output1 = new HashMap<String, String>() {
-    {
-      put("C", "TextPair{MatchVertex=D, Component=L}");
-      put("A", "TextPair{MatchVertex=B, Component=L}");
-      put("D", "TextPair{MatchVertex=C, Component=R}");
-      put("B", "TextPair{MatchVertex=A, Component=R}");
-    }
-  };
+  private Map<String, String> output1 = new HashMap<String, String>();
+  {
+    output1.put("A", "D L");
+    output1.put("B", "C R");
+    output1.put("C", "B L");
+    output1.put("D", "A R");
+  }
 
   public static class CustomTextPartitioner implements
       Partitioner<Text, TextPair> {
@@ -110,36 +108,27 @@ public class BipartiteMatchingTest exten
 
   private void verifyResult() throws IOException {
     FileStatus[] files = fs.globStatus(new Path(OUTPUT + "/part-*"));
-    assertTrue(files.length == 2);
-
-    Text key = new Text();
-    Text value = new Text();
+    assertTrue("Not enough files found: " + files.length, files.length == 2);
 
     for (FileStatus file : files) {
       if (file.getLen() > 0) {
         FSDataInputStream in = fs.open(file.getPath());
         BufferedReader bin = new BufferedReader(new InputStreamReader(in));
 
-        String s = bin.readLine();
-        while (s != null) {
-          next(key, value, s);
-          String expValue = output1.get(key.toString());
-          System.out.println(key + " " + value + " expvalue = " + expValue);
-          assertEquals(expValue, value.toString());
-
-          s = bin.readLine();
+        String s = null;
+        while ((s = bin.readLine()) != null) {
+          String[] lineA = s.split(DELIMETER);
+          String expValue = output1.get(lineA[0]);
+          assertNotNull(expValue);
+          System.out.println(lineA[0] + " -> " + lineA[1] + " expvalue = "
+              + expValue);
+          assertEquals(expValue, lineA[1]);
         }
         in.close();
       }
     }
   }
 
-  private static void next(Text key, Text value, String line) {
-    String[] lineA = line.split(DELIMETER);
-    key.set(lineA[0]);
-    value.set(lineA[1]);
-  }
-
   private void deleteTempDirs() {
     try {
       if (fs.exists(new Path(INPUT)))

Added: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (added)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Mon Feb 25 11:40:13 2013
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.TextArrayWritable;
+import org.junit.Test;
+
+/**
+ * Testcase for {@link PageRank}
+ */
+public class PageRankTest extends TestCase {
+  String[] input = new String[] { "1\t2\t3", "2", "3\t1\t2\t5", "4\t5\t6",
+      "5\t4\t6", "6\t4", "7\t2\t4" };
+
+  private static String INPUT = "/tmp/page-tmp.seq";
+  private static String TEXT_INPUT = "/tmp/page.txt";
+  private static String TEXT_OUTPUT = INPUT + "page.txt.seq";
+  private static String OUTPUT = "/tmp/page-out";
+  private Configuration conf = new HamaConfiguration();
+  private FileSystem fs;
+
+  @Override
+  protected void setUp() throws Exception {
+    super.setUp();
+    fs = FileSystem.get(conf);
+  }
+
+  @Test
+  public void testPageRank() throws IOException, InterruptedException,
+      ClassNotFoundException, InstantiationException, IllegalAccessException {
+
+    generateTestData();
+    try {
+      PageRank.main(new String[] { INPUT, OUTPUT, "3" });
+      verifyResult();
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
+  private void verifyResult() throws IOException {
+    FileStatus[] globStatus = fs.globStatus(new Path(OUTPUT + "/part-*"));
+    double sum = 0d;
+    for (FileStatus fts : globStatus) {
+      BufferedReader reader = new BufferedReader(new InputStreamReader(
+          fs.open(fts.getPath())));
+      String line = null;
+      while ((line = reader.readLine()) != null) {
+        System.out.println(line);
+        String[] split = line.split("\t");
+        sum += Double.parseDouble(split[1]);
+      }
+    }
+    System.out.println(sum);
+    assertTrue("Sum was: " + sum, sum > 0.9 && sum < 1.1);
+  }
+
+  private void generateTestData() {
+    try {
+      SequenceFile.Writer writer1 = SequenceFile.createWriter(fs, conf,
+          new Path(INPUT + "/part0"), Text.class, TextArrayWritable.class);
+
+      for (int i = 0; i < input.length; i++) {
+        String[] x = input[i].split("\t");
+
+        Text vertex = new Text(x[0]);
+        TextArrayWritable arr = new TextArrayWritable();
+        Writable[] values = new Writable[x.length - 1];
+        for (int j = 1; j < x.length; j++) {
+          values[j - 1] = new Text(x[j]);
+        }
+        arr.set(values);
+        writer1.append(vertex, arr);
+      }
+
+      writer1.close();
+
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private void deleteTempDirs() {
+    try {
+      if (fs.exists(new Path(INPUT)))
+        fs.delete(new Path(INPUT), true);
+      if (fs.exists(new Path(OUTPUT)))
+        fs.delete(new Path(OUTPUT), true);
+      if (fs.exists(new Path(TEXT_INPUT)))
+        fs.delete(new Path(TEXT_INPUT), true);
+      if (fs.exists(new Path(TEXT_OUTPUT)))
+        fs.delete(new Path(TEXT_OUTPUT), true);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+}

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/SSSPTest.java Mon Feb 25 11:40:13 2013
@@ -32,14 +32,23 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hama.HamaConfiguration;
+import org.junit.Test;
 
 /**
  * Testcase for {@link ShortestPaths}
  */
 public class SSSPTest extends TestCase {
-  String[] input = new String[] { "1:85\t2:217\t4:173", "0:85\t5:80",
-      "0:217\t6:186\t7:103", "7:183", "0:173\t9:502", "1:80\t8:250", "2:186",
-      "3:183\t9:167\t2:103", "5:250\t9:84", "4:502\t7:167\t8:84" };
+  String[] input = new String[] { "1:85\t2:217\t4:173",// 0
+      "0:85\t5:80",// 1
+      "0:217\t6:186\t7:103",// 2
+      "7:183",// 3
+      "0:173\t9:502", // 4
+      "1:80\t8:250", // 5
+      "2:186", // 6
+      "3:183\t9:167\t2:103", // 7
+      "5:250\t9:84", // 8
+      "4:502\t7:167\t8:84" // 9
+  };
 
   private static String INPUT = "/tmp/sssp-tmp.seq";
   private static String TEXT_INPUT = "/tmp/sssp.txt";
@@ -54,6 +63,7 @@ public class SSSPTest extends TestCase {
     fs = FileSystem.get(conf);
   }
 
+  @Test
   public void testShortestPaths() throws IOException, InterruptedException,
       ClassNotFoundException, InstantiationException, IllegalAccessException {
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Mon Feb 25 11:40:13 2013
@@ -36,7 +36,7 @@ import com.google.common.base.Preconditi
  * configured.
  * 
  */
-public final class AggregationRunner<V extends WritableComparable<V>, E extends Writable, M extends Writable> {
+public final class AggregationRunner<V extends WritableComparable<? super V>, E extends Writable, M extends Writable> {
 
   // multiple aggregator arrays
   private Aggregator<M, Vertex<V, E, M>>[] aggregators;
@@ -166,31 +166,23 @@ public final class AggregationRunner<V e
   }
 
   /**
-   * Receives aggregated values from a master task, by doing an additional
-   * barrier sync and parsing the messages.
+   * Receives aggregated values from a master task.
    * 
    * @return always true if no aggregators are defined, false if aggregators say
    *         we haven't seen any messages anymore.
    */
-  public boolean receiveAggregatedValues(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+  public boolean receiveAggregatedValues(MapWritable updatedValues,
       long iteration) throws IOException, SyncException, InterruptedException {
-    // if we have an aggregator defined, we must make an additional sync
-    // to have the updated values available on all our peers.
-    if (isEnabled() && iteration > 1) {
-      peer.sync();
-
-      MapWritable updatedValues = peer.getCurrentMessage().getMap();
-      for (int i = 0; i < aggregators.length; i++) {
-        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
-        globalAggregatorIncrement[i] = (IntWritable) updatedValues
-            .get(aggregatorIncrementFlag[i]);
-      }
-      IntWritable count = (IntWritable) updatedValues
-          .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
-      if (count != null && count.get() == Integer.MIN_VALUE) {
-        return false;
-      }
+    // map is the first value that is in the queue
+    for (int i = 0; i < aggregators.length; i++) {
+      globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+      globalAggregatorIncrement[i] = (IntWritable) updatedValues
+          .get(aggregatorIncrementFlag[i]);
+    }
+    IntWritable count = (IntWritable) updatedValues
+        .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
+    if (count != null && count.get() == Integer.MIN_VALUE) {
+      return false;
     }
     return true;
   }

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/DiskVerticesInfo.java Mon Feb 25 11:40:13 2013
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
+import org.apache.hama.graph.IDSkippingIterator.Strategy;
+
+public final class DiskVerticesInfo<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
+    implements VerticesInfo<V, E, M> {
+
+  public static final String DISK_VERTICES_PATH_KEY = "hama.disk.vertices.path";
+
+  private static final byte NULL = 0;
+  private static final byte NOT_NULL = 1;
+
+  private RandomAccessFile staticGraphParts;
+  private RandomAccessFile softGraphParts;
+  private RandomAccessFile softGraphPartsNextIteration;
+
+  private BitSet activeVertices;
+  private long[] softValueOffsets;
+  private long[] softValueOffsetsNextIteration;
+  private long[] staticOffsets;
+
+  private ArrayList<Long> tmpSoftOffsets;
+  private ArrayList<Long> tmpStaticOffsets;
+
+  private int size;
+  private boolean lockedAdditions = false;
+  private String rootPath;
+  private Vertex<V, E, M> cachedVertexInstance;
+  private int currentStep = 0;
+  private int index = 0;
+  private Configuration conf;
+  private GraphJobRunner<V, E, M> runner;
+
+  @Override
+  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+      TaskAttemptID attempt) throws IOException {
+    this.runner = runner;
+    this.conf = conf;
+    tmpSoftOffsets = new ArrayList<Long>();
+    tmpStaticOffsets = new ArrayList<Long>();
+    String p = conf.get(DISK_VERTICES_PATH_KEY, "/tmp/graph/");
+    rootPath = p + attempt.getJobID().toString() + "/" + attempt.toString()
+        + "/";
+    LocalFileSystem local = FileSystem.getLocal(conf);
+    local.mkdirs(new Path(rootPath));
+    // make sure that those files do not exist
+    String staticFile = rootPath + "static.graph";
+    local.delete(new Path(staticFile), false);
+    staticGraphParts = new RandomAccessFile(staticFile, "rw");
+    String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
+    local.delete(new Path(softGraphFileName), false);
+    softGraphParts = new RandomAccessFile(softGraphFileName, "rw");
+  }
+
+  @Override
+  public void cleanup(Configuration conf, TaskAttemptID attempt)
+      throws IOException {
+    IOUtils.cleanup(null, softGraphParts, softGraphPartsNextIteration);
+    // delete the contents
+    FileSystem.getLocal(conf).delete(new Path(rootPath), true);
+  }
+
+  @Override
+  public void addVertex(Vertex<V, E, M> vertex) throws IOException {
+    // messages must be added in sorted order to work this out correctly
+    checkArgument(!lockedAdditions,
+        "Additions are locked now, nobody is allowed to change the structure anymore.");
+
+    // write the static parts
+    tmpStaticOffsets.add(staticGraphParts.length());
+    vertex.getVertexID().write(staticGraphParts);
+    staticGraphParts.writeInt(vertex.getEdges() == null ? 0 : vertex.getEdges()
+        .size());
+    for (Edge<?, ?> e : vertex.getEdges()) {
+      e.getDestinationVertexID().write(staticGraphParts);
+    }
+
+    serializeSoft(vertex, -1, null, softGraphParts);
+
+    size++;
+  }
+
+  /**
+   * Serializes the vertex's soft parts to its file. If the vertex does not have
+   * an index yet (e.g., at startup) you can provide -1 and it will be added to
+   * the temporary storage.
+   */
+  private void serializeSoft(Vertex<V, E, M> vertex, int index,
+      long[] softValueOffsets, RandomAccessFile softGraphParts)
+      throws IOException {
+    // save the offset, then write the soft parts
+    if (index >= 0) {
+      softValueOffsets[index] = softGraphParts.length();
+      // only set the bitset if we've finished the setup
+      activeVertices.set(index, vertex.isHalted());
+    } else {
+      tmpSoftOffsets.add(softGraphParts.length());
+    }
+    if (vertex.getValue() == null) {
+      softGraphParts.write(NULL);
+    } else {
+      softGraphParts.write(NOT_NULL);
+      vertex.getValue().write(softGraphParts);
+    }
+    vertex.writeState(softGraphParts);
+    softGraphParts.writeInt(vertex.getEdges().size());
+    for (Edge<?, ?> e : vertex.getEdges()) {
+      if (e.getValue() == null) {
+        softGraphParts.write(NULL);
+      } else {
+        softGraphParts.write(NOT_NULL);
+        e.getValue().write(softGraphParts);
+      }
+    }
+  }
+
+  @Override
+  public void finishAdditions() {
+    // copy the arraylist to a plain array
+    softValueOffsets = copy(tmpSoftOffsets);
+    softValueOffsetsNextIteration = copy(tmpSoftOffsets);
+    staticOffsets = copy(tmpStaticOffsets);
+    activeVertices = new BitSet(size);
+
+    tmpStaticOffsets = null;
+    tmpSoftOffsets = null;
+
+    // prevent additional vertices from being added
+    lockedAdditions = true;
+  }
+
+  private static long[] copy(ArrayList<Long> lst) {
+    long[] arr = new long[lst.size()];
+    for (int i = 0; i < arr.length; i++) {
+      arr[i] = lst.get(i);
+    }
+    return arr;
+  }
+
+  @Override
+  public boolean isFinishedAdditions() {
+    return lockedAdditions;
+  }
+
+  @Override
+  public void startSuperstep() throws IOException {
+    index = 0;
+    String softGraphFileName = getSoftGraphFileName(rootPath, currentStep);
+    FileSystem.getLocal(conf).delete(new Path(softGraphFileName), true);
+    softGraphPartsNextIteration = new RandomAccessFile(softGraphFileName, "rw");
+    softValueOffsets = softValueOffsetsNextIteration;
+    softValueOffsetsNextIteration = new long[softValueOffsetsNextIteration.length];
+  }
+
+  @Override
+  public void finishVertexComputation(Vertex<V, E, M> vertex)
+      throws IOException {
+    // write to the soft parts
+    serializeSoft(vertex, index++, softValueOffsetsNextIteration,
+        softGraphPartsNextIteration);
+  }
+
+  @Override
+  public void finishSuperstep() throws IOException {
+    // do not delete files in the first step
+    if (currentStep > 0) {
+      softGraphParts.close();
+      FileSystem.getLocal(conf).delete(
+          new Path(getSoftGraphFileName(rootPath, currentStep - 1)), true);
+      softGraphParts = softGraphPartsNextIteration;
+    }
+    currentStep++;
+  }
+
+  @Override
+  public int size() {
+    return size;
+  }
+
+  private final class IDSkippingDiskIterator extends
+      IDSkippingIterator<V, E, M> {
+
+    int currentIndex = 0;
+
+    @Override
+    public Vertex<V, E, M> next() {
+      return cachedVertexInstance;
+    }
+
+    @Override
+    public boolean hasNext(V e,
+        org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+      if (currentIndex >= size) {
+        return false;
+      } else {
+        currentIndex = fill(strat, currentIndex, e);
+        return true;
+      }
+    }
+
+  }
+
+  @Override
+  public IDSkippingIterator<V, E, M> skippingIterator() {
+    try {
+      // reset
+      staticGraphParts.seek(0);
+      softGraphParts.seek(0);
+      // ensure the vertex is not null
+      if (cachedVertexInstance == null) {
+        cachedVertexInstance = GraphJobRunner
+            .<V, E, M> newVertexInstance(GraphJobRunner.VERTEX_CLASS);
+        cachedVertexInstance.runner = runner;
+      }
+      ensureVertexIDNotNull();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return new IDSkippingDiskIterator();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void ensureVertexIDNotNull() {
+    if (cachedVertexInstance.getVertexID() == null) {
+      cachedVertexInstance.setVertexID((V) GraphJobRunner
+          .createVertexIDObject());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void ensureVertexValueNotNull() {
+    if (cachedVertexInstance.getValue() == null) {
+      cachedVertexInstance.setValue((M) GraphJobRunner.createVertexValue());
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "static-method" })
+  private void ensureEdgeIDNotNull(Edge<V, E> edge) {
+    if (edge.getDestinationVertexID() == null) {
+      edge.setDestinationVertexID((V) GraphJobRunner.createVertexIDObject());
+    }
+  }
+
+  @SuppressWarnings({ "unchecked", "static-method" })
+  private void ensureEdgeValueNotNull(Edge<V, E> edge) {
+    if (edge.getValue() == null) {
+      edge.setCost((E) GraphJobRunner.createEdgeCostObject());
+    }
+  }
+
+  /**
+   * Fills the cachedVertexInstance with the next acceptable item after the
+   * given index that matches the given messageVertexID if provided.
+   * 
+   * @param strat the strategy that defines if a vertex that is serialized
+   *          should be accepted.
+   * @param index the index of the vertices to start from.
+   * @param messageVertexId the message vertex id that can be matched by the
+   *          strategy. Can be null as well, this is handled by the strategy.
+   * @return the index of the item after the currently found item.
+   */
+  private int fill(Strategy strat, int index, V messageVertexId) {
+    try {
+      while (true) {
+        // seek until we found something that satisfied our strategy
+        staticGraphParts.seek(staticOffsets[index]);
+        boolean halted = activeVertices.get(index);
+        cachedVertexInstance.setVotedToHalt(halted);
+        cachedVertexInstance.getVertexID().readFields(staticGraphParts);
+        if (strat.accept(cachedVertexInstance, messageVertexId)) {
+          break;
+        }
+        if (++index >= size) {
+          return size;
+        }
+      }
+      softGraphParts.seek(softValueOffsets[index]);
+
+      // setting vertex value null here, because it may be overridden. Messaging
+      // is not materializing the message directly, so it is possible for the
+      // read fields method to change this object (thus a new object).
+      cachedVertexInstance.setValue(null);
+      if (softGraphParts.readByte() == NOT_NULL) {
+        ensureVertexValueNotNull();
+        cachedVertexInstance.getValue().readFields(softGraphParts);
+      }
+
+      cachedVertexInstance.readState(softGraphParts);
+      int numEdges = staticGraphParts.readInt();
+      int softEdges = softGraphParts.readInt();
+      if (softEdges != numEdges) {
+        throw new IllegalArgumentException(
+            "Number of edges seemed to change. This is not possible (yet).");
+      }
+      // edges could actually be cached, however the local mode is preventing it
+      // sometimes as edge destinations are sent and possibly overridden in
+      // messages here.
+      ArrayList<Edge<V, E>> edges = new ArrayList<Edge<V, E>>();
+      // read the soft file in parallel
+      for (int i = 0; i < numEdges; i++) {
+        Edge<V, E> edge = new Edge<V, E>();
+        ensureEdgeValueNotNull(edge);
+        ensureEdgeIDNotNull(edge);
+        edge.getDestinationVertexID().readFields(staticGraphParts);
+        if (softGraphParts.readByte() == NOT_NULL) {
+          ensureEdgeValueNotNull(edge);
+          edge.getCost().readFields(softGraphParts);
+        } else {
+          edge.setCost(null);
+        }
+        edges.add(edge);
+      }
+
+      // make edges unmodifiable
+      cachedVertexInstance.setEdges(Collections.unmodifiableList(edges));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    return index + 1;
+  }
+
+  private static String getSoftGraphFileName(String root, int step) {
+    return root + "soft_" + step + ".graph";
+  }
+
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Mon Feb 25 11:40:13 2013
@@ -25,8 +25,12 @@ import org.apache.hadoop.io.WritableComp
  * The edge class
  */
 public final class Edge<VERTEX_ID extends WritableComparable<? super VERTEX_ID>, EDGE_VALUE_TYPE extends Writable> {
-  private final VERTEX_ID destinationVertexID;
-  private final EDGE_VALUE_TYPE cost;
+
+  private VERTEX_ID destinationVertexID;
+  private EDGE_VALUE_TYPE cost;
+
+  public Edge() {
+  }
 
   public Edge(VERTEX_ID sourceVertexID, EDGE_VALUE_TYPE cost) {
     this.destinationVertexID = sourceVertexID;
@@ -45,6 +49,18 @@ public final class Edge<VERTEX_ID extend
     return cost;
   }
 
+  public EDGE_VALUE_TYPE getCost() {
+    return cost;
+  }
+
+  void setCost(EDGE_VALUE_TYPE cost) {
+    this.cost = cost;
+  }
+
+  void setDestinationVertexID(VERTEX_ID destinationVertexID) {
+    this.destinationVertexID = destinationVertexID;
+  }
+
   @Override
   public String toString() {
     return this.destinationVertexID + ":" + this.getValue();

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJob.java Mon Feb 25 11:40:13 2013
@@ -30,6 +30,9 @@ import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.PartitioningRunner.RecordConverter;
+import org.apache.hama.bsp.message.MessageManager;
+import org.apache.hama.bsp.message.queue.MessageQueue;
+import org.apache.hama.bsp.message.queue.SortedMessageQueue;
 
 import com.google.common.base.Preconditions;
 
@@ -134,8 +137,8 @@ public class GraphJob extends BSPJob {
   }
 
   @Override
-  public void setPartitioner(@SuppressWarnings("rawtypes")
-  Class<? extends Partitioner> theClass) {
+  public void setPartitioner(
+      @SuppressWarnings("rawtypes") Class<? extends Partitioner> theClass) {
     super.setPartitioner(theClass);
     conf.setBoolean(Constants.ENABLE_RUNTIME_PARTITIONING, true);
   }
@@ -177,6 +180,10 @@ public class GraphJob extends BSPJob {
                 Constants.RUNTIME_PARTITION_RECORDCONVERTER) != null,
             "Please provide a converter class for your vertex by using GraphJob#setVertexInputReaderClass!");
 
+    // add the default message queue to the sorted one
+    this.getConfiguration().setClass(MessageManager.QUEUE_TYPE_CLASS,
+        SortedMessageQueue.class, MessageQueue.class);
+
     super.submit();
   }
 

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Mon Feb 25 11:40:13 2013
@@ -20,8 +20,6 @@ package org.apache.hama.graph;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
@@ -34,21 +32,20 @@ import org.apache.hadoop.util.Reflection
  * real message (vertex ID and value). It can be extended by adding flags, for
  * example for a graph repair call.
  */
-public final class GraphJobMessage implements Writable {
+public final class GraphJobMessage implements
+    WritableComparable<GraphJobMessage> {
 
   public static final int MAP_FLAG = 0x01;
   public static final int VERTEX_FLAG = 0x02;
-  public static final int REPAIR_FLAG = 0x04;
-  public static final int PARTITION_FLAG = 0x08;
-  public static final int VERTICES_SIZE_FLAG = 0x10;
+  public static final int VERTICES_SIZE_FLAG = 0x04;
 
   // default flag to -1 "unknown"
   private int flag = -1;
   private MapWritable map;
-  private Writable vertexId;
+  @SuppressWarnings("rawtypes")
+  private WritableComparable vertexId;
   private Writable vertexValue;
-  private Vertex<?, ?, ?> vertex;
-  private IntWritable vertices_size;
+  private IntWritable verticesSize;
 
   public GraphJobMessage() {
   }
@@ -58,25 +55,15 @@ public final class GraphJobMessage imple
     this.map = map;
   }
 
-  public GraphJobMessage(Writable vertexId) {
-    this.flag = REPAIR_FLAG;
-    this.vertexId = vertexId;
-  }
-
-  public GraphJobMessage(Writable vertexId, Writable vertexValue) {
+  public GraphJobMessage(WritableComparable<?> vertexId, Writable vertexValue) {
     this.flag = VERTEX_FLAG;
     this.vertexId = vertexId;
     this.vertexValue = vertexValue;
   }
 
-  public GraphJobMessage(Vertex<?, ?, ?> vertex) {
-    this.flag = PARTITION_FLAG;
-    this.vertex = vertex;
-  }
-
   public GraphJobMessage(IntWritable size) {
     this.flag = VERTICES_SIZE_FLAG;
-    this.vertices_size = size;
+    this.verticesSize = size;
   }
 
   @Override
@@ -89,28 +76,8 @@ public final class GraphJobMessage imple
       vertexValue.write(out);
     } else if (isMapMessage()) {
       map.write(out);
-    } else if (isPartitioningMessage()) {
-      vertex.getVertexID().write(out);
-      if (vertex.getValue() != null) {
-        out.writeBoolean(true);
-        vertex.getValue().write(out);
-      } else {
-        out.writeBoolean(false);
-      }
-      List<?> outEdges = vertex.getEdges();
-      out.writeInt(outEdges.size());
-      for (Object e : outEdges) {
-        Edge<?, ?> edge = (Edge<?, ?>) e;
-        edge.getDestinationVertexID().write(out);
-        if (edge.getValue() != null) {
-          out.writeBoolean(true);
-          edge.getValue().write(out);
-        } else {
-          out.writeBoolean(false);
-        }
-      }
     } else if (isVerticesSizeMessage()) {
-      vertices_size.write(out);
+      verticesSize.write(out);
     } else {
       vertexId.write(out);
     }
@@ -128,39 +95,9 @@ public final class GraphJobMessage imple
     } else if (isMapMessage()) {
       map = new MapWritable();
       map.readFields(in);
-    } else if (isPartitioningMessage()) {
-      Vertex<WritableComparable<Writable>, Writable, Writable> vertex = GraphJobRunner
-          .newVertexInstance(GraphJobRunner.VERTEX_CLASS);
-      WritableComparable<Writable> vertexId = GraphJobRunner
-          .createVertexIDObject();
-      vertexId.readFields(in);
-      vertex.setVertexID(vertexId);
-      if (in.readBoolean()) {
-        Writable vertexValue = GraphJobRunner.createVertexValue();
-        vertexValue.readFields(in);
-        vertex.setValue(vertexValue);
-      }
-      int size = in.readInt();
-      vertex
-          .setEdges(new ArrayList<Edge<WritableComparable<Writable>, Writable>>(
-              size));
-      for (int i = 0; i < size; i++) {
-        WritableComparable<Writable> edgeVertexID = GraphJobRunner
-            .createVertexIDObject();
-        edgeVertexID.readFields(in);
-        Writable edgeValue = null;
-        if (in.readBoolean()) {
-          edgeValue = GraphJobRunner.createEdgeCostObject();
-          edgeValue.readFields(in);
-        }
-        vertex.getEdges().add(
-            new Edge<WritableComparable<Writable>, Writable>(edgeVertexID,
-                edgeValue));
-      }
-      this.vertex = vertex;
     } else if (isVerticesSizeMessage()) {
-      vertices_size = new IntWritable();
-      vertices_size.readFields(in);
+      verticesSize = new IntWritable();
+      verticesSize.readFields(in);
     } else {
       vertexId = ReflectionUtils.newInstance(GraphJobRunner.VERTEX_ID_CLASS,
           null);
@@ -168,6 +105,21 @@ public final class GraphJobMessage imple
     }
   }
 
+  @SuppressWarnings("unchecked")
+  @Override
+  public int compareTo(GraphJobMessage that) {
+    if (this.flag != that.flag) {
+      return (this.flag - that.flag);
+    } else {
+      if (this.isVertexMessage()) {
+        return this.vertexId.compareTo(that.vertexId);
+      } else if (this.isMapMessage()) {
+        return Integer.MIN_VALUE;
+      }
+    }
+    return 0;
+  }
+
   public MapWritable getMap() {
     return map;
   }
@@ -180,12 +132,8 @@ public final class GraphJobMessage imple
     return vertexValue;
   }
 
-  public Vertex<?, ?, ?> getVertex() {
-    return vertex;
-  }
-
   public IntWritable getVerticesSize() {
-    return vertices_size;
+    return verticesSize;
   }
 
   public boolean isMapMessage() {
@@ -196,23 +144,22 @@ public final class GraphJobMessage imple
     return flag == VERTEX_FLAG;
   }
 
-  public boolean isRepairMessage() {
-    return flag == REPAIR_FLAG;
-  }
-
-  public boolean isPartitioningMessage() {
-    return flag == PARTITION_FLAG;
-  }
-
   public boolean isVerticesSizeMessage() {
     return flag == VERTICES_SIZE_FLAG;
   }
 
   @Override
   public String toString() {
-    return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
-        + vertexId + ", vertexValue=" + vertexValue + ", vertex=" + vertex
-        + "]";
+    if (isVertexMessage()) {
+      return "ID: " + vertexId + " Val: " + vertexValue;
+    } else if (isMapMessage()) {
+      return "Map: " + map;
+    } else if (isVerticesSizeMessage()) {
+      return "#Vertices: " + verticesSize;
+    } else {
+      return "GraphJobMessage [flag=" + flag + ", map=" + map + ", vertexId="
+          + vertexId + ", vertexValue=" + vertexValue + "]";
+    }
   }
 
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Feb 25 11:40:13 2013
@@ -18,11 +18,7 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -30,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -39,7 +36,7 @@ import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.util.KeyValuePair;
+import org.apache.hama.graph.IDSkippingIterator.Strategy;
 import org.apache.hama.util.ReflectionUtils;
 
 /**
@@ -49,7 +46,7 @@ import org.apache.hama.util.ReflectionUt
  * @param <E> the value type of an edge.
  * @param <M> the value type of a vertex.
  */
-public final class GraphJobRunner<V extends WritableComparable<V>, E extends Writable, M extends Writable>
+public final class GraphJobRunner<V extends WritableComparable<? super V>, E extends Writable, M extends Writable>
     extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
 
   public static enum GraphJobCounter {
@@ -77,7 +74,7 @@ public final class GraphJobRunner<V exte
   public static Class<? extends Writable> EDGE_VALUE_CLASS;
   public static Class<Vertex<?, ?, ?>> vertexClass;
 
-  private IVerticesInfo<V, E, M> vertices;
+  private VerticesInfo<V, E, M> vertices;
   private boolean updated = true;
   private int globalUpdateCounts = 0;
 
@@ -119,15 +116,16 @@ public final class GraphJobRunner<V exte
       peer.sync();
 
       // note that the messages must be parsed here
-      final Map<V, List<M>> messages = parseMessages(peer);
-      // master needs to update
-      doMasterUpdates(peer);
-      // if aggregators say we don't have updates anymore, break
-      if (!aggregationRunner.receiveAggregatedValues(peer, iteration)) {
+      GraphJobMessage firstVertexMessage = parseMessages(peer);
+      // master/slaves needs to update
+      firstVertexMessage = doAggregationUpdates(firstVertexMessage, peer);
+      // check if updated changed by our aggregators
+      if (!updated) {
         break;
       }
+
       // loop over vertices and do their computation
-      doSuperstep(messages, peer);
+      doSuperstep(firstVertexMessage, peer);
 
       if (isMasterTask(peer)) {
         peer.getCounter(GraphJobCounter.ITERATIONS).increment(1);
@@ -144,9 +142,12 @@ public final class GraphJobRunner<V exte
   public final void cleanup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Vertex<V, E, M> e : vertices) {
+    IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
+    while (skippingIterator.hasNext()) {
+      Vertex<V, E, M> e = skippingIterator.next();
       peer.write(e.getVertexID(), e.getValue());
     }
+    vertices.cleanup(conf, peer.getTaskId());
   }
 
   /**
@@ -154,9 +155,12 @@ public final class GraphJobRunner<V exte
    * master aggregation. In case of no aggregators defined, we save a sync by
    * reading multiple typed messages.
    */
-  private void doMasterUpdates(
+  private GraphJobMessage doAggregationUpdates(
+      GraphJobMessage firstVertexMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
-      throws IOException {
+      throws IOException, SyncException, InterruptedException {
+
+    // this is only done in every second iteration
     if (isMasterTask(peer) && iteration > 1) {
       MapWritable updatedCnt = new MapWritable();
       // exit if there's no update made
@@ -165,70 +169,150 @@ public final class GraphJobRunner<V exte
       } else {
         aggregationRunner.doMasterAggregation(updatedCnt);
       }
-      // send the updates from the mater tasks back to the slaves
+      // send the updates from the master tasks back to the slaves
       for (String peerName : peer.getAllPeerNames()) {
         peer.send(peerName, new GraphJobMessage(updatedCnt));
       }
     }
+    if (aggregationRunner.isEnabled() && iteration > 1) {
+      // in case we need to sync, we need to replay the messages that already
+      // are added to the queue. This prevents losing messages when using
+      // aggregators.
+      if (firstVertexMessage != null) {
+        peer.send(peer.getPeerName(), firstVertexMessage);
+      }
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        peer.send(peer.getPeerName(), msg);
+      }
+      // now sync
+      peer.sync();
+      // now the map message must be read that might be sent from the master
+      updated = aggregationRunner.receiveAggregatedValues(peer
+          .getCurrentMessage().getMap(), iteration);
+      // set the first vertex message back to the message it had before sync
+      firstVertexMessage = peer.getCurrentMessage();
+    }
+    return firstVertexMessage;
   }
 
   /**
    * Do the main logic of a superstep, namely checking if vertices are active,
    * feeding compute with messages and controlling combiners/aggregators.
    */
-  private void doSuperstep(Map<V, List<M>> messages,
+  @SuppressWarnings("unchecked")
+  private void doSuperstep(GraphJobMessage currentMessage,
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
-    for (Vertex<V, E, M> vertex : vertices) {
-      List<M> msgs = messages.get(vertex.getVertexID());
-      // If there are newly received messages, restart.
-      if (vertex.isHalted() && msgs != null) {
-        vertex.setActive();
+    vertices.startSuperstep();
+    /*
+     * We iterate over our messages and vertices in sorted order. That means
+     * that we need to seek the first vertex that has the same ID as the
+     * currentMessage or the first vertex that is active.
+     */
+    IDSkippingIterator<V, E, M> iterator = vertices.skippingIterator();
+    // note that we can't skip inactive vertices because we have to rewrite the
+    // complete vertex file in each iteration
+    while (iterator.hasNext(
+        currentMessage == null ? null : (V) currentMessage.getVertexId(),
+        Strategy.ALL)) {
+
+      Vertex<V, E, M> vertex = iterator.next();
+      VertexMessageIterable<V, M> iterable = null;
+      if (currentMessage != null) {
+        iterable = iterate(currentMessage, (V) currentMessage.getVertexId(),
+            vertex, peer);
       }
-      if (msgs == null) {
-        msgs = Collections.emptyList();
+      if (iterable != null && vertex.isHalted()) {
+        vertex.setActive();
       }
-
       if (!vertex.isHalted()) {
-        if (combiner != null) {
-          M combined = combiner.combine(msgs);
-          msgs = new ArrayList<M>();
-          msgs.add(combined);
-        }
         M lastValue = vertex.getValue();
-        vertex.compute(msgs.iterator());
+        if (iterable == null) {
+          vertex.compute(Collections.<M> emptyList());
+        } else {
+          if (combiner != null) {
+            M combined = combiner.combine(iterable);
+            vertex.compute(Collections.singleton(combined));
+          } else {
+            vertex.compute(iterable);
+          }
+          currentMessage = iterable.getOverflowMessage();
+        }
         aggregationRunner.aggregateVertex(lastValue, vertex);
+        // check for halt again after computation
         if (!vertex.isHalted()) {
           activeVertices++;
         }
       }
+
+      // note that we even need to rewrite the vertex if it is halted for
+      // consistency reasons
+      vertices.finishVertexComputation(vertex);
     }
+    vertices.finishSuperstep();
 
     aggregationRunner.sendAggregatorValues(peer, activeVertices);
     iteration++;
   }
 
   /**
+   * Iterating utility that ensures following things: <br/>
+   * - if vertex is active, but the given message does not match the vertexID,
+   * return null. <br/>
+   * - if vertex is inactive, but received a message that matches the ID, build
+   * an iterator that can be iterated until the next vertex has been reached
+   * (not buffered in memory) and set the vertex active <br/>
+   * - if vertex is active, and the given message does match the vertexID,
+   * return an iterator that can be iterated until the next vertex has been
+   * reached. <br/>
+   * - if vertex is inactive, and received no message, return null.
+   */
+  private VertexMessageIterable<V, M> iterate(GraphJobMessage currentMessage,
+      V firstMessageId, Vertex<V, E, M> vertex,
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    int comparision = firstMessageId.compareTo(vertex.getVertexID());
+    if (comparision < 0) {
+      throw new IllegalArgumentException(
+          "Messages must never be behind the vertex in ID! Current Message ID: "
+              + firstMessageId + " vs. " + vertex.getVertexID());
+    } else if (comparision == 0) {
+      // vertex id matches with the vertex, return an iterator with newest
+      // message
+      return new VertexMessageIterable<V, M>(currentMessage,
+          vertex.getVertexID(), peer);
+    } else {
+      // return null
+      return null;
+    }
+  }
+
+  /**
    * Seed the vertices first with their own values in compute. This is the first
    * superstep after the vertices have been loaded.
    */
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Vertex<V, E, M> vertex : vertices) {
-      List<M> singletonList = Collections.singletonList(vertex.getValue());
+    vertices.startSuperstep();
+    IDSkippingIterator<V, E, M> skippingIterator = vertices.skippingIterator();
+    while (skippingIterator.hasNext()) {
+      Vertex<V, E, M> vertex = skippingIterator.next();
       M lastValue = vertex.getValue();
-      vertex.compute(singletonList.iterator());
+      vertex.compute(Collections.singleton(vertex.getValue()));
       aggregationRunner.aggregateVertex(lastValue, vertex);
+      vertices.finishVertexComputation(vertex);
     }
+    vertices.finishSuperstep();
     aggregationRunner.sendAggregatorValues(peer, 1);
     iteration++;
   }
 
   @SuppressWarnings("unchecked")
   private void setupFields(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
     this.peer = peer;
     this.conf = peer.getConfiguration();
     maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
@@ -253,7 +337,8 @@ public final class GraphJobRunner<V exte
     aggregationRunner = new AggregationRunner<V, E, M>();
     aggregationRunner.setupAggregators(peer);
 
-    vertices = new ListVerticesInfo<V, E, M>();
+    vertices = new DiskVerticesInfo<V, E, M>();
+    vertices.init(this, conf, peer.getTaskId());
   }
 
   @SuppressWarnings("unchecked")
@@ -279,33 +364,31 @@ public final class GraphJobRunner<V exte
   /**
    * Loads vertices into memory of each peer.
    */
-  @SuppressWarnings("unchecked")
   private void loadVertices(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
     final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
 
-    if (LOG.isDebugEnabled())
-      LOG.debug("Vertex class: " + vertexClass);
+    LOG.debug("Vertex class: " + vertexClass);
 
-    KeyValuePair<Writable, Writable> next;
-    // our VertexInputReader ensures that the incoming vertices are sorted by
-    // ID.
-    while ((next = peer.readNext()) != null) {
-      Vertex<V, E, M> vertex = (Vertex<V, E, M>) next.getKey();
-      vertex.runner = this;
+    // our VertexInputReader ensures incoming vertices are sorted by their ID
+    Vertex<V, E, M> vertex = GraphJobRunner
+        .<V, E, M> newVertexInstance(VERTEX_CLASS);
+    vertex.runner = this;
+    while (peer.readNext(vertex, NullWritable.get())) {
       vertex.setup(conf);
       if (selfReference) {
         vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }
       vertices.addVertex(vertex);
     }
+    vertices.finishAdditions();
+    // finish the "superstep" because we have written a new file here
+    vertices.finishSuperstep();
 
     LOG.info(vertices.size() + " vertices are loaded into "
         + peer.getPeerName());
-
-    if (LOG.isDebugEnabled())
-      LOG.debug("Starting Vertex processing!");
+    LOG.debug("Starting Vertex processing!");
   }
 
   /**
@@ -337,26 +420,21 @@ public final class GraphJobRunner<V exte
    * Parses the messages in every superstep and does actions according to flags
    * in the messages.
    * 
-   * @return a map that contains messages pro vertex.
+   * @return the first vertex message, null if none received.
    */
   @SuppressWarnings("unchecked")
-  private Map<V, List<M>> parseMessages(
+  private GraphJobMessage parseMessages(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
-      throws IOException {
-    GraphJobMessage msg;
-    final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+      throws IOException, SyncException, InterruptedException {
+    GraphJobMessage msg = null;
     while ((msg = peer.getCurrentMessage()) != null) {
       // either this is a vertex message or a directive that must be read
       // as map
       if (msg.isVertexMessage()) {
-        final V vertexID = (V) msg.getVertexId();
-        final M value = (M) msg.getVertexValue();
-        List<M> msgs = msgMap.get(vertexID);
-        if (msgs == null) {
-          msgs = new ArrayList<M>();
-          msgMap.put(vertexID, msgs);
-        }
-        msgs.add(value);
+        // if we found a vertex message (ordering defines they come after map
+        // messages), we return that as the first message so the outward process
+        // can join them correctly with the VerticesInfo.
+        break;
       } else if (msg.isMapMessage()) {
         for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
           Text vertexID = (Text) e.getKey();
@@ -376,12 +454,13 @@ public final class GraphJobRunner<V exte
                 (M) e.getValue());
           }
         }
+
       } else {
         throw new UnsupportedOperationException("Unknown message type: " + msg);
       }
 
     }
-    return msgMap;
+    return msg;
   }
 
   /**

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java?rev=1449666&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/IDSkippingIterator.java Mon Feb 25 11:40:13 2013
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Iterator that allows skipping of items on disk based on some given strategy.
+ */
+public abstract class IDSkippingIterator<V extends WritableComparable<? super V>, E extends Writable, M extends Writable> {
+
+  enum Strategy {
+    ALL, ACTIVE, ACTIVE_AND_MESSAGES, INACTIVE;
+
+    // WritableComparable is really sucking in this type constellation
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    public boolean accept(Vertex v, WritableComparable msgId) {
+      switch (this) {
+        case ACTIVE_AND_MESSAGES:
+          if (msgId != null) {
+            return !v.isHalted() || v.getVertexID().compareTo(msgId) == 0;
+          }
+          // fallthrough to activeness if we don't have a message anymore
+        case ACTIVE:
+          return !v.isHalted();
+        case INACTIVE:
+          return v.isHalted();
+        case ALL:
+          // fall through intended
+        default:
+          return true;
+      }
+    }
+  }
+
+  /**
+   * Skips nothing, accepts everything.
+   * 
+   * @return true if the strategy found a new item, false if not.
+   */
+  public boolean hasNext() {
+    return hasNext(null, Strategy.ALL);
+  }
+
+  /**
+   * Skips until the given strategy is satisfied.
+   * 
+   * @return true if the strategy found a new item, false if not.
+   */
+  public abstract boolean hasNext(V e, Strategy strat);
+
+  /**
+   * @return a found vertex that can be read safely.
+   */
+  public abstract Vertex<V, E, M> next();
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/ListVerticesInfo.java Mon Feb 25 11:40:13 2013
@@ -17,14 +17,14 @@
  */
 package org.apache.hama.graph;
 
-import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hama.bsp.TaskAttemptID;
 
 /**
  * VerticesInfo encapsulates the storage of vertices in a BSP Task.
@@ -34,16 +34,11 @@ import org.apache.hadoop.io.WritableComp
  * @param <M> Vertex value object type
  */
 public final class ListVerticesInfo<V extends WritableComparable<V>, E extends Writable, M extends Writable>
-    implements IVerticesInfo<V, E, M> {
+    implements VerticesInfo<V, E, M> {
 
   private final List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>(
       100);
 
-  /*
-   * (non-Javadoc)
-   * @see
-   * org.apache.hama.graph.IVerticesInfo#addVertex(org.apache.hama.graph.Vertex)
-   */
   @Override
   public void addVertex(Vertex<V, E, M> vertex) {
     vertices.add(vertex);
@@ -53,39 +48,74 @@ public final class ListVerticesInfo<V ex
     vertices.clear();
   }
 
-  /*
-   * (non-Javadoc)
-   * @see org.apache.hama.graph.IVerticesInfo#size()
-   */
   @Override
   public int size() {
     return this.vertices.size();
   }
 
-  /*
-   * (non-Javadoc)
-   * @see org.apache.hama.graph.IVerticesInfo#iterator()
-   */
   @Override
-  public Iterator<Vertex<V, E, M>> iterator() {
-    return vertices.iterator();
+  public IDSkippingIterator<V, E, M> skippingIterator() {
+    return new IDSkippingIterator<V, E, M>() {
+      int currentIndex = 0;
+
+      @Override
+      public boolean hasNext(V e,
+          org.apache.hama.graph.IDSkippingIterator.Strategy strat) {
+        if (currentIndex < vertices.size()) {
+
+          while (!strat.accept(vertices.get(currentIndex), e)) {
+            currentIndex++;
+          }
+
+          return true;
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public Vertex<V, E, M> next() {
+        return vertices.get(currentIndex++);
+      }
+
+    };
   }
 
-  /*
-   * (non-Javadoc)
-   * @see org.apache.hama.graph.IVerticesInfo#recoverState(java.io.DataInput)
-   */
   @Override
-  public void recoverState(DataInput in) {
+  public void finishVertexComputation(Vertex<V, E, M> vertex) {
 
   }
 
-  /*
-   * (non-Javadoc)
-   * @see org.apache.hama.graph.IVerticesInfo#saveState(java.io.DataOutput)
-   */
   @Override
-  public void saveState(DataOutput out) {
+  public void finishAdditions() {
+
+  }
 
+  @Override
+  public boolean isFinishedAdditions() {
+    return false;
   }
+
+  @Override
+  public void finishSuperstep() {
+
+  }
+
+  @Override
+  public void cleanup(Configuration conf, TaskAttemptID attempt)
+      throws IOException {
+
+  }
+
+  @Override
+  public void startSuperstep() throws IOException {
+
+  }
+
+  @Override
+  public void init(GraphJobRunner<V, E, M> runner, Configuration conf,
+      TaskAttemptID attempt) throws IOException {
+
+  }
+
 }

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Mon Feb 25 11:40:13 2013
@@ -212,6 +212,10 @@ public abstract class Vertex<V extends W
     return votedToHalt;
   }
 
+  void setVotedToHalt(boolean votedToHalt) {
+    this.votedToHalt = votedToHalt;
+  }
+
   @Override
   public int hashCode() {
     return ((vertexID == null) ? 0 : vertexID.hashCode());
@@ -236,8 +240,8 @@ public abstract class Vertex<V extends W
 
   @Override
   public String toString() {
-    return getVertexID() + (getValue() != null ? " = " + getValue() : "")
-        + " // " + edges;
+    return "Active: " + !votedToHalt + " -> ID: " + getVertexID()
+        + (getValue() != null ? " = " + getValue() : "") + " // " + edges;
   }
 
   @Override

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java?rev=1449666&r1=1449665&r2=1449666&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/VertexInterface.java Mon Feb 25 11:40:13 2013
@@ -18,7 +18,6 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -55,7 +54,7 @@ public interface VertexInterface<V exten
   /**
    * The user-defined function
    */
-  public void compute(Iterator<M> messages) throws IOException;
+  public void compute(Iterable<M> messages) throws IOException;
 
   /**
    * @return a list of outgoing edges of this vertex in the input graph.



Mime
View raw message