hama-commits mailing list archives

From tjungb...@apache.org
Subject svn commit: r1351702 - in /hama/trunk: ./ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/example/
Date Tue, 19 Jun 2012 13:08:10 GMT
Author: tjungblut
Date: Tue Jun 19 13:08:09 2012
New Revision: 1351702

URL: http://svn.apache.org/viewvc?rev=1351702&view=rev
Log:
[HAMA-591]: Improve Pagerank
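
In rough terms, the vertex update implemented by this patch is

    PR(v) = (1 - d) / N  +  d * (sum of incoming rank messages  +  D / N)

where d is the damping factor ("hama.pagerank.alpha", 0.85 by default), N is the
global vertex count and D is the total rank mass sitting on dangling vertices
(vertices without outgoing edges). D is collected by the new DanglingNodeAggregator,
registered as the second aggregator and read back in compute() via
getLastAggregatedValue(1); this replaces the old "hama.graph.self.ref" workaround
for the constant rank loss.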

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Jun 19 13:08:09 2012
@@ -3,7 +3,8 @@ Hama Change Log
 Release 0.5 - April 10, 2012 
 
   NEW FEATURES
- 
+
+   HAMA-591: Improve Pagerank (tjungblut) 
    HAMA-550: Implementation of Bipartite Matching (Apurv Verma via tjungblut)
    HAMA-588: Add voteToHalt() mechanism in Graph API (edwardyoon)
    HAMA-566: Add disk-based queue (tjungblut)

Modified: hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Tue Jun 19 13:08:09 2012
@@ -30,12 +30,16 @@ import org.apache.hama.HamaConfiguration
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.TextInputFormat;
+import org.apache.hama.graph.AbstractAggregator;
 import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.Edge;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.Vertex;
 import org.apache.hama.graph.VertexInputReader;
 
+/**
+ * Real pagerank with dangling node contribution.
+ */
 public class PageRank {
 
   public static class PageRankVertex extends
@@ -64,17 +68,22 @@ public class PageRank {
       // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
         this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
-      }
-
-      // in the first superstep, there are no messages to check
-      if (this.getSuperstepCount() >= 1) {
+      } else if (this.getSuperstepCount() >= 1) {
+        DoubleWritable danglingNodeContribution = getLastAggregatedValue(1);
         double sum = 0;
         while (messages.hasNext()) {
           DoubleWritable msg = messages.next();
           sum += msg.get();
         }
-        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-        this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+        if (danglingNodeContribution == null) {
+          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+          this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+        } else {
+          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+          this.setValue(new DoubleWritable(alpha
+              + (DAMPING_FACTOR * (sum + danglingNodeContribution.get()
+                  / this.getNumVertices()))));
+        }
       }
 
       // if we have not reached our global error yet, then proceed.
@@ -130,6 +139,18 @@ public class PageRank {
       printUsage();
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    GraphJob pageJob = createJob(args, conf);
+
+    long startTime = System.currentTimeMillis();
+    if (pageJob.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static GraphJob createJob(String[] args, HamaConfiguration conf)
+      throws IOException {
     GraphJob pageJob = new GraphJob(conf, PageRank.class);
     pageJob.setJobName("Pagerank");
 
@@ -140,9 +161,6 @@ public class PageRank {
     // set the defaults
     pageJob.setMaxIteration(30);
     pageJob.set("hama.pagerank.alpha", "0.85");
-    // we need to include a vertex in its adjacency list,
-    // otherwise the pagerank result has a constant loss
-    pageJob.set("hama.graph.self.ref", "true");
 
     if (args.length == 6)
       pageJob.setNumBspTask(Integer.parseInt(args[5]));
@@ -153,7 +171,9 @@ public class PageRank {
     if (args.length >= 3)
       pageJob.set("hama.pagerank.alpha", args[2]);
 
-    pageJob.setAggregatorClass(AverageAggregator.class);
+    // error, dangling node probability sum
+    pageJob.setAggregatorClass(AverageAggregator.class,
+        DanglingNodeAggregator.class);
 
     pageJob.setVertexIDClass(Text.class);
     pageJob.setVertexValueClass(DoubleWritable.class);
@@ -167,11 +187,31 @@ public class PageRank {
     pageJob.setOutputFormat(SequenceFileOutputFormat.class);
     pageJob.setOutputKeyClass(Text.class);
     pageJob.setOutputValueClass(DoubleWritable.class);
+    return pageJob;
+  }
 
-    long startTime = System.currentTimeMillis();
-    if (pageJob.waitForCompletion(true)) {
-      System.out.println("Job Finished in "
-          + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+  public static class DanglingNodeAggregator
+      extends
+      AbstractAggregator<DoubleWritable, Vertex<Text, NullWritable, DoubleWritable>> {
+
+    double danglingNodeSum;
+
+    @Override
+    public void aggregate(Vertex<Text, NullWritable, DoubleWritable> vertex,
+        DoubleWritable value) {
+      if (vertex != null) {
+        if (vertex.getEdges().size() == 0) {
+          danglingNodeSum += value.get();
+        }
+      } else {
+        danglingNodeSum += value.get();
+      }
+    }
+
+    @Override
+    public DoubleWritable getValue() {
+      return new DoubleWritable(danglingNodeSum);
     }
+
   }
 }
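
Since the job setup now lives in createJob(String[], HamaConfiguration), other
drivers and tests can reuse exactly the same configuration. A minimal sketch
(the class name and the paths below are placeholders, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hama.HamaConfiguration;
    import org.apache.hama.examples.PageRank;
    import org.apache.hama.graph.GraphJob;

    public class PageRankDriver {
      public static void main(String[] args) throws Exception {
        HamaConfiguration conf = new HamaConfiguration(new Configuration());
        // args[0] = input path, args[1] = output path; optional extras such as
        // the damping factor (args[2]) are parsed inside createJob().
        GraphJob pageJob = PageRank.createJob(new String[] {
            "/tmp/pagerank-input.txt", "/tmp/pagerank-output" }, conf);
        if (!pageJob.waitForCompletion(true)) {
          System.err.println("PageRank job failed!");
        }
      }
    }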

Modified: hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java (original)
+++ hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java Tue Jun 19 13:08:09 2012
@@ -28,16 +28,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
-import org.apache.hama.bsp.HashPartitioner;
-import org.apache.hama.bsp.SequenceFileOutputFormat;
-import org.apache.hama.bsp.TextInputFormat;
-import org.apache.hama.examples.PageRank.PageRankVertex;
-import org.apache.hama.graph.AverageAggregator;
 import org.apache.hama.graph.GraphJob;
 import org.apache.hama.graph.GraphJobRunner;
 
@@ -55,11 +48,11 @@ public class PageRankTest extends TestCa
    * functionality.
    */
   String[] input = new String[] { "stackoverflow.com\tyahoo.com",
-      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov]",
-      "yahoo.com\tnasa.gov\tstackoverflow.com]",
-      "twitter.com\tgoogle.com\tfacebook.com]",
-      "nasa.gov\tyahoo.com\tstackoverflow.com]",
-      "youtube.com\tgoogle.com\tyahoo.com]" };
+      "facebook.com\ttwitter.com\tgoogle.com\tnasa.gov",
+      "yahoo.com\tnasa.gov\tstackoverflow.com",
+      "twitter.com\tgoogle.com\tfacebook.com",
+      "nasa.gov\tyahoo.com\tstackoverflow.com",
+      "youtube.com\tgoogle.com\tyahoo.com" };
 
   private static String INPUT = "/tmp/pagerank-tmp.seq";
   private static String TEXT_INPUT = "/tmp/pagerank.txt";
@@ -84,6 +77,7 @@ public class PageRankTest extends TestCa
       DoubleWritable value = new DoubleWritable();
 
       while (reader.next(key, value)) {
+        System.out.println(key + " / " + value);
         sum += value.get();
       }
     }
@@ -95,33 +89,10 @@ public class PageRankTest extends TestCa
     generateTestData();
     try {
       HamaConfiguration conf = new HamaConfiguration(new Configuration());
+      conf.set("bsp.local.tasks.maximum", "1"); 
       conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
-      GraphJob pageJob = new GraphJob(conf, PageRank.class);
-      pageJob.setJobName("Pagerank");
-
-      pageJob.setVertexClass(PageRankVertex.class);
-      pageJob.setInputPath(new Path(INPUT));
-      pageJob.setOutputPath(new Path(OUTPUT));
-      pageJob.setNumBspTask(2);
-      // set the defaults
-      pageJob.setMaxIteration(30);
-      pageJob.set("hama.pagerank.alpha", "0.85");
-      // we need to include a vertex in its adjacency list,
-      // otherwise the pagerank result has a constant loss
-      pageJob.set("hama.graph.self.ref", "true");
-
-      pageJob.setAggregatorClass(AverageAggregator.class);
-      pageJob.setInputKeyClass(LongWritable.class);
-      pageJob.setInputValueClass(Text.class);
-      pageJob.setInputFormat(TextInputFormat.class);
-      pageJob.setPartitioner(HashPartitioner.class);
-      pageJob.setOutputFormat(SequenceFileOutputFormat.class);
-      pageJob.setOutputKeyClass(Text.class);
-      pageJob.setOutputValueClass(DoubleWritable.class);
-      pageJob.setVertexInputReaderClass(PageRank.PagerankTextReader.class);
-      pageJob.setVertexIDClass(Text.class);
-      pageJob.setVertexValueClass(DoubleWritable.class);
-      pageJob.setEdgeValueClass(NullWritable.class);
+      GraphJob pageJob = PageRank.createJob(new String[] { INPUT, OUTPUT },
+          conf);
 
       if (!pageJob.waitForCompletion(true)) {
         fail("Job did not complete normally!");

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AbsDiffAggregator.java Tue Jun 19 13:08:09 2012
@@ -38,7 +38,7 @@ public class AbsDiffAggregator extends
     }
   }
 
-  // we a master aggregates he aggregated values, he calls this, so let's just
+  // when a master aggregates the aggregated values, he calls this, so let's just
   // sum up here.
   @Override
   public void aggregate(Vertex<?, ?, DoubleWritable> vertex,

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java?rev=1351702&r1=1351701&r2=1351702&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/example/PageRank.java Tue Jun 19 13:08:09 2012
@@ -30,6 +30,7 @@ import org.apache.hama.graph.Vertex;
 import org.apache.hama.graph.VertexInputReader;
 
 public class PageRank {
+
   public static class PageRankVertex extends
       Vertex<Text, NullWritable, DoubleWritable> {
 
@@ -56,24 +57,21 @@ public class PageRank {
       // initialize this vertex to 1 / count of global vertices in this graph
       if (this.getSuperstepCount() == 0) {
         this.setValue(new DoubleWritable(1.0 / this.getNumVertices()));
-      }
-
-      // in the first superstep, there are no messages to check
-      if (this.getSuperstepCount() >= 1) {
+      } else if (this.getSuperstepCount() >= 1) {
+        DoubleWritable danglingNodeContribution = getLastAggregatedValue(1);
         double sum = 0;
         while (messages.hasNext()) {
           DoubleWritable msg = messages.next();
           sum += msg.get();
         }
-        double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
-        this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
-        if (this.getSuperstepCount() > 1) {
-          if (this.getLastAggregatedValue(1).get() < 0.99
-              || this.getLastAggregatedValue(1).get() > 1.1) {
-            throw new RuntimeException(
-                "Sum aggregator hasn't summed correctly! "
-                    + this.getLastAggregatedValue(1).get());
-          }
+        if (danglingNodeContribution == null) {
+          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+          this.setValue(new DoubleWritable(alpha + (DAMPING_FACTOR * sum)));
+        } else {
+          double alpha = (1.0d - DAMPING_FACTOR) / this.getNumVertices();
+          this.setValue(new DoubleWritable(alpha
+              + (DAMPING_FACTOR * (sum + danglingNodeContribution.get()
+                  / this.getNumVertices()))));
         }
       }
 


