incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1230852 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ examples/src/main/java/org/apache/hama/examples/ examples/src/test/java/org/apache/hama/examples/
Date Fri, 13 Jan 2012 01:06:06 GMT
Author: edwardyoon
Date: Fri Jan 13 01:06:05 2012
New Revision: 1230852

URL: http://svn.apache.org/viewvc?rev=1230852&view=rev
Log:
HAMA-488 PageRank refactor and fix bugs

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
    incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Fri Jan 13 01:06:05 2012
@@ -17,6 +17,7 @@ Release 0.4 - Unreleased
 
   BUG FIXES
 
+    HAMA-488: PageRank refactor and fix bugs (tjungblut via edwardyoon)
     HAMA-489: Check port availability before forking child process (edwardyoon)
     HAMA-474: ClusterStatus.getTasks() always returns 0 (edwardyoon)
     HAMA-472: The task should be killed if it fails to initialize (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJob.java Fri Jan 13 01:06:05 2012
@@ -230,7 +230,7 @@ public class BSPJob extends BSPJobContex
   }
 
   public int getNumBspTask() {
-	// default is 1, because with zero, we will hang in infinity
+  // default is 1, because with zero, we will hang in infinity
     return conf.getInt("bsp.peers.num", 1);
   }
 

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPJobClient.java Fri Jan 13 01:06:05 2012
@@ -897,7 +897,17 @@ public class BSPJobClient extends Config
       throw new IOException("Expect one token as the result of "
           + Shell.USER_NAME_COMMAND + ": " + toString(result));
     }
