incubator-hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1137983 - in /incubator/hama/trunk: ./ src/examples/org/apache/hama/examples/ src/examples/org/apache/hama/examples/graph/ src/examples/org/apache/hama/examples/sssp/ src/java/org/apache/hama/bsp/
Date Tue, 21 Jun 2011 12:50:58 GMT
Author: edwardyoon
Date: Tue Jun 21 12:50:58 2011
New Revision: 1137983

URL: http://svn.apache.org/viewvc?rev=1137983&view=rev
Log:
Add PageRank example.

Added:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankVertex.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathVertex.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java
    incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
    incubator/hama/trunk/src/java/org/apache/hama/bsp/IntegerDoubleMessage.java
Removed:
    incubator/hama/trunk/src/examples/org/apache/hama/examples/sssp/
Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1137983&r1=1137982&r2=1137983&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Tue Jun 21 12:50:58 2011
@@ -4,6 +4,7 @@ Release 0.3 - Unreleased
 
   NEW FEATURES
 
+    HAMA-395: Add PageRank example (Thomas Jungblut via edwardyoon)
     HAMA-372: Add Web UI for Job monitoring (Thomas Jungblut via edwardyoon)
     HAMA-359: Add shortest path finding example (Thomas Jungblut via edwardyoon)
     HAMA-374: Add LocalBSPRunner (Thomas Jungblut via edwardyoon)

Modified: incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java?rev=1137983&r1=1137982&r2=1137983&view=diff
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java (original)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/ExampleDriver.java Tue Jun 21 12:50:58 2011
@@ -20,7 +20,8 @@
 package org.apache.hama.examples;
 
 import org.apache.hadoop.util.ProgramDriver;
