hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1378811 - in /hama/trunk: ./ core/src/main/java/org/apache/hama/bsp/ core/src/test/java/org/apache/hama/bsp/ graph/src/main/java/org/apache/hama/graph/ graph/src/test/java/org/apache/hama/graph/
Date Thu, 30 Aug 2012 06:19:51 GMT
Author: edwardyoon
Date: Thu Aug 30 06:19:50 2012
New Revision: 1378811

URL: http://svn.apache.org/viewvc?rev=1378811&view=rev
Log:
Improvement of network-based runtime partitioner

Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
    hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
    hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
    hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Thu Aug 30 06:19:50 2012
@@ -12,6 +12,8 @@ Release 0.6 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-599: Improvement of network-based runtime partitioner (edwardyoon)
+  
 Release 0.5 - April 10, 2012 
 
   NEW FEATURES

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeer.java Thu Aug 30 06:19:50 2012
@@ -181,4 +181,15 @@ public interface BSPPeer<K1, V1, K2, V2,
    *          incremented.
    */
   public void incrementCounter(String group, String counter, long amount);
+
+  /**
+   * @return the length of the input split assigned to this peer
+   */
+  public long getSplitSize();
+
+  /**
+   * @return the current read position of this peer's input reader
+   * @throws IOException if the underlying reader cannot report its position
+   */
+  public long getPos() throws IOException;
 }

Modified: hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java (original)
+++ hama/trunk/core/src/main/java/org/apache/hama/bsp/BSPPeerImpl.java Thu Aug 30 06:19:50 2012
@@ -95,6 +95,8 @@ public final class BSPPeerImpl<K1, V1, K
 
   private FaultTolerantPeerService<M> faultToleranceService;
 
+  private long splitSize = 0L;
+  
   /**
    * Protected default constructor for LocalBSPRunner.
    */
@@ -335,9 +337,21 @@ public final class BSPPeerImpl<K1, V1, K
           .getRecordReader(inputSplit, bspJob),
           getCounter(BSPPeerImpl.PeerCounter.TASK_INPUT_RECORDS),
           getCounter(BSPPeerImpl.PeerCounter.IO_BYTES_READ));
+      this.splitSize = inputSplit.getLength();
     }
   }
 
+  /** @return the length of the assigned input split, or 0 if none was assigned */
+  public long getSplitSize() {
+    return splitSize;
+  }
+
+  /** @return the input reader's position, or 0 if this peer has no reader */
+  public long getPos() throws IOException {
+    // 'in' is only created when an input split is set up, so it may be null
+    return in == null ? 0L : in.getPos();
+  }
+  
   public final void initilizeMessaging() throws ClassNotFoundException {
     messenger = MessageManagerFactory.getMessageManager(conf);
     messenger.init(taskId, this, conf, peerAddress);

Modified: hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java
URL: http://svn.apache.org/viewvc/hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java (original)
+++ hama/trunk/core/src/test/java/org/apache/hama/bsp/TestCheckpoint.java Thu Aug 30 06:19:50 2012
@@ -286,6 +286,18 @@ public class TestCheckpoint extends Test
 
     }
 
+    @Override
+    public long getSplitSize() {
+      // Test stub: report an empty input split.
+      return 0L;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      // Test stub: report the start of the (empty) input.
+      return 0L;
+    }
+
   }
 
   public static class TempSyncClient extends BSPPeerSyncClient {

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Thu Aug 30 06:19:50 2012
@@ -19,18 +19,18 @@ package org.apache.hama.graph;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -386,14 +386,24 @@ public final class GraphJobRunner<V exte
       VertexInputReader<Writable, Writable, V, E, M> reader)
       throws IOException, SyncException, InterruptedException {
 
+    // //////////////////////////////////
+    long splitSize = peer.getSplitSize();
+    int partitioningSteps = computeMultiSteps(peer, splitSize);
+    long interval = splitSize / partitioningSteps;
+    // //////////////////////////////////
+
     LOG.debug("vertex class: " + vertexClass);
     boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
     Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
     vertex.setPeer(peer);
     vertex.runner = this;
 
+    long startPos = peer.getPos();
+    if (startPos == 0)
+      startPos = 1L;
+
     KeyValuePair<Writable, Writable> next = null;
-    int lines = 0;
+    int steps = 1;
     while ((next = peer.readNext()) != null) {
       boolean vertexFinished = reader.parseVertex(next.getKey(),
           next.getValue(), vertex);
@@ -428,22 +438,26 @@ public final class GraphJobRunner<V exte
       vertex.setPeer(peer);
       vertex.runner = this;
 
-      lines++;
-      if ((lines % 100000) == 0) {
-        peer.sync();
-        GraphJobMessage msg = null;
-        while ((msg = peer.getCurrentMessage()) != null) {
-          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-          messagedVertex.setPeer(peer);
-          messagedVertex.runner = this;
-          messagedVertex.setup(conf);
-          vertices.put(messagedVertex.getVertexID(), messagedVertex);
+      if (runtimePartitioning) {
+        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval)
{
+          peer.sync();
+          steps++;
+          GraphJobMessage msg = null;
+          while ((msg = peer.getCurrentMessage()) != null) {
+            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+            messagedVertex.setPeer(peer);
+            messagedVertex.runner = this;
+            messagedVertex.setup(conf);
+            vertices.put(messagedVertex.getVertexID(), messagedVertex);
+          }
+          startPos = peer.getPos();
         }
       }
     }
 
     if (runtimePartitioning) {
       peer.sync();
+
       GraphJobMessage msg = null;
       while ((msg = peer.getCurrentMessage()) != null) {
         Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
@@ -453,8 +467,7 @@ public final class GraphJobRunner<V exte
         vertices.put(messagedVertex.getVertexID(), messagedVertex);
       }
     }
-
-    LOG.info("Loading finished at " + peer.getSuperstepCount() + " steps.");
+    LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
 
     /*
      * If the user want to repair the graph, it should traverse through that
@@ -465,14 +478,88 @@ public final class GraphJobRunner<V exte
      */
     if (repairNeeded) {
       LOG.debug("Starting repair of this graph!");
-      final Collection<Vertex<V, E, M>> entries = vertices.values();
-      for (Vertex<V, E, M> entry : entries) {
-        List<Edge<V, E>> outEdges = entry.getEdges();
-        for (Edge<V, E> e : outEdges) {
+      
+      int multiSteps = 0;
+      MapWritable ssize = new MapWritable();
+      ssize
+          .put(new IntWritable(peer.getPeerIndex()), new IntWritable(vertices.size()));
+      peer.send(masterTask, new GraphJobMessage(ssize));
+      ssize = null;
+      peer.sync();
+
+      if (this.isMasterTask(peer)) {
+        int minVerticesSize = Integer.MAX_VALUE;
+        GraphJobMessage received = null;
+        while ((received = peer.getCurrentMessage()) != null) {
+          MapWritable x = received.getMap();
+          for (Entry<Writable, Writable> e : x.entrySet()) {
+            int curr = ((IntWritable) e.getValue()).get();
+            if (minVerticesSize > curr) {
+              minVerticesSize = curr;
+            }
+          }
+        }
+
+        if(minVerticesSize < (partitioningSteps * 2)) {
+          multiSteps = minVerticesSize;
+        } else {
+          multiSteps = (partitioningSteps * 2);
+        }
+
+        for (String peerName : peer.getAllPeerNames()) {
+          MapWritable temp = new MapWritable();
+          temp.put(new Text("steps"), new IntWritable(multiSteps));
+          peer.send(peerName, new GraphJobMessage(temp));
+        }
+      }
+      peer.sync();
+
+      GraphJobMessage received = peer.getCurrentMessage();
+      MapWritable x = received.getMap();
+      for (Entry<Writable, Writable> e : x.entrySet()) {
+        multiSteps = ((IntWritable) e.getValue()).get();
+      }
+      
+      Set<V> keys = vertices.keySet();
+      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
+
+      int i = 0;
+      int syncs = 0;
+      for (V v : keys) {
+        for (Edge<V, E> e : vertices.get(v).getEdges()) {
           peer.send(e.getDestinationPeerName(),
               new GraphJobMessage(e.getDestinationVertexID()));
         }
+
+        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+          peer.sync();
+          syncs++;
+          GraphJobMessage msg = null;
+          while ((msg = peer.getCurrentMessage()) != null) {
+            V vertexName = (V) msg.getVertexId();
+            if (!vertices.containsKey(vertexName)) {
+              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+              newVertex.setPeer(peer);
+              newVertex.setVertexID(vertexName);
+              newVertex.runner = this;
+              if (selfReference) {
+                int partition = partitioner.getPartition(
+                    newVertex.getVertexID(), newVertex.getValue(),
+                    peer.getNumPeers());
+                String target = peer.getPeerName(partition);
+                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+                    newVertex.getVertexID(), target, null)));
+              } else {
+                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
+              }
+              newVertex.setup(conf);
+              tmp.put(vertexName, newVertex);
+            }
+          }
+        }
+        i++;
       }
+
       peer.sync();
       GraphJobMessage msg = null;
       while ((msg = peer.getCurrentMessage()) != null) {
@@ -493,10 +580,62 @@ public final class GraphJobRunner<V exte
           }
           newVertex.setup(conf);
           vertices.put(vertexName, newVertex);
+          newVertex = null;
+        }
+      }
+      
+      for(Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
+        vertices.put(e.getKey(), e.getValue());
+      }
+      tmp.clear();
+    }
+
+    LOG.debug("Starting Vertex processing!");
+  }
+
+  /**
+   * Agrees on the number of vertex-loading steps (at least 1) across all peers.
+   */
+  private int computeMultiSteps(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      long splitSize) throws IOException, SyncException, InterruptedException {
+    // Round 1: every peer reports its split size to the master task.
+    MapWritable ssize = new MapWritable();
+    ssize.put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
+    peer.send(masterTask, new GraphJobMessage(ssize));
+    peer.sync();
+    if (this.isMasterTask(peer)) {
+      long maxSplitSize = 0L;
+      GraphJobMessage received = null;
+      while ((received = peer.getCurrentMessage()) != null) {
+        for (Entry<Writable, Writable> e : received.getMap().entrySet()) {
+          maxSplitSize = Math.max(maxSplitSize, ((LongWritable) e.getValue()).get());
         }
       }
+      // Default ~20 MB per step. A non-positive interval (division by zero)
+      // disables splitting; the clamp keeps the int cast from overflowing.
+      int interval = conf.getInt(
+          "hama.graph.multi.step.partitioning.interval", 20000000);
+      int steps = interval <= 0 ? 1
+          : (int) Math.min(Integer.MAX_VALUE - 1L, maxSplitSize / interval) + 1;
+      for (String peerName : peer.getAllPeerNames()) {
+        MapWritable temp = new MapWritable();
+        temp.put(new Text("max"), new IntWritable(steps));
+        peer.send(peerName, new GraphJobMessage(temp));
+      }
     }
+    peer.sync(); // Round 2: the master broadcasts the step count to everyone.

+    GraphJobMessage received = peer.getCurrentMessage();
+    if (received == null) {
+      throw new IOException("No partitioning step count received from master");
+    }
+    int multiSteps = 1;
+    for (Entry<Writable, Writable> e : received.getMap().entrySet()) {
+      multiSteps = ((IntWritable) e.getValue()).get();
+    }
+    LOG.info(peer.getPeerName() + ": " + multiSteps);
+    return multiSteps;
   }
 
   /**

Modified: hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java?rev=1378811&r1=1378810&r2=1378811&view=diff
==============================================================================
--- hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java (original)
+++ hama/trunk/graph/src/test/java/org/apache/hama/graph/TestSubmitGraphJob.java Thu Aug 30 06:19:50 2012
@@ -18,6 +18,7 @@
 package org.apache.hama.graph;
 
 import java.io.BufferedWriter;
+import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 
@@ -54,7 +55,10 @@ public class TestSubmitGraphJob extends 
   public void testSubmitJob() throws Exception {
 
     generateTestData();
-
+    
+    // Set multi-step partitioning interval to 30 bytes
+    configuration.setInt("hama.graph.multi.step.partitioning.interval", 30);
+    
     GraphJob bsp = new GraphJob(configuration, PageRank.class);
     bsp.setInputPath(new Path(INPUT));
     bsp.setOutputPath(new Path(OUTPUT));
@@ -127,6 +131,10 @@ public class TestSubmitGraphJob extends 
       if (bw != null) {
         try {
           bw.close();
+          
+          File file = new File(INPUT);
+          LOG.info("Temp file length: " + file.length());
+          
         } catch (IOException e) {
           e.printStackTrace();
         }



Mime
View raw message