hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1339586 - in /incubator/hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/main/java/org/apache/hama/util/ examples/src/test/java/org/apache/hama/examples/ graph/src/main/java/org/apache/hama/graph/
Date Thu, 17 May 2012 13:06:32 GMT
Author: tjungblut
Date: Thu May 17 13:06:31 2012
New Revision: 1339586

URL: http://svn.apache.org/viewvc?rev=1339586&view=rev
Log:
[HAMA-571]: Provide graph repair function in GraphJobRunner

Modified:
    incubator/hama/trunk/CHANGES.txt
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
    incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java

Modified: incubator/hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/CHANGES.txt?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/CHANGES.txt (original)
+++ incubator/hama/trunk/CHANGES.txt Thu May 17 13:06:31 2012
@@ -16,7 +16,8 @@ Release 0.5 - April 10, 2012 
   BUG FIXES
 
   IMPROVEMENTS
-
+    
+    HAMA-571: Provide graph repair function in GraphJobRunner (tjungblut)
     HAMA-521: Improve message buffering to save memory (Thomas Jungblut via edwardyoon)
     HAMA-494: Remove hard-coded webapp path in HttpServer (edwardyoon)
     HAMA-562: Record Reader/Writer objects should be initialized (edwardyoon)

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu May 17 13:06:31
2012
@@ -117,7 +117,8 @@ public interface BSPPeer<K1, V1, K2, V2,
   public boolean readNext(K1 key, V1 value) throws IOException;
 
   /**
-   * Reads the next key value pair and returns it as a pair.
+   * Reads the next key value pair and returns it as a pair. It may reuse a
+   * {@link KeyValuePair} instance to save garbage collection time.
    * 
    * @return null if there are no records left.
    * @throws IOException
@@ -134,7 +135,7 @@ public interface BSPPeer<K1, V1, K2, V2,
    * @return the jobs configuration
    */
   public Configuration getConfiguration();
-  
+
   /**
    * Get the {@link Counter} of the given group with the given name.
    * 
@@ -151,26 +152,26 @@ public interface BSPPeer<K1, V1, K2, V2,
    * @return the <code>Counter</code> of the given group/name.
    */
   public Counter getCounter(String group, String name);
-  
+
   /**
-   * Increments the counter identified by the key, which can be of
-   * any {@link Enum} type, by the specified amount.
+   * Increments the counter identified by the key, which can be of any
+   * {@link Enum} type, by the specified amount.
    * 
-   * @param key key to identify the counter to be incremented. The key can be
-   *            be any <code>Enum</code>. 
-   * @param amount A non-negative amount by which the counter is to 
-   *               be incremented.
+   * @param key key to identify the counter to be incremented. The key can be
+   *          any <code>Enum</code>.
+   * @param amount A non-negative amount by which the counter is to be
+   *          incremented.
    */
   public void incrementCounter(Enum<?> key, long amount);
-  
+
   /**
-   * Increments the counter identified by the group and counter name
-   * by the specified amount.
+   * Increments the counter identified by the group and counter name by the
+   * specified amount.
    * 
    * @param group name to identify the group of the counter to be incremented.
    * @param counter name to identify the counter within the group.
-   * @param amount A non-negative amount by which the counter is to 
-   *               be incremented.
+   * @param amount A non-negative amount by which the counter is to be
+   *          incremented.
    */
   public void incrementCounter(String group, String counter, long amount);
 }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu May 17
13:06:31 2012
@@ -84,6 +84,7 @@ public final class BSPPeerImpl<K1, V1, K
   private OutputCollector<K2, V2> collector;
   private RecordReader<K1, V1> in;
   private RecordWriter<K2, V2> outWriter;
+  private final KeyValuePair<K1, V1> cachedPair = new KeyValuePair<K1, V1>();
 
   private InetSocketAddress peerAddress;
 
@@ -480,7 +481,10 @@ public final class BSPPeerImpl<K1, V1, K
     K1 k = in.createKey();
     V1 v = in.createValue();
     if (in.next(k, v)) {
-      return new KeyValuePair<K1, V1>(k, v);
+      cachedPair.clear();
+      cachedPair.setKey(k);
+      cachedPair.setValue(v);
+      return cachedPair;
     } else {
       return null;
     }