-import org.apache.hama.examples.sssp.ShortestPaths;
+import org.apache.hama.examples.graph.PageRank;
+import org.apache.hama.examples.graph.ShortestPaths;
 
 public class ExampleDriver {
 
@@ -31,7 +32,8 @@ public class ExampleDriver {
       pgd.addClass("bench", RandBench.class, "Random Communication Benchmark");
       pgd.addClass("test", SerializePrinting.class, "Serialize Printing Test");
       pgd.addClass("sssp", ShortestPaths.class, "Single Source Shortest Path");
-
+      pgd.addClass("pagerank", PageRank.class, "PageRank");
+      
       pgd.driver(args);
     } catch (Throwable e) {
       e.printStackTrace();

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRank.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.DoubleMessage;
+import org.apache.zookeeper.KeeperException;
+
+public class PageRank extends PageRankBase {
+  public static final Log LOG = LogFactory.getLog(PageRank.class);
+
+  private Configuration conf;
+
+  private HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList;
+  private final HashMap<String, PageRankVertex> lookupMap = new HashMap<String, PageRankVertex>();
+  private final HashMap<PageRankVertex, Double> tentativePagerank = new HashMap<PageRankVertex, Double>();
+  // backup of the last pagerank to determine the error
+  private final HashMap<PageRankVertex, Double> lastTentativePagerank = new HashMap<PageRankVertex, Double>();
+  private String[] peerNames;
+
+  @Override
+  public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
+      InterruptedException {
+    String master = conf.get(MASTER_TASK);
+    // setup the datasets
+    adjacencyList = PageRankBase.mapAdjacencyList(getConf(), peer);
+    // init the pageranks to 1/n where n is the number of all vertices
+    for (PageRankVertex vertex : adjacencyList.keySet()) {
+      tentativePagerank.put(vertex, Double.valueOf(1.0 / numOfVertices));
+      lookupMap.put(vertex.getUrl(), vertex);
+    }
+    // while the error not converges against epsilon do the pagerank stuff
+    double error = 1.0;
+    int iteration = 0;
+    // if MAX_ITERATIONS are set to 0, ignore the iterations and just go
+    // with the error
+    while ((MAX_ITERATIONS > 0 && iteration < MAX_ITERATIONS)
+        || error >= EPSILON) {
+      peer.sync();
+
+      if (iteration >= 1) {
+        // copy the old pagerank to the backup
+        copyTentativePageRankToBackup();
+        // sum up all incoming messages for a vertex
+        HashMap<PageRankVertex, Double> sumMap = new HashMap<PageRankVertex, Double>();
+        DoubleMessage msg = null;
+        while ((msg = (DoubleMessage) peer.getCurrentMessage()) != null) {
+          PageRankVertex k = lookupMap.get(msg.getTag());
+          if (!sumMap.containsKey(k)) {
+            sumMap.put(k, msg.getData());
+          } else {
+            sumMap.put(k, msg.getData() + sumMap.get(k));
+          }
+        }
+        // pregel formula:
+        // ALPHA = 0.15 / NumVertices()
+        // P(i) = ALPHA + 0.85 * sum
+        for (Entry<PageRankVertex, Double> entry : sumMap.entrySet()) {
+          tentativePagerank.put(entry.getKey(), ALPHA
+              + (entry.getValue() * DAMPING_FACTOR));
+        }
+
+        // determine the error and send this to the master
+        double err = determineError();
+        error = broadcastError(peer, master, err);
+      }
+      // in every step send the tentative pagerank of a vertex to its
+      // adjacent vertices
+      for (PageRankVertex vertex : adjacencyList.keySet()) {
+        sendMessageToNeighbors(peer, vertex);
+      }
+
+      iteration++;
+    }
+
+    // Clears all queues entries.
+    peer.clear();
+    // finally save the chunk of pageranks
+    PageRankBase.savePageRankMap(peer, conf, lastTentativePagerank);
+    LOG.info("Finished with iteration " + iteration + "!");
+  }
+
+  private double broadcastError(BSPPeerProtocol peer, String master,
+      double error) throws IOException, KeeperException, InterruptedException {
+    peer.send(master, new DoubleMessage("", error));
+    peer.sync();
+    if (peer.getPeerName().equals(master)) {
+      double errorSum = 0.0;
+      int count = 0;
+      DoubleMessage message;
+      while ((message = (DoubleMessage) peer.getCurrentMessage()) != null) {
+        errorSum += message.getData();
+        count++;
+      }
+      double avgError = errorSum / (double) count;
+      // LOG.info("Average error: " + avgError);
+      for (String name : peer.getAllPeerNames()) {
+        peer.send(name, new DoubleMessage("", avgError));
+      }
+    }
+
+    peer.sync();
+    DoubleMessage message = (DoubleMessage) peer.getCurrentMessage();
+    return message.getData();
+  }
+
+  private double determineError() {
+    double error = 0.0;
+    for (Entry<PageRankVertex, Double> entry : tentativePagerank.entrySet()) {
+      error += Math.abs(lastTentativePagerank.get(entry.getKey())
+          - entry.getValue());
+    }
+    return error;
+  }
+
+  private void copyTentativePageRankToBackup() {
+    for (Entry<PageRankVertex, Double> entry : tentativePagerank.entrySet()) {
+      lastTentativePagerank.put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  private void sendMessageToNeighbors(BSPPeerProtocol peer, PageRankVertex v)
+      throws IOException {
+    List<PageRankVertex> outgoingEdges = adjacencyList.get(v);
+    for (PageRankVertex adjacent : outgoingEdges) {
+      int mod = Math.abs(adjacent.getId() % peerNames.length);
+      // send a message of the tentative pagerank devided by the size of
+      // the outgoing edges to all adjacents
+      peer.send(peerNames[mod], new DoubleMessage(adjacent.getUrl(),
+          tentativePagerank.get(v) / outgoingEdges.size()));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+    numOfVertices = Integer.parseInt(conf.get("num.vertices"));
+    ALPHA = 0.15 / (double) numOfVertices;
+    DAMPING_FACTOR = Double.parseDouble(conf.get("damping.factor"));
+    EPSILON = Double.parseDouble(conf.get("epsilon.error"));
+    MAX_ITERATIONS = Integer.parseInt(conf.get("max.iterations"));
+    peerNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public static void printUsage() {
+    LOG.info("PageRank Example:");
+    LOG
+        .info("<damping factor> <epsilon error> <optional: output path> <optional: input path>");
+  }
+
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+    printUsage();
+
+    HamaConfiguration conf = new HamaConfiguration(new Configuration());
+    // set the defaults
+    conf.set("damping.factor", "0.85");
+    conf.set("epsilon.error", "0.000001");
+
+    if (args.length < 2) {
+      LOG.info("You have to provide a damping factor and an error!");
+      LOG.info("Try using 0.85 0.001 as parameter!");
+      System.exit(-1);
+    } else {
+      conf.set("damping.factor", args[0]);
+      conf.set("epsilon.error", args[1]);
+      LOG.info("Set damping factor to " + args[0]);
+      LOG.info("Set epsilon error to " + args[1]);
+      if (args.length > 2) {
+        conf.set("out.path", args[2]);
+        LOG.info("Set output path to " + args[2]);
+        if (args.length == 4) {
+          conf.set("in.path", args[3]);
+          LOG.info("Using custom input at " + args[3]);
+        } else {
+          LOG.info("Running default example graph!");
+        }
+      } else {
+        conf.set("out.path", "pagerank/output");
+        LOG.info("Set output path to default of pagerank/output!");
+      }
+    }
+
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+    StringBuilder sb = new StringBuilder();
+    for (String peerName : cluster.getActiveGroomNames().values()) {
+      conf.set(MASTER_TASK, peerName);
+      sb.append(peerName + ";");
+    }
+
+    // put every peer into the configuration
+    conf.set(ShortestPaths.BSP_PEERS, sb.toString());
+    // leave the iterations on default
+    conf.set("max.iterations", "0");
+
+    if (conf.get("in.path") == null) {
+      conf = PageRankBase
+          .partitionExample(new Path(conf.get("out.path")), conf);
+    } else {
+      conf = PageRankBase
+          .partitionTextFile(new Path(conf.get("in.path")), conf);
+    }
+
+    BSPJob job = new BSPJob(conf);
+    job.setNumBspTask(cluster.getGroomServers());
+    job.setBspClass(PageRank.class);
+    job.setJarByClass(PageRank.class);
+    job.setJobName("Pagerank");
+    if (job.waitForCompletion(true)) {
+      PageRankBase.printOutput(FileSystem.get(conf), conf);
+    }
+  }
+}

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankBase.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeerProtocol;
+
+/**
+ * Shared parameters and I/O helpers for the PageRank example.
+ * <p>
+ * The helpers partition the input graph per groom, load a peer's partition,
+ * and write and print the computed ranks. The static tuning parameters are
+ * overwritten by the concrete job when its configuration is set.
+ */
+public abstract class PageRankBase extends BSP {
+  public static final Log LOG = LogFactory.getLog(PageRankBase.class);
+
+  protected static int MAX_ITERATIONS = 30;
+  protected static final String MASTER_TASK = "master.groom";
+  protected static double ALPHA;
+  protected static int numOfVertices;
+  protected static double DAMPING_FACTOR = 0.85;
+  protected static double EPSILON = 0.001;
+
+  /**
+   * Reads this peer's partition of the graph into an adjacency list.
+   * <p>
+   * The partition path is read from the configuration key
+   * {@code "in.path." + <part of the peer name before ':'>}. Each record of the
+   * sequence file is a (vertex, adjacent vertex) pair of {@link Text}s.
+   *
+   * @param conf configuration holding the partition paths
+   * @param peer the peer whose partition is read
+   * @return map from every vertex to the list of its adjacent vertices
+   * @throws FileNotFoundException if no partition is configured for the peer
+   * @throws IOException if the partition cannot be read
+   */
+  static HashMap<PageRankVertex, List<PageRankVertex>> mapAdjacencyList(
+      Configuration conf, BSPPeerProtocol peer) throws FileNotFoundException,
+      IOException {
+    FileSystem fs = FileSystem.get(conf);
+    HashMap<PageRankVertex, List<PageRankVertex>> adjacencyList = new HashMap<PageRankVertex, List<PageRankVertex>>();
+    String pathKey = "in.path." + peer.getPeerName().split(":")[0];
+    String partition = conf.get(pathKey);
+    if (partition == null) {
+      throw new FileNotFoundException("No input partition configured for peer "
+          + peer.getPeerName() + " (missing key " + pathKey + ")");
+    }
+    Path p = new Path(partition);
+    LOG.info(p.toString());
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    try {
+      Text key = new Text();
+      Text value = new Text();
+      while (reader.next(key, value)) {
+        PageRankVertex k = new PageRankVertex(key.toString());
+        List<PageRankVertex> adjacents = adjacencyList.get(k);
+        if (adjacents == null) {
+          adjacents = new LinkedList<PageRankVertex>();
+          adjacencyList.put(k, adjacents);
+        }
+        adjacents.add(new PageRankVertex(value.toString()));
+      }
+    } finally {
+      reader.close();
+    }
+    return adjacencyList;
+  }
+
+  /**
+   * Writes the built-in seven-vertex example graph into one sequence file per
+   * groom. It then publishes the partition paths and the vertex count in
+   * {@code conf}.
+   *
+   * @param out output path; partitions go below its parent directory
+   * @param conf configuration holding {@code ShortestPaths.BSP_PEERS}
+   * @return {@code conf}, updated with {@code in.path.*} and
+   *         {@code num.vertices}
+   */
+  static HamaConfiguration partitionExample(Path out, HamaConfiguration conf)
+      throws IOException {
+    String[] groomNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+    int sizeOfCluster = groomNames.length;
+    List<Path> partPaths = partitionPaths(out, groomNames);
+    FileSystem fs = FileSystem.get(conf);
+
+    // 1:twitter.com, 2:google.com, 3:facebook.com, 4:yahoo.com, 5:nasa.gov,
+    // 6:stackoverflow.com, 7:youtube.com (index 0 is unused)
+    String[] realNames = new String[] { null, "twitter.com", "google.com",
+        "facebook.com", "yahoo.com", "nasa.gov", "stackoverflow.com",
+        "youtube.com" };
+
+    // "<vertex>;<adjacent>;<adjacent>..." using the indices above
+    String[] lineArray = new String[] { "1;2;3", "2", "3;1;2;5", "4;5;6",
+        "5;4;6", "6;4", "7;2;4" };
+
+    int numLines = 0;
+    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+        sizeOfCluster);
+    try {
+      openWriters(fs, conf, partPaths, writers);
+      for (String line : lineArray) {
+        String[] arr = line.split(";");
+        String vertexName = realNames[Integer.parseInt(arr[0])];
+        Text vertex = new Text(vertexName);
+        int mod = Math.abs(vertexName.hashCode() % sizeOfCluster);
+        // NOTE(review): arr[0] itself is also written as an adjacent, so
+        // every vertex gets a self-loop. Confirm this is intended.
+        for (String index : arr) {
+          writers.get(mod).append(vertex,
+              new Text(realNames[Integer.parseInt(index)]));
+        }
+        numLines++;
+      }
+    } finally {
+      closeWriters(writers);
+    }
+
+    registerPartitions(conf, partPaths, numLines);
+    return conf;
+  }
+
+  /**
+   * Partitions a tab-separated text file
+   * ({@code <vertex>\t<adjacent>\t<adjacent>...} per line) into one sequence
+   * file per groom. It then publishes the partition paths and the vertex count
+   * in {@code conf}. Blank lines are skipped.
+   *
+   * @param in input text file; partitions go below its parent directory
+   * @param conf configuration holding {@code ShortestPaths.BSP_PEERS}
+   * @return {@code conf}, updated with {@code in.path.*} and
+   *         {@code num.vertices}
+   */
+  static HamaConfiguration partitionTextFile(Path in, HamaConfiguration conf)
+      throws IOException {
+    String[] groomNames = conf.get(ShortestPaths.BSP_PEERS).split(";");
+    int sizeOfCluster = groomNames.length;
+    List<Path> partPaths = partitionPaths(in, groomNames);
+    FileSystem fs = FileSystem.get(conf);
+
+    long numLines = 0;
+    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+        sizeOfCluster);
+    BufferedReader br = null;
+    try {
+      openWriters(fs, conf, partPaths, writers);
+      // Hadoop Text stores UTF-8, so decode the input the same way instead of
+      // with the platform default charset
+      br = new BufferedReader(new InputStreamReader(fs.open(in), "UTF-8"));
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (line.trim().length() == 0) {
+          // a blank line describes no vertex and must not be counted as one
+          continue;
+        }
+        String[] arr = line.split("\t");
+        Text vertex = new Text(arr[0]);
+        int mod = Math.abs(arr[0].hashCode() % sizeOfCluster);
+        // NOTE(review): arr[0] itself is also written as an adjacent, so
+        // every vertex gets a self-loop. Confirm this is intended.
+        for (String value : arr) {
+          writers.get(mod).append(vertex, new Text(value));
+        }
+        numLines++;
+      }
+    } finally {
+      try {
+        if (br != null) {
+          br.close();
+        }
+      } finally {
+        closeWriters(writers);
+      }
+    }
+
+    registerPartitions(conf, partPaths, numLines);
+    return conf;
+  }
+
+  /**
+   * Returns one partition path per groom:
+   * {@code <parent of base>/<PARTED>/<groom name before the separator>}.
+   */
+  private static List<Path> partitionPaths(Path base, String[] groomNames) {
+    List<Path> partPaths = new ArrayList<Path>(groomNames.length);
+    for (String entry : groomNames) {
+      partPaths.add(new Path(base.getParent().toString() + Path.SEPARATOR
+          + ShortestPaths.PARTED + Path.SEPARATOR
+          + entry.split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]));
+    }
+    return partPaths;
+  }
+
+  /**
+   * Deletes any existing file at each path and opens a Text/Text sequence
+   * file writer for it. Writers are appended to {@code writers} as they are
+   * opened, so the caller can close the ones already open if a later open
+   * fails.
+   */
+  private static void openWriters(FileSystem fs, Configuration conf,
+      List<Path> partPaths, List<SequenceFile.Writer> writers)
+      throws IOException {
+    for (Path p : partPaths) {
+      fs.delete(p, true);
+      writers.add(SequenceFile
+          .createWriter(fs, conf, p, Text.class, Text.class));
+    }
+  }
+
+  /**
+   * Closes every writer, even if closing an earlier one fails. The first
+   * failure is rethrown once all writers have been attempted.
+   */
+  private static void closeWriters(List<SequenceFile.Writer> writers)
+      throws IOException {
+    IOException first = null;
+    for (SequenceFile.Writer w : writers) {
+      try {
+        w.close();
+      } catch (IOException e) {
+        if (first == null) {
+          first = e;
+        }
+      }
+    }
+    if (first != null) {
+      throw first;
+    }
+  }
+
+  /**
+   * Publishes each partition under {@code in.path.<file name>} and the total
+   * vertex count under {@code num.vertices}.
+   */
+  private static void registerPartitions(HamaConfiguration conf,
+      List<Path> partPaths, long numVertices) {
+    for (Path p : partPaths) {
+      conf.set("in.path." + p.getName(), p.toString());
+    }
+    conf.set("num.vertices", "" + numVertices);
+  }
+
+  /**
+   * Writes this peer's ranks as (url, rank) pairs to
+   * {@code <out.path>/temp/<peer name before the separator>}, replacing any
+   * previous file at that path.
+   */
+  static void savePageRankMap(BSPPeerProtocol peer, Configuration conf,
+      Map<PageRankVertex, Double> tentativePagerank) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path outPath = new Path(conf.get("out.path") + Path.SEPARATOR + "temp"
+        + Path.SEPARATOR
+        + peer.getPeerName().split(ShortestPaths.NAME_VALUE_SEPARATOR)[0]);
+    fs.delete(outPath, true);
+    final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
+        outPath, Text.class, DoubleWritable.class);
+    try {
+      for (Entry<PageRankVertex, Double> row : tentativePagerank.entrySet()) {
+        out.append(new Text(row.getKey().getUrl()), new DoubleWritable(row
+            .getValue()));
+      }
+    } finally {
+      LOG.info("Closing...");
+      out.close();
+      LOG.info("Closed!");
+    }
+  }
+
+  /** Logs every (url, rank) pair found below {@code <out.path>/temp}. */
+  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+    LOG.info("-------------------- RESULTS --------------------");
+    Path resultDir = new Path(conf.get("out.path") + Path.SEPARATOR + "temp");
+    FileStatus[] stati = fs.listStatus(resultDir);
+    if (stati == null) {
+      // depending on the Hadoop version, listStatus may return null instead
+      // of throwing when the directory does not exist
+      LOG.warn("No results found at " + resultDir);
+      return;
+    }
+    for (FileStatus status : stati) {
+      Path path = status.getPath();
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
+      try {
+        Text key = new Text();
+        DoubleWritable value = new DoubleWritable();
+        while (reader.next(key, value)) {
+          LOG.info(key.toString() + " | " + value.get());
+        }
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+}

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankVertex.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankVertex.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/PageRankVertex.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+/**
+ * A vertex of the PageRank graph, identified by its URL.
+ * <p>
+ * {@link #getId()} is the URL's {@link String#hashCode()}. It is used to pick
+ * the peer that owns the vertex. Vertex identity ({@link #equals}) is the URL
+ * itself.
+ */
+public class PageRankVertex {
+
+  private final int id;
+  private final String url;
+
+  /**
+   * @param url the vertex URL; must not be null
+   */
+  public PageRankVertex(String url) {
+    super();
+    this.id = url.hashCode();
+    this.url = url;
+  }
+
+  @Override
+  public int hashCode() {
+    // equal URLs have equal ids, so this stays consistent with equals()
+    final int prime = 31;
+    return prime + id;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    PageRankVertex other = (PageRankVertex) obj;
+    // Compare the URL, not just its hash. Distinct URLs can share a hashCode
+    // (e.g. "Aa" and "BB") and must not collapse into a single vertex.
+    return id == other.id && url.equals(other.url);
+  }
+
+  @Override
+  public String toString() {
+    return url;
+  }
+
+  /** Returns the URL's hash code, used to route the vertex to a peer. */
+  public int getId() {
+    return id;
+  }
+
+  public String getUrl() {
+    return url;
+  }
+
+}

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathVertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathVertex.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathVertex.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathVertex.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+/**
+ * A vertex of the shortest path graph.
+ * <p>
+ * Identity ({@link #equals}/{@link #hashCode}) is the vertex name. The weight
+ * is fixed at construction. The cost is mutable and stays {@code null} unless
+ * it is passed in or set later.
+ */
+public final class ShortestPathVertex {
+
+  private final int id;
+  private final String name;
+  private final int weight;
+  private Integer cost;
+
+  /** Creates a vertex without a cost. */
+  public ShortestPathVertex(int weight, String name) {
+    this(weight, name, null);
+  }
+
+  /** Creates a vertex with the given (possibly null) cost. */
+  public ShortestPathVertex(int weight, String name, Integer cost) {
+    this.id = name.hashCode();
+    this.weight = weight;
+    this.name = name;
+    this.cost = cost;
+  }
+
+  /** Returns the hash code of the name, fixed at construction. */
+  public int getId() {
+    return id;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public int getWeight() {
+    return weight;
+  }
+
+  public Integer getCost() {
+    return cost;
+  }
+
+  public void setCost(Integer cost) {
+    this.cost = cost;
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int nameHash = (name == null) ? 0 : name.hashCode();
+    return prime + nameHash;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == this) {
+      return true;
+    }
+    if (obj == null || obj.getClass() != getClass()) {
+      return false;
+    }
+    String otherName = ((ShortestPathVertex) obj).name;
+    return (name == null) ? otherName == null : name.equals(otherName);
+  }
+
+}

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPaths.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import java.io.IOException;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSPJob;
+import org.apache.hama.bsp.BSPJobClient;
+import org.apache.hama.bsp.BSPPeerProtocol;
+import org.apache.hama.bsp.BooleanMessage;
+import org.apache.hama.bsp.ClusterStatus;
+import org.apache.hama.bsp.IntegerMessage;
+import org.apache.hama.examples.RandBench;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Single source shortest paths example: the graph is partitioned across the
+ * grooms by vertex-name hash and costs are relaxed superstep by superstep
+ * until no peer reports an update.
+ */
+public class ShortestPaths extends ShortestPathsBase {
+  public static final Log LOG = LogFactory.getLog(ShortestPaths.class);
+
+  private Configuration conf;
+  /** Vertices held by this peer mapped to their outgoing edges. */
+  private Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+  /** Vertex name to the instance used as key in {@link #adjacencyList}. */
+  private Map<String, ShortestPathVertex> vertexLookupMap = new HashMap<String, ShortestPathVertex>();
+  /** Groom names in the order stored under BSP_PEERS. */
+  private String[] peerNames;
+
+  /**
+   * Loads this peer's partition, seeds the start vertex with cost 0 and then
+   * repeatedly applies incoming cost offers, forwarding improved costs to the
+   * neighbors until a round finishes without any update on any peer. The
+   * final costs are written with {@link #saveVertexMap}.
+   */
+  @Override
+  public void bsp(BSPPeerProtocol peer) throws IOException, KeeperException,
+      InterruptedException {
+    LOG.info("Mapping graph into ram...");
+    // map our input into ram
+    mapAdjacencyList(conf, peer, adjacencyList, vertexLookupMap);
+    LOG.info("Finished! Starting graph initialization...");
+    // parse the configuration to get the peerNames
+    parsePeerNames(conf);
+    // get our master groom
+    String master = conf.get(MASTER_TASK);
+
+    // initial message bypass: only the peer holding the start vertex seeds it
+    ShortestPathVertex v = vertexLookupMap.get(conf
+        .get(SHORTEST_PATHS_START_VERTEX_ID));
+    if (v != null) {
+      v.setCost(0);
+      sendMessageToNeighbors(peer, v);
+    }
+
+    LOG.info("Finished! Starting main loop...");
+    boolean updated = true;
+    while (updated) {
+      int updatesMade = 0;
+      peer.sync();
+
+      IntegerMessage msg = null;
+      Deque<ShortestPathVertex> updatedQueue = new LinkedList<ShortestPathVertex>();
+      while ((msg = (IntegerMessage) peer.getCurrentMessage()) != null) {
+        ShortestPathVertex vertex = vertexLookupMap.get(msg.getTag());
+        if (vertex == null) {
+          // a vertex without outgoing edges never appears as a key in the
+          // input, but it is still routed here by its name hash
+          vertex = registerSinkVertex(msg.getTag().toString());
+        }
+        // check if we need a distance update
+        if (vertex.getCost() > msg.getData()) {
+          updatesMade++;
+          updatedQueue.add(vertex);
+          vertex.setCost(msg.getData());
+        }
+      }
+      // synchronize with all grooms to find out whether anyone updated
+      updated = broadcastUpdatesMade(peer, master, updatesMade);
+      // send updates to the adjacents of the updated vertices
+      for (ShortestPathVertex vertex : updatedQueue) {
+        sendMessageToNeighbors(peer, vertex);
+      }
+    }
+    LOG.info("Finished!");
+    // finished, finally save our map to DFS.
+    saveVertexMap(conf, peer, adjacencyList);
+  }
+
+  /**
+   * Registers a vertex that has no outgoing edges so it can hold a cost and
+   * is included in the saved output.
+   *
+   * @param name name of the vertex
+   * @return the newly registered vertex with cost Integer.MAX_VALUE
+   */
+  private ShortestPathVertex registerSinkVertex(String name) {
+    ShortestPathVertex vertex = new ShortestPathVertex(0, name,
+        Integer.MAX_VALUE);
+    adjacencyList.put(vertex, new LinkedList<ShortestPathVertex>());
+    vertexLookupMap.put(name, vertex);
+    return vertex;
+  }
+
+  /**
+   * Parses the peer names to fix inconsistency in bsp peer names from context.
+   * 
+   * @param conf configuration holding the ";"-separated BSP_PEERS list
+   */
+  private void parsePeerNames(Configuration conf) {
+    peerNames = conf.get(BSP_PEERS).split(";");
+  }
+
+  /**
+   * Sends this peer's update count to the master groom. The master sums the
+   * counts and tells every peer whether the sum is greater than zero.
+   * 
+   * @param peer The peer we got through the BSP method.
+   * @param master The assigned master groom name.
+   * @param updates How many updates were made?
+   * @return True if we need another iteration, False if no updates can be made
+   *         anymore.
+   * @throws IOException
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private boolean broadcastUpdatesMade(BSPPeerProtocol peer, String master,
+      int updates) throws IOException, KeeperException, InterruptedException {
+    peer.send(master, new IntegerMessage(peer.getPeerName(), updates));
+    peer.sync();
+    if (peer.getPeerName().equals(master)) {
+      int count = 0;
+      IntegerMessage message;
+      while ((message = (IntegerMessage) peer.getCurrentMessage()) != null) {
+        count += message.getData();
+        LOG.info("Received " + message.getData() + " updates from "
+            + message.getTag() + " in SuperStep " + peer.getSuperstepCount());
+      }
+
+      for (String name : peer.getAllPeerNames()) {
+        peer.send(name, new BooleanMessage("", count > 0));
+      }
+    }
+
+    peer.sync();
+    BooleanMessage message = (BooleanMessage) peer.getCurrentMessage();
+    return message.getData();
+  }
+
+  /**
+   * Sends the vertex's cost plus the edge weight to every adjacent vertex.
+   * The target peer is chosen with the same rule partition() uses, namely
+   * |vertexID % number of BSP_PEERS entries| (the ID is the name's hash).
+   * An infinite (Integer.MAX_VALUE) cost is forwarded unchanged, and finite
+   * sums are clamped to the int range so they can never wrap around.
+   * 
+   * @param peer The peer we got through the BSP method.
+   * @param id The vertex whose new cost has to be sent to its neighbors.
+   * @throws IOException
+   */
+  private void sendMessageToNeighbors(BSPPeerProtocol peer,
+      ShortestPathVertex id) throws IOException {
+    List<ShortestPathVertex> outgoingEdges = adjacencyList.get(id);
+    if (outgoingEdges == null) {
+      return;
+    }
+    int cost = id.getCost();
+    for (ShortestPathVertex adjacent : outgoingEdges) {
+      // mod by peerNames.length (not getAllPeerNames()) because peerNames is
+      // the array being indexed and matches the partitioning
+      int mod = Math.abs(adjacent.getId() % peerNames.length);
+      peer.send(peerNames[mod], new IntegerMessage(adjacent.getName(),
+          relaxedCost(cost, adjacent.getWeight())));
+    }
+  }
+
+  /**
+   * Returns cost + weight, keeping Integer.MAX_VALUE as "infinity" and
+   * clamping the sum to the int range instead of overflowing.
+   */
+  private static int relaxedCost(int cost, int weight) {
+    if (cost == Integer.MAX_VALUE) {
+      return cost;
+    }
+    long sum = (long) cost + weight;
+    if (sum > Integer.MAX_VALUE) {
+      return Integer.MAX_VALUE;
+    }
+    if (sum < Integer.MIN_VALUE) {
+      return Integer.MIN_VALUE;
+    }
+    return (int) sum;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public static void printUsage() {
+    System.out.println("Single Source Shortest Path Example:");
+    System.out
+        .println("<Startvertex name> <optional: output path> <optional: path to own adjacency list sequencefile> <optional: skip partitioning (true/false)>");
+  }
+
+  /**
+   * Configures and runs the job, then prints the results.
+   *
+   * @param args start vertex, output path, adjacency list path, skip
+   *          partitioning flag; all optional
+   */
+  public static void main(String[] args) throws IOException,
+      InterruptedException, ClassNotFoundException {
+
+    printUsage();
+
+    // BSP job configuration
+    HamaConfiguration conf = new HamaConfiguration();
+    conf.set(SHORTEST_PATHS_START_VERTEX_ID, "Frankfurt");
+    System.out.println("Setting default start vertex to \"Frankfurt\"!");
+    conf.set(OUT_PATH, "sssp/output");
+    boolean skipPartitioning = false;
+    Path adjacencyListPath = null;
+
+    if (args.length > 0) {
+      conf.set(SHORTEST_PATHS_START_VERTEX_ID, args[0]);
+      System.out.println("Setting start vertex to " + args[0] + "!");
+
+      if (args.length > 1) {
+        conf.set(OUT_PATH, args[1]);
+        System.out.println("Using new output folder: " + args[1]);
+      }
+
+      if (args.length > 2) {
+        adjacencyListPath = new Path(args[2]);
+      }
+
+      if (args.length > 3) {
+        skipPartitioning = Boolean.valueOf(args[3]);
+      }
+
+    }
+
+    Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = null;
+    if (adjacencyListPath == null)
+      adjacencyList = ShortestPathsGraphLoader.loadGraph();
+
+    // Set the task size as a number of GroomServer
+    BSPJobClient jobClient = new BSPJobClient(conf);
+    ClusterStatus cluster = jobClient.getClusterStatus(true);
+    StringBuilder sb = new StringBuilder();
+    for (String peerName : cluster.getActiveGroomNames().values()) {
+      conf.set(MASTER_TASK, peerName);
+      sb.append(peerName);
+      sb.append(";");
+    }
+    LOG.info("Master is: " + conf.get(MASTER_TASK));
+    conf.set(BSP_PEERS, sb.toString());
+    LOG.info("Starting data partitioning...");
+    if (adjacencyList == null)
+      conf = partition(adjacencyListPath, conf, skipPartitioning);
+    else
+      conf = partition(adjacencyList, conf);
+    LOG.info("Finished!");
+
+    // NOTE(review): the job is created only after MASTER_TASK, BSP_PEERS and
+    // the IN_PATH entries are in conf, in case BSPJob copies the
+    // configuration when constructed — confirm against BSPJobContext.
+    BSPJob bsp = new BSPJob(conf, ShortestPaths.class);
+    // Set the job name
+    bsp.setJobName("Single Source Shortest Path");
+    bsp.setBspClass(ShortestPaths.class);
+    bsp.setNumBspTask(cluster.getGroomServers());
+
+    long startTime = System.currentTimeMillis();
+    if (bsp.waitForCompletion(true)) {
+      System.out.println("Job Finished in "
+          + (double) (System.currentTimeMillis() - startTime) / 1000.0
+          + " seconds");
+      printOutput(FileSystem.get(conf), conf);
+    }
+  }
+
+}

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsBase.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeerProtocol;
+
+/**
+ * Shared I/O helpers for the shortest paths example: partitioning the input
+ * adjacency list per groom, loading a partition into memory, and writing and
+ * printing the results.
+ */
+public abstract class ShortestPathsBase extends BSP {
+
+  /** Configuration key of the ";"-separated list of groom names. */
+  public static final String BSP_PEERS = "bsp.peers";
+  public static final String SHORTEST_PATHS_START_VERTEX_ID = "shortest.paths.start.vertex.id";
+  /** Name of the directory holding the per-groom partitions. */
+  public static final String PARTED = "parted";
+  /** Key prefix; IN_PATH + groom host maps to that groom's partition path. */
+  public static final String IN_PATH = "in.path.";
+  public static final String OUT_PATH = "out.path";
+  public static final String NAME_VALUE_SEPARATOR = ":";
+  public static final String MASTER_TASK = "master.groom";
+
+  /**
+   * Writes this peer's result as a SequenceFile of (vertex name, cost) pairs
+   * to {@code <OUT_PATH>/<host part of the peer name>}. Any existing file at
+   * that path is replaced.
+   * 
+   * @param conf configuration providing OUT_PATH
+   * @param peer The peer we got through the BSP method.
+   * @param adjacencyList vertices whose costs are written (keys only)
+   * @throws IOException if the output cannot be written
+   */
+  static void saveVertexMap(Configuration conf, BSPPeerProtocol peer,
+      Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList)
+      throws IOException {
+    Path outPath = new Path(conf.get(OUT_PATH) + Path.SEPARATOR
+        + peer.getPeerName().split(NAME_VALUE_SEPARATOR)[0]);
+    FileSystem fs = FileSystem.get(conf);
+    fs.delete(outPath, true);
+    final SequenceFile.Writer out = SequenceFile.createWriter(fs, conf,
+        outPath, Text.class, IntWritable.class);
+    try {
+      for (ShortestPathVertex vertex : adjacencyList.keySet()) {
+        out.append(new Text(vertex.getName()),
+            new IntWritable(vertex.getCost()));
+      }
+    } finally {
+      out.close();
+    }
+  }
+
+  /**
+   * Prints every (vertex name, cost) pair found in the files under OUT_PATH
+   * to STDOUT.
+   * 
+   * @param fs file system holding the output
+   * @param conf configuration providing OUT_PATH
+   * @throws IOException if a result file cannot be read
+   */
+  static void printOutput(FileSystem fs, Configuration conf) throws IOException {
+    System.out.println("-------------------- RESULTS --------------------");
+    FileStatus[] stati = fs.listStatus(new Path(conf.get(OUT_PATH)));
+    if (stati == null) {
+      // listStatus may return null for a missing path (Hadoop 0.20 behavior)
+      return;
+    }
+    for (FileStatus status : stati) {
+      SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+          status.getPath(), conf);
+      try {
+        Text key = new Text();
+        IntWritable value = new IntWritable();
+        while (reader.next(key, value)) {
+          System.out.println(key.toString() + " | " + value.get());
+        }
+      } finally {
+        reader.close();
+      }
+    }
+  }
+
+  /**
+   * Loads this peer's partition (the path stored under IN_PATH + host part
+   * of the peer name) into memory. Every key vertex starts with cost
+   * Integer.MAX_VALUE.
+   * 
+   * The adjacencylist contains two text fields on each line. The key component
+   * is the name of a vertex, the value is a ":" separated Text field that
+   * contains the name of the adjacent vertex leftmost and the weight on the
+   * rightmost side. The last ":" is used as separator, so adjacent vertex
+   * names may themselves contain ":".
+   * 
+   * <PRE>
+   *    K               V <br/> 
+   * Vertex[Text]    AdjacentVertex : Weight [Text]
+   * </PRE>
+   * 
+   * @param conf configuration holding the partition paths
+   * @param peer The peer we got through the BSP method.
+   * @param adjacencyList filled with key vertex to outgoing edges
+   * @param vertexLookupMap filled with vertex name to key vertex
+   * @throws IOException if the partition cannot be read or a value is
+   *           malformed
+   */
+  static void mapAdjacencyList(Configuration conf, BSPPeerProtocol peer,
+      Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
+      Map<String, ShortestPathVertex> vertexLookupMap)
+      throws FileNotFoundException, IOException {
+    FileSystem fs = FileSystem.get(conf);
+    Path p = new Path(conf.get(IN_PATH
+        + peer.getPeerName().split(NAME_VALUE_SEPARATOR)[0]));
+    SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
+    try {
+      Text key = new Text(); // name of the vertex
+      Text value = new Text(); // name of the adjacent vertex : weight
+      while (reader.next(key, value)) {
+        // a key vertex has weight 0 to itself
+        ShortestPathVertex keyVertex = new ShortestPathVertex(0,
+            key.toString(), Integer.MAX_VALUE);
+        List<ShortestPathVertex> edges = adjacencyList.get(keyVertex);
+        if (edges == null) {
+          edges = new LinkedList<ShortestPathVertex>();
+          adjacencyList.put(keyVertex, edges);
+          vertexLookupMap.put(keyVertex.getName(), keyVertex);
+        }
+        edges.add(parseAdjacentVertex(value.toString()));
+      }
+    } finally {
+      reader.close();
+    }
+  }
+
+  /**
+   * Parses "name:weight" (split at the last separator) into an adjacent
+   * vertex with cost Integer.MAX_VALUE.
+   */
+  private static ShortestPathVertex parseAdjacentVertex(String nameWeight)
+      throws IOException {
+    int separator = nameWeight.lastIndexOf(NAME_VALUE_SEPARATOR);
+    if (separator < 0) {
+      throw new IOException("Malformed adjacency entry \"" + nameWeight
+          + "\", expected <name>" + NAME_VALUE_SEPARATOR + "<weight>");
+    }
+    String name = nameWeight.substring(0, separator);
+    int weight = Integer.parseInt(nameWeight.substring(separator
+        + NAME_VALUE_SEPARATOR.length()));
+    return new ShortestPathVertex(weight, name, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Partitioning for in memory adjacency lists. Each key vertex goes to the
+   * groom at index |id % number of grooms| in BSP_PEERS. The partitions are
+   * written next to the parent directory of OUT_PATH.
+   * 
+   * @param adjacencyList the graph to partition
+   * @param conf configuration holding BSP_PEERS and OUT_PATH; receives the
+   *          IN_PATH entries
+   * @return the same conf, updated
+   * @throws IOException if a partition cannot be written
+   */
+  static HamaConfiguration partition(
+      Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList,
+      HamaConfiguration conf) throws IOException {
+
+    String[] groomNames = conf.get(BSP_PEERS).split(";");
+    int sizeOfCluster = groomNames.length;
+
+    FileSystem fs = FileSystem.get(conf);
+    List<Path> partPaths = createPartitionPaths(new Path(conf.get(OUT_PATH)),
+        groomNames);
+    List<SequenceFile.Writer> writers = openWriters(fs, conf, partPaths);
+    boolean completed = false;
+    try {
+      for (Entry<ShortestPathVertex, List<ShortestPathVertex>> entry : adjacencyList
+          .entrySet()) {
+        ShortestPathVertex keyVertex = entry.getKey();
+        // just mod the id
+        int mod = Math.abs(keyVertex.getId() % sizeOfCluster);
+        // append it to the right sequenceFile
+        for (ShortestPathVertex value : entry.getValue()) {
+          writers.get(mod).append(
+              new Text(keyVertex.getName()),
+              new Text(value.getName() + NAME_VALUE_SEPARATOR
+                  + value.getWeight()));
+        }
+      }
+      completed = true;
+    } finally {
+      closeAll(writers, completed);
+    }
+
+    return registerPartitionPaths(conf, partPaths);
+  }
+
+  /**
+   * Partitioning for sequencefile partitioned adjacency lists. Each record
+   * goes to the groom at index |hash(key) % number of grooms| in BSP_PEERS.
+   * The partitions are written to the "parted" directory next to the input
+   * file.
+   * 
+   * The adjacencylist contains two text fields on each line. The key component
+   * is the name of a vertex, the value is a ":" separated Text field that
+   * contains the name of the adjacent vertex leftmost and the weight on the
+   * rightmost side.
+   * 
+   * <PRE>
+   *    K               V <br/> 
+   * Vertex[Text]    AdjacentVertex : Weight [Text]
+   * </PRE>
+   * 
+   * @param fileToPartition the input SequenceFile
+   * @param conf configuration holding BSP_PEERS; receives the IN_PATH entries
+   * @param skipPartitioning if true, existing partitions are reused and only
+   *          the IN_PATH entries are set
+   * @return the same conf, updated
+   * @throws FileNotFoundException if partitioning is requested and the input
+   *           does not exist (checked before any partition file is touched)
+   * @throws IOException if reading or writing fails
+   */
+  static HamaConfiguration partition(Path fileToPartition,
+      HamaConfiguration conf, boolean skipPartitioning) throws IOException {
+
+    String[] groomNames = conf.get(BSP_PEERS).split(";");
+    int sizeOfCluster = groomNames.length;
+
+    FileSystem fs = FileSystem.get(conf);
+    List<Path> partPaths = createPartitionPaths(fileToPartition, groomNames);
+
+    if (!skipPartitioning) {
+      // validate the input before deleting/recreating any partition file
+      if (!fs.exists(fileToPartition))
+        throw new FileNotFoundException("File " + fileToPartition
+            + " wasn't found!");
+
+      List<SequenceFile.Writer> writers = openWriters(fs, conf, partPaths);
+      boolean completed = false;
+      try {
+        SequenceFile.Reader reader = new SequenceFile.Reader(fs,
+            fileToPartition, conf);
+        try {
+          Text key = new Text(); // name of the vertex
+          Text value = new Text(); // name of the adjacent vertex : weight
+          while (reader.next(key, value)) {
+            // a key vertex has weight 0 to itself
+            ShortestPathVertex keyVertex = new ShortestPathVertex(0,
+                key.toString());
+            // just mod the id
+            int mod = Math.abs(keyVertex.getId() % sizeOfCluster);
+            // append it to the right sequenceFile
+            writers.get(mod).append(new Text(keyVertex.getName()),
+                new Text(value));
+          }
+        } finally {
+          reader.close();
+        }
+        completed = true;
+      } finally {
+        closeAll(writers, completed);
+      }
+    }
+
+    return registerPartitionPaths(conf, partPaths);
+  }
+
+  /** Builds one path per groom: {@code <parent of base>/parted/<groom host>}. */
+  private static List<Path> createPartitionPaths(Path base, String[] groomNames) {
+    List<Path> partPaths = new ArrayList<Path>(groomNames.length);
+    for (String entry : groomNames) {
+      partPaths.add(new Path(base.getParent().toString() + Path.SEPARATOR
+          + PARTED + Path.SEPARATOR + entry.split(NAME_VALUE_SEPARATOR)[0]));
+    }
+    return partPaths;
+  }
+
+  /** Deletes each path and opens a Text/Text SequenceFile writer on it. */
+  private static List<SequenceFile.Writer> openWriters(FileSystem fs,
+      Configuration conf, List<Path> partPaths) throws IOException {
+    List<SequenceFile.Writer> writers = new ArrayList<SequenceFile.Writer>(
+        partPaths.size());
+    boolean completed = false;
+    try {
+      for (Path p : partPaths) {
+        fs.delete(p, true);
+        writers.add(SequenceFile.createWriter(fs, conf, p, Text.class,
+            Text.class));
+      }
+      completed = true;
+    } finally {
+      if (!completed) {
+        // release the writers opened so far; the original failure propagates
+        closeAll(writers, false);
+      }
+    }
+    return writers;
+  }
+
+  /**
+   * Closes every writer. If {@code rethrow} is true, the first close failure
+   * is rethrown. Otherwise close failures are ignored so an exception that is
+   * already propagating is not masked.
+   */
+  private static void closeAll(List<SequenceFile.Writer> writers,
+      boolean rethrow) throws IOException {
+    IOException first = null;
+    for (SequenceFile.Writer w : writers) {
+      try {
+        w.close();
+      } catch (IOException e) {
+        if (first == null) {
+          first = e;
+        }
+      }
+    }
+    if (rethrow && first != null) {
+      throw first;
+    }
+  }
+
+  /** Stores IN_PATH + partition name -> partition path for every partition. */
+  private static HamaConfiguration registerPartitionPaths(
+      HamaConfiguration conf, List<Path> partPaths) {
+    for (Path p : partPaths) {
+      conf.set(IN_PATH + p.getName(), p.toString());
+    }
+    return conf;
+  }
+
+}

Added: incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java (added)
+++ incubator/hama/trunk/src/examples/org/apache/hama/examples/graph/ShortestPathsGraphLoader.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.examples.graph;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+public class ShortestPathsGraphLoader {
+  
+  static Map<ShortestPathVertex, List<ShortestPathVertex>> loadGraph() {
+
+    Map<ShortestPathVertex, List<ShortestPathVertex>> adjacencyList = new HashMap<ShortestPathVertex, List<ShortestPathVertex>>();
+    String[] cities = new String[] { "Frankfurt", "Mannheim", "Wuerzburg",
+        "Stuttgart", "Kassel", "Karlsruhe", "Erfurt", "Nuernberg", "Augsburg",
+        "Muenchen" };
+
+    int id = 1;
+    for (String city : cities) {
+      if (city.equals("Frankfurt")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(85, "Mannheim"));
+        list.add(new ShortestPathVertex(173, "Kassel"));
+        list.add(new ShortestPathVertex(217, "Wuerzburg"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Stuttgart")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(183, "Nuernberg"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Kassel")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(502, "Muenchen"));
+        list.add(new ShortestPathVertex(173, "Frankfurt"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Erfurt")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(186, "Wuerzburg"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Wuerzburg")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(217, "Frankfurt"));
+        list.add(new ShortestPathVertex(168, "Erfurt"));
+        list.add(new ShortestPathVertex(103, "Nuernberg"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Mannheim")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(80, "Karlsruhe"));
+        list.add(new ShortestPathVertex(85, "Frankfurt"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Karlsruhe")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(250, "Augsburg"));
+        list.add(new ShortestPathVertex(80, "Mannheim"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Augsburg")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(250, "Karlsruhe"));
+        list.add(new ShortestPathVertex(84, "Muenchen"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Nuernberg")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(183, "Stuttgart"));
+        list.add(new ShortestPathVertex(167, "Muenchen"));
+        list.add(new ShortestPathVertex(103, "Wuerzburg"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      } else if (city.equals("Muenchen")) {
+        List<ShortestPathVertex> list = new LinkedList<ShortestPathVertex>();
+        list.add(new ShortestPathVertex(167, "Nuernberg"));
+        list.add(new ShortestPathVertex(173, "Kassel"));
+        list.add(new ShortestPathVertex(84, "Augsburg"));
+        adjacencyList.put(new ShortestPathVertex(0, city), list);
+      }
+      id++;
+    }
+    return adjacencyList;
+  }
+
+}

Added: incubator/hama/trunk/src/java/org/apache/hama/bsp/IntegerDoubleMessage.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/src/java/org/apache/hama/bsp/IntegerDoubleMessage.java?rev=1137983&view=auto
==============================================================================
--- incubator/hama/trunk/src/java/org/apache/hama/bsp/IntegerDoubleMessage.java (added)
+++ incubator/hama/trunk/src/java/org/apache/hama/bsp/IntegerDoubleMessage.java Tue Jun 21 12:50:58 2011
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A BSP message carrying an int tag and a double payload. On the wire it is
+ * serialized as the tag (writeInt) followed by the payload (writeDouble).
+ */
+public class IntegerDoubleMessage extends BSPMessage {
+
+  /** Tag of the message, returned boxed by {@link #getTag()}. */
+  int tag;
+  /** Payload of the message, returned boxed by {@link #getData()}. */
+  double data;
+
+  /** Creates an empty message; fields are filled by {@link #readFields}. */
+  public IntegerDoubleMessage() {
+    super();
+  }
+
+  /**
+   * Creates a message with the given tag and payload.
+   *
+   * @param tag the int tag
+   * @param data the double payload
+   */
+  public IntegerDoubleMessage(int tag, double data) {
+    this();
+    this.tag = tag;
+    this.data = data;
+  }
+
+  /** Serializes the tag, then the payload. */
+  @Override
+  public void write(DataOutput output) throws IOException {
+    output.writeInt(this.tag);
+    output.writeDouble(this.data);
+  }
+
+  /** Reads the tag, then the payload, in the order written by {@link #write}. */
+  @Override
+  public void readFields(DataInput input) throws IOException {
+    this.tag = input.readInt();
+    this.data = input.readDouble();
+  }
+
+  @Override
+  public Integer getTag() {
+    return Integer.valueOf(tag);
+  }
+
+  @Override
+  public Double getData() {
+    return Double.valueOf(data);
+  }
+
+}



Mime
View raw message