-    return result[0];
+    String fixResult = fixCygwinName(result[0]);
+    return fixResult;
+  }
+
+  private static String fixCygwinName(String in) {
+    String string = in;
+    if (string.contains("\\")) {
+      // this is for cygwin systems
+      string = string.substring(string.indexOf("\\"));
+    }
+    return string;
   }
 
   static String getUnixUserGroupName(String user) throws IOException {

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ExampleDriver.java Fri Jan 13 01:06:05 2012
@@ -30,6 +30,7 @@ public class ExampleDriver {
       pgd.addClass("sssp", ShortestPaths.class, "Single Shortest Path");
       pgd.addClass("cmb", CombineExample.class, "Combine");
       pgd.addClass("bench", RandBench.class, "Random Benchmark");
+      pgd.addClass("pagerank", PageRank.class, "PageRank");
       pgd.driver(args);
     } catch (Throwable e) {
       e.printStackTrace();

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/PageRank.java Fri Jan 13 01:06:05 2012
@@ -28,25 +28,24 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
 import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPJob;
-import org.apache.hama.bsp.BSPJobClient;
 import org.apache.hama.bsp.BSPPeer;
-import org.apache.hama.bsp.ClusterStatus;
 import org.apache.hama.bsp.DoubleMessage;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.SequenceFileInputFormat;
 import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 import org.apache.hama.util.KeyValuePair;
 
 public class PageRank extends
-    BSP<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable> {
+    BSP<VertexWritable, VertexArrayWritable, Text, DoubleWritable> {
+
   public static final Log LOG = LogFactory.getLog(PageRank.class);
 
   private final HashMap<VertexWritable, VertexWritable[]> adjacencyList = new HashMap<VertexWritable,
VertexWritable[]>();
@@ -64,29 +63,40 @@ public class PageRank extends
 
   @Override
   public void setup(
-      BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer)
+      BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
       throws IOException {
-    // map our stuff into ram
 
-    KeyValuePair<VertexWritable, ShortestPathVertexArrayWritable> next = null;
+    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
+    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
+    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
+    masterTaskName = peer.getPeerName(0);
+
+    // map our stuff into ram
+    KeyValuePair<VertexWritable, VertexArrayWritable> next = null;
     while ((next = peer.readNext()) != null) {
-      adjacencyList.put(next.getKey(), (ShortestPathVertex[]) next.getValue()
+      adjacencyList.put(next.getKey(), (VertexWritable[]) next.getValue()
           .toArray());
       vertexLookupMap.put(next.getKey().getName(), next.getKey());
     }
 
-    // normally this is the global number of vertices
+    // normally this should be the global number of vertices
     numOfVertices = vertexLookupMap.size();
-    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
     ALPHA = (1 - DAMPING_FACTOR) / (double) numOfVertices;
-    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
-    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
-    masterTaskName = peer.getPeerName(0);
+
+    // reread the input to save ram
+    peer.reopenInput();
+    VertexWritable key = new VertexWritable();
+    VertexArrayWritable value = new VertexArrayWritable();
+    while (peer.readNext(key, value)) {
+      VertexWritable vertexWritable = vertexLookupMap.get(key.getName());
+      tentativePagerank
+          .put(vertexWritable, Double.valueOf(1.0 / numOfVertices));
+    }
   }
 
   @Override
   public void bsp(
-      BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer)
+      BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer)
       throws IOException, SyncException, InterruptedException {
 
     // while the error not converges against epsilon do the pagerank stuff
@@ -142,11 +152,11 @@ public class PageRank extends
 
   @Override
   public void cleanup(
-      BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer) {
+      BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer) {
     try {
       for (Entry<VertexWritable, Double> row : tentativePagerank.entrySet()) {
-        peer.write(new Text(row.getKey().getName()), new DoubleWritable(row
-            .getValue()));
+        peer.write(new Text(row.getKey().getName()),
+            new DoubleWritable(row.getValue()));
       }
     } catch (IOException e) {
       e.printStackTrace();
@@ -154,7 +164,7 @@ public class PageRank extends
   }
 
   private double broadcastError(
-      BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer,
+      BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
       double error) throws IOException, SyncException, InterruptedException {
     peer.send(masterTaskName, new DoubleMessage("", error));
     peer.sync();
@@ -194,7 +204,7 @@ public class PageRank extends
   }
 
   private void sendMessageToNeighbors(
-      BSPPeer<VertexWritable, ShortestPathVertexArrayWritable, Text, DoubleWritable>
peer,
+      BSPPeer<VertexWritable, VertexArrayWritable, Text, DoubleWritable> peer,
       VertexWritable v) throws IOException {
     VertexWritable[] outgoingEdges = adjacencyList.get(v);
     for (VertexWritable adjacent : outgoingEdges) {
@@ -206,10 +216,32 @@ public class PageRank extends
     }
   }
 
+  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+    LOG.info("-------------------- RESULTS --------------------");
+    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
+    for (FileStatus status : stati) {
+      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
+        Path path = status.getPath();
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+        Text key = new Text();
+        DoubleWritable value = new DoubleWritable();
+        int count = 0;
+        while (reader.next(key, value)) {
+          LOG.info(key.toString() + " | " + value.get());
+          count++;
+          if (count > 5)
+            break;
+        }
+        reader.close();
+      }
+    }
+  }
+
   public static void printUsage() {
     System.out.println("PageRank Example:");
     System.out
-        .println("<damping factor> <epsilon error> <optional: output path>
<optional: input path>");
+        .println("<input path> <output path> [damping factor] [epsilon error]
[tasks]");
+
   }
 
   public static void main(String[] args) throws IOException,
@@ -221,77 +253,34 @@ public class PageRank extends
     }
 
     HamaConfiguration conf = new HamaConfiguration(new Configuration());
-    BSPJob job = new BSPJob(conf);
-    job.setOutputPath(new Path("pagerank/output"));
+    BSPJob job = new BSPJob(conf, PageRank.class);
+    job.setJobName("Pagerank");
 
-    // set the defaults
-    conf.set("damping.factor", "0.85");
-    conf.set("epsilon.error", "0.000001");
-
-    boolean inputGiven = false;
-    if (args.length < 2) {
-      System.out.println("You have to provide a damping factor and an error!");
-      System.out.println("Try using 0.85 0.001 as parameter!");
-      System.exit(-1);
-    } else {
-      conf.set("damping.factor", args[0]);
-      conf.set("epsilon.error", args[1]);
-      LOG.info("Set damping factor to " + args[0]);
-      LOG.info("Set epsilon error to " + args[1]);
-      if (args.length > 2) {
-        LOG.info("Set output path to " + args[2]);
-        job.setOutputPath(new Path(args[2]));
-        if (args.length == 4) {
-          job.setInputPath(new Path(args[3]));
-          LOG.info("Using custom input at " + args[3]);
-          inputGiven = true;
-        }
-      }
+    job.setInputPath(new Path(args[0]));
+    job.setOutputPath(new Path(args[1]));
+    
+    conf.set("damping.factor", (args.length > 2) ? args[2] : "0.85");
+    conf.set("epsilon.error", (args.length > 3) ? args[3] : "0.000001");
+    if (args.length == 5) {
+      job.setNumBspTask(Integer.parseInt(args[4]));
     }
 
-    BSPJobClient jobClient = new BSPJobClient(conf);
-    ClusterStatus cluster = jobClient.getClusterStatus(true);
-
     // leave the iterations on default
     conf.set("max.iterations", "0");
 
-    if (!inputGiven) {
-      Path tmp = new Path("pagerank/input");
-      FileSystem.get(conf).delete(tmp, true);
-      // ShortestPathsGraphLoader.loadGraph(conf, tmp);
-      job.setInputPath(tmp);
-    }
-
     job.setInputFormat(SequenceFileInputFormat.class);
     job.setPartitioner(HashPartitioner.class);
     job.setOutputFormat(SequenceFileOutputFormat.class);
     job.setOutputKeyClass(Text.class);
-    job.setOutputValueClass(IntWritable.class);
-
-    job.setNumBspTask(cluster.getMaxTasks());
+    job.setOutputValueClass(DoubleWritable.class);
     job.setBspClass(PageRank.class);
-    job.setJarByClass(PageRank.class);
-    job.setJobName("Pagerank");
+
+    long startTime = System.currentTimeMillis();
     if (job.waitForCompletion(true)) {
       printOutput(FileSystem.get(conf), conf);
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
     }
   }
-
-  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
-    LOG.info("-------------------- RESULTS --------------------");
-    FileStatus[] stati = fs.listStatus(new Path(conf.get("bsp.output.dir")));
-    for (FileStatus status : stati) {
-      if (!status.isDir() && !status.getPath().getName().endsWith(".crc")) {
-        Path path = status.getPath();
-        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-        Text key = new Text();
-        DoubleWritable value = new DoubleWritable();
-        while (reader.next(key, value)) {
-          LOG.info(key.toString() + " | " + value.get());
-        }
-        reader.close();
-      }
-    }
-  }
-
 }

Modified: incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java (original)
+++ incubator/hama/trunk/examples/src/main/java/org/apache/hama/examples/ShortestPaths.java Fri Jan 13 01:06:05 2012
@@ -199,7 +199,7 @@ public class ShortestPaths extends
   }
 
   public static void printUsage() {
-    System.out.println("Usage: <startNode> <output path> <input path> [numTasks]");
+    System.out.println("Usage: <startNode> <input path> <output path> [tasks]");
   }
 
   public static void main(String[] args) throws IOException,
@@ -218,8 +218,8 @@ public class ShortestPaths extends
     bsp.setJobName("Single Source Shortest Path");
 
     conf.set(START_VERTEX, args[0]);
-    bsp.setOutputPath(new Path(args[1]));
-    bsp.setInputPath(new Path(args[2]));
+    bsp.setInputPath(new Path(args[1]));
+    bsp.setOutputPath(new Path(args[2]));
 
     if(args.length == 4) {
       bsp.setNumBspTask(Integer.parseInt(args[3]));

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java?rev=1230852&r1=1230851&r2=1230852&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java (original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/ShortestPathsTest.java Fri Jan 13 01:06:05 2012
@@ -48,7 +48,7 @@ public class ShortestPathsTest extends T
 
     generateTestData();
     try {
-      ShortestPaths.main(new String[] { "Frankfurt", OUTPUT, INPUT });
+      ShortestPaths.main(new String[] { "Frankfurt", INPUT, OUTPUT });
 
       verifyResult();
     } finally {



Mime
View raw message