Modified: incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java (original)
+++ incubator/hama/trunk/core/src/main/java/org/apache/hama/util/KeyValuePair.java Thu May
17 13:06:31 2012
@@ -18,15 +18,16 @@
 package org.apache.hama.util;
 
 /**
- * Immutable class for key values.
- * 
- * @param <K>
- * @param <V>
+ * Mutable class for key values.
  */
 public class KeyValuePair<K, V> {
 
-  private final K key;
-  private final V value;
+  private K key;
+  private V value;
+
+  public KeyValuePair() {
+
+  }
 
   public KeyValuePair(K key, V value) {
     super();
@@ -42,4 +43,17 @@ public class KeyValuePair<K, V> {
     return value;
   }
 
+  public void setKey(K key) {
+    this.key = key;
+  }
+
+  public void setValue(V value) {
+    this.value = value;
+  }
+
+  public void clear() {
+    this.key = null;
+    this.value = null;
+  }
+
 }

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
(original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/MindistSearchTest.java
Thu May 17 13:06:31 2012
@@ -34,8 +34,14 @@ import org.apache.hadoop.io.SequenceFile
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
 import org.apache.hama.examples.MindistSearch.MinTextCombiner;
+import org.apache.hama.examples.MindistSearch.MindistSearchVertex;
 import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.GraphJobRunner;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
@@ -78,8 +84,8 @@ public class MindistSearchTest extends T
     fs = FileSystem.get(conf);
   }
 
-  public void testPageRank() throws Exception {
-    generateSeqTestData();
+  public void testMindistSearch() throws Exception {
+    generateSeqTestData(tmp);
     try {
       MindistSearch.main(new String[] { INPUT, OUTPUT });
 
@@ -112,17 +118,17 @@ public class MindistSearchTest extends T
     }
   }
 
-  private void generateSeqTestData() throws IOException {
+  private void generateSeqTestData(Map<VertexWritable, VertexArrayWritable> map)
+      throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
         INPUT), VertexWritable.class, VertexArrayWritable.class);
-    for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
+    for (Map.Entry<VertexWritable, VertexArrayWritable> e : map.entrySet()) {
       writer.append(e.getKey(), e.getValue());
     }
     writer.close();
   }
 
-  public void testPageRankUtil() throws IOException, InterruptedException,
-      ClassNotFoundException, InstantiationException, IllegalAccessException {
+  public void testPageRankUtil() throws Exception {
     generateTestTextData();
     // <input path> <output path>
     PagerankTextToSeq.main(new String[] { TEXT_INPUT, TEXT_OUTPUT });
@@ -135,6 +141,45 @@ public class MindistSearchTest extends T
     }
   }
 
+  public void testRepairFunctionality() throws Exception {
+    // make a copy to be safe with parallel test executions
+    final Map<VertexWritable, VertexArrayWritable> map = new HashMap<VertexWritable, VertexArrayWritable>(
+        tmp);
+    // removing 7 should result in creating it and getting the same result as
+    // usual
+    map.remove(new VertexWritable("7"));
+    generateSeqTestData(map);
+    try {
+      HamaConfiguration conf = new HamaConfiguration(new Configuration());
+      conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
+      GraphJob connectedComponentsJob = new GraphJob(conf,
+          MindistSearchVertex.class);
+      connectedComponentsJob.setJobName("Mindist Search");
+
+      connectedComponentsJob.setVertexClass(MindistSearchVertex.class);
+      connectedComponentsJob.setInputPath(new Path(INPUT));
+      connectedComponentsJob.setOutputPath(new Path(OUTPUT));
+      // set the min text combiner here
+      connectedComponentsJob.setCombinerClass(MinTextCombiner.class);
+
+      // set the defaults
+      connectedComponentsJob.setMaxIteration(30);
+      connectedComponentsJob.setInputFormat(SequenceFileInputFormat.class);
+      connectedComponentsJob.setPartitioner(HashPartitioner.class);
+      connectedComponentsJob.setOutputFormat(SequenceFileOutputFormat.class);
+      connectedComponentsJob.setOutputKeyClass(Text.class);
+      connectedComponentsJob.setOutputValueClass(Text.class);
+
+      if (connectedComponentsJob.waitForCompletion(true)) {
+        verifyResult();
+      } else {
+        fail("Job not completed correctly!");
+      }
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
   private void generateTestTextData() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
     for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {

Modified: incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
(original)
+++ incubator/hama/trunk/examples/src/test/java/org/apache/hama/examples/PageRankTest.java
Thu May 17 13:06:31 2012
@@ -32,7 +32,14 @@ import org.apache.hadoop.io.DoubleWritab
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
 import org.apache.hama.HamaConfiguration;
+import org.apache.hama.bsp.HashPartitioner;
+import org.apache.hama.bsp.SequenceFileInputFormat;
+import org.apache.hama.bsp.SequenceFileOutputFormat;
+import org.apache.hama.examples.PageRank.PageRankVertex;
 import org.apache.hama.examples.util.PagerankTextToSeq;
+import org.apache.hama.graph.AverageAggregator;
+import org.apache.hama.graph.GraphJob;
+import org.apache.hama.graph.GraphJobRunner;
 import org.apache.hama.graph.VertexArrayWritable;
 import org.apache.hama.graph.VertexWritable;
 
@@ -87,7 +94,7 @@ public class PageRankTest extends TestCa
   }
 
   public void testPageRank() throws Exception {
-    generateSeqTestData();
+    generateSeqTestData(tmp);
     try {
       // Usage: <input> <output> [damping factor (default 0.85)] [Epsilon
       // (convergence error, default 0.001)] [Max iterations (default 30)]
@@ -113,7 +120,8 @@ public class PageRankTest extends TestCa
     assertTrue(sum > 0.99d && sum <= 1d);
   }
 
-  private void generateSeqTestData() throws IOException {
+  private void generateSeqTestData(Map<VertexWritable, VertexArrayWritable> tmp)
+      throws IOException {
     SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, new Path(
         INPUT), VertexWritable.class, VertexArrayWritable.class);
     for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {
@@ -137,6 +145,47 @@ public class PageRankTest extends TestCa
     }
   }
 
+  public void testRepairFunctionality() throws Exception {
+    // make a copy to be safe with parallel test executions
+    final Map<VertexWritable, VertexArrayWritable> map = new HashMap<VertexWritable, VertexArrayWritable>(
+        tmp);
+    // removing google should result in creating it and getting the same
+    // result as usual
+    map.remove(new VertexWritable("google.com"));
+    generateSeqTestData(map);
+    try {
+      HamaConfiguration conf = new HamaConfiguration(new Configuration());
+      conf.setBoolean(GraphJobRunner.GRAPH_REPAIR, true);
+      GraphJob pageJob = new GraphJob(conf, PageRank.class);
+      pageJob.setJobName("Pagerank");
+
+      pageJob.setVertexClass(PageRankVertex.class);
+      pageJob.setInputPath(new Path(INPUT));
+      pageJob.setOutputPath(new Path(OUTPUT));
+
+      // set the defaults
+      pageJob.setMaxIteration(30);
+      pageJob.set("hama.pagerank.alpha", "0.85");
+      // we need to include a vertex in its adjacency list,
+      // otherwise the pagerank result has a constant loss
+      pageJob.set("hama.graph.self.ref", "true");
+
+      pageJob.setAggregatorClass(AverageAggregator.class);
+
+      pageJob.setInputFormat(SequenceFileInputFormat.class);
+      pageJob.setPartitioner(HashPartitioner.class);
+      pageJob.setOutputFormat(SequenceFileOutputFormat.class);
+      pageJob.setOutputKeyClass(Text.class);
+      pageJob.setOutputValueClass(DoubleWritable.class);
+
+      if (!pageJob.waitForCompletion(true)) {
+        fail("Job did not complete normally!");
+      }
+    } finally {
+      deleteTempDirs();
+    }
+  }
+
   private void generateTestTextData() throws IOException {
     BufferedWriter writer = new BufferedWriter(new FileWriter(TEXT_INPUT));
     for (Map.Entry<VertexWritable, VertexArrayWritable> e : tmp.entrySet()) {

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Edge.java Thu May 17 13:06:31
2012
@@ -44,6 +44,7 @@ public class Edge {
   }
 
   public String toString() {
-    return this.getDestVertexID() + ":" + this.getCost();
+    return this.getName() + " -> " + this.getDestVertexID() + ":"
+        + this.getCost();
   }
 }

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu
May 17 13:06:31 2012
@@ -19,6 +19,8 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -57,7 +59,8 @@ public class GraphJobRunner extends BSP 
   private static final Text FLAG_AGGREGATOR_INCREMENT = new Text(
       S_FLAG_AGGREGATOR_INCREMENT);
 
-  private static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  public static final String GRAPH_REPAIR = "hama.graph.repair";
 
   private Configuration conf;
   private Combiner<? extends Writable> combiner;
@@ -81,13 +84,14 @@ public class GraphJobRunner extends BSP 
   private int maxIteration = -1;
   private long iteration;
 
-  // TODO check if our graph is not broken and repair
   public void setup(BSPPeer peer) throws IOException, SyncException,
       InterruptedException {
     this.conf = peer.getConfiguration();
     // Choose one as a master to collect global updates
     this.masterTask = peer.getPeerName(0);
 
+    boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
+
     if (!conf.getClass(MESSAGE_COMBINER_CLASS, Combiner.class).equals(
         Combiner.class)) {
       LOG.debug("Combiner class: " + conf.get(MESSAGE_COMBINER_CLASS));
@@ -110,7 +114,7 @@ public class GraphJobRunner extends BSP 
       }
     }
 
-    loadVertices(peer);
+    loadVertices(peer, repairNeeded);
     numberVertices = vertices.size() * peer.getNumPeers();
     // TODO refactor this to a single step
     for (Map.Entry<String, Vertex> e : vertices.entrySet()) {
@@ -144,8 +148,8 @@ public class GraphJobRunner extends BSP 
 
       // Map <vertexID, messages>
       final Map<String, LinkedList<Writable>> messages = parseMessages(peer);
-      if (isMasterTask(peer) && peer.getSuperstepCount() > 1) {
-
+      // use iterations here, since repair can skew the number of supersteps
+      if (isMasterTask(peer) && iteration > 1) {
         MapWritable updatedCnt = new MapWritable();
         // exit if there's no update made
         if (globalUpdateCounts == 0) {
@@ -173,7 +177,7 @@ public class GraphJobRunner extends BSP 
       }
       // if we have an aggregator defined, we must make an additional sync
       // to have the updated values available on all our peers.
-      if (aggregator != null && peer.getSuperstepCount() > 1) {
+      if (aggregator != null && iteration > 1) {
         peer.sync();
 
         MapWritable updatedValues = (MapWritable) peer.getCurrentMessage();
@@ -273,7 +277,8 @@ public class GraphJobRunner extends BSP 
     return msgMap;
   }
 
-  private void loadVertices(BSPPeer peer) throws IOException {
+  private void loadVertices(BSPPeer peer, boolean repairNeeded)
+      throws IOException {
     LOG.debug("vertex class: " + conf.get("hama.graph.vertex.class"));
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
     KeyValuePair<? extends VertexWritable, ? extends VertexArrayWritable> next = null;
@@ -304,6 +309,53 @@ public class GraphJobRunner extends BSP 
       vertex.setup(conf);
       vertices.put(next.getKey().getName(), vertex);
     }
+
+    /*
+     * If the user wants to repair the graph, it should traverse through that
+     * local chunk of adjacency list and message the corresponding peer to
+     * check whether that vertex exists. In real-life this may be dead-ending
+     * vertices, since we have no information about outgoing edges. Mainly this
+     * procedure is to prevent NullPointerExceptions from happening.
+     */
+    if (repairNeeded) {
+      LOG.debug("Starting repair of this graph!");
+      final Collection<Vertex> entries = vertices.values();
+      for (Vertex entry : entries) {
+        List<Edge> outEdges = entry.getOutEdges();
+        for (Edge e : outEdges) {
+          peer.send(e.getDestVertexID(), new Text(e.getName()));
+        }
+      }
+      try {
+        peer.sync();
+      } catch (Exception e) {
+        // we can't really recover from that, so fail this task
+        throw new RuntimeException(e);
+      }
+      Text vertexName = null;
+      while ((vertexName = (Text) peer.getCurrentMessage()) != null) {
+        String vName = vertexName.toString();
+        if (!vertices.containsKey(vName)) {
+          Vertex<? extends Writable> vertex = (Vertex<? extends Writable>) ReflectionUtils
+              .newInstance(
+                  conf.getClass("hama.graph.vertex.class", Vertex.class), conf);
+          vertex.peer = peer;
+          vertex.setVertexID(vName);
+          vertex.runner = this;
+          if (selfReference) {
+            String target = peer.getPeerName(Math.abs((vertex.hashCode() % peer
+                .getAllPeerNames().length)));
+            vertex.edges = Collections
+                .singletonList(new Edge(vertex.getVertexID(), target, 0));
+          } else {
+            vertex.edges = Collections.emptyList();
+          }
+          vertex.setup(conf);
+          vertices.put(vName, vertex);
+        }
+      }
+    }
+
   }
 
   /**

Modified: incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java
URL: http://svn.apache.org/viewvc/incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java?rev=1339586&r1=1339585&r2=1339586&view=diff
==============================================================================
--- incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java (original)
+++ incubator/hama/trunk/graph/src/main/java/org/apache/hama/graph/Vertex.java Thu May 17
13:06:31 2012
@@ -119,7 +119,8 @@ public abstract class Vertex<M extends W
 
   @Override
   public String toString() {
-    return getVertexID() + "=" + getValue();
+    return getVertexID() + (getValue() != null ? " = " + getValue() : "")
+        + " // " + edges;
   }
 
 }



Mime
View raw message