hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edwardy...@apache.org
Subject svn commit: r1412065 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Wed, 21 Nov 2012 11:39:51 GMT
Author: edwardyoon
Date: Wed Nov 21 11:39:50 2012
New Revision: 1412065

URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
Log:
Optimize memory use.

Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Nov 21 11:39:50 2012
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
   private Combiner<M> combiner;
   private Partitioner<V, M> partitioner;
 
-  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
 
   private boolean updated = true;
   private int globalUpdateCounts = 0;
@@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
   public final void cleanup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
-      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+    for (Vertex<V, E, M> e : vertices) {
+      peer.write(e.getVertexID(), e.getValue());
     }
   }
 
@@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
     int activeVertices = 0;
-    for (Vertex<V, E, M> vertex : vertices.values()) {
+    for (Vertex<V, E, M> vertex : vertices) {
       List<M> msgs = messages.get(vertex.getVertexID());
       // If there are newly received messages, restart.
       if (vertex.isHalted() && msgs != null) {
@@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
   private void doInitialSuperstep(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Vertex<V, E, M> vertex : vertices.values()) {
+    for (Vertex<V, E, M> vertex : vertices) {
       List<M> singletonList = Collections.singletonList(vertex.getValue());
       M lastValue = vertex.getValue();
       vertex.compute(singletonList.iterator());
@@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
         peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
       } else {
         vertex.setup(conf);
-        vertices.put(vertex.getVertexID(), vertex);
+        vertices.add(vertex);
       }
       vertex = newVertexInstance(vertexClass, conf);
       vertex.runner = this;
@@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
             Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
             messagedVertex.runner = this;
             messagedVertex.setup(conf);
-            vertices.put(messagedVertex.getVertexID(), messagedVertex);
+            vertices.add(messagedVertex);
           }
           startPos = peer.getPos();
         }
@@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
         Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
         messagedVertex.runner = this;
         messagedVertex.setup(conf);
-        vertices.put(messagedVertex.getVertexID(), messagedVertex);
+        vertices.add(messagedVertex);
       }
     }
     LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
@@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
      */
     if (repairNeeded) {
       LOG.debug("Starting repair of this graph!");
+      repair(peer, partitioningSteps, selfReference);
+    }
 
-      int multiSteps = 0;
-      MapWritable ssize = new MapWritable();
-      ssize.put(new IntWritable(peer.getPeerIndex()),
-          new IntWritable(vertices.size()));
-      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
-      ssize = null;
-      peer.sync();
+    LOG.debug("Starting Vertex processing!");
+  }
 
-      if (isMasterTask(peer)) {
-        int minVerticesSize = Integer.MAX_VALUE;
-        GraphJobMessage received = null;
-        while ((received = peer.getCurrentMessage()) != null) {
-          MapWritable x = received.getMap();
-          for (Entry<Writable, Writable> e : x.entrySet()) {
-            int curr = ((IntWritable) e.getValue()).get();
-            if (minVerticesSize > curr) {
-              minVerticesSize = curr;
-            }
-          }
-        }
+  @SuppressWarnings("unchecked")
+  private void repair(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      int partitioningSteps, boolean selfReference) throws IOException,
+      SyncException, InterruptedException {
 
-        if (minVerticesSize < (partitioningSteps * 2)) {
-          multiSteps = minVerticesSize;
-        } else {
-          multiSteps = (partitioningSteps * 2);
-        }
+    int multiSteps = 0;
+    MapWritable ssize = new MapWritable();
+    ssize.put(new IntWritable(peer.getPeerIndex()),
+        new IntWritable(vertices.size()));
+    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
+    ssize = null;
+    peer.sync();
 
-        for (String peerName : peer.getAllPeerNames()) {
-          MapWritable temp = new MapWritable();
-          temp.put(new Text("steps"), new IntWritable(multiSteps));
-          peer.send(peerName, new GraphJobMessage(temp));
+    if (isMasterTask(peer)) {
+      int minVerticesSize = Integer.MAX_VALUE;
+      GraphJobMessage received = null;
+      while ((received = peer.getCurrentMessage()) != null) {
+        MapWritable x = received.getMap();
+        for (Entry<Writable, Writable> e : x.entrySet()) {
+          int curr = ((IntWritable) e.getValue()).get();
+          if (minVerticesSize > curr) {
+            minVerticesSize = curr;
+          }
         }
       }
-      peer.sync();
 
-      GraphJobMessage received = peer.getCurrentMessage();
-      MapWritable x = received.getMap();
-      for (Entry<Writable, Writable> e : x.entrySet()) {
-        multiSteps = ((IntWritable) e.getValue()).get();
-      }
-
-      Set<V> keys = vertices.keySet();
-      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
-
-      int i = 0;
-      int syncs = 0;
-      for (V v : keys) {
-        Vertex<V, E, M> vertex2 = vertices.get(v);
-        for (Edge<V, E> e : vertices.get(v).getEdges()) {
-          peer.send(vertex2.getDestinationPeerName(e),
-              new GraphJobMessage(e.getDestinationVertexID()));
-        }
+      if (minVerticesSize < (partitioningSteps * 2)) {
+        multiSteps = minVerticesSize;
+      } else {
+        multiSteps = (partitioningSteps * 2);
+      }
 
-        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
-          peer.sync();
-          syncs++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            V vertexName = (V) msg.getVertexId();
-            if (!vertices.containsKey(vertexName)) {
-              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-              newVertex.setVertexID(vertexName);
-              newVertex.runner = this;
-              if (selfReference) {
-                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                    newVertex.getVertexID(), null)));
-              } else {
-                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-              }
-              newVertex.setup(conf);
-              tmp.put(vertexName, newVertex);
-            }
-          }
-        }
-        i++;
+      for (String peerName : peer.getAllPeerNames()) {
+        MapWritable temp = new MapWritable();
+        temp.put(new Text("steps"), new IntWritable(multiSteps));
+        peer.send(peerName, new GraphJobMessage(temp));
       }
+    }
+    peer.sync();
+
+    GraphJobMessage received = peer.getCurrentMessage();
+    MapWritable x = received.getMap();
+    for (Entry<Writable, Writable> e : x.entrySet()) {
+      multiSteps = ((IntWritable) e.getValue()).get();
+    }
+
+    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
+
+    int i = 0;
+    int syncs = 0;
+
+    for (Vertex<V, E, M> v : vertices) {
+      for (Edge<V, E> e : v.getEdges()) {
+        peer.send(v.getDestinationPeerName(e),
+            new GraphJobMessage(e.getDestinationVertexID()));
+      }
+
+      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+        peer.sync();
+        syncs++;
+        GraphJobMessage msg = null;
+        while ((msg = peer.getCurrentMessage()) != null) {
+          V vertexName = (V) msg.getVertexId();
 
-      peer.sync();
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        V vertexName = (V) msg.getVertexId();
-        if (!vertices.containsKey(vertexName)) {
           Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
           newVertex.setVertexID(vertexName);
           newVertex.runner = this;
@@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
             newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
           }
           newVertex.setup(conf);
-          vertices.put(vertexName, newVertex);
-          newVertex = null;
+          tmp.put(vertexName, newVertex);
+
         }
       }
+      i++;
+    }
+
+    peer.sync();
+    GraphJobMessage msg = null;
+    while ((msg = peer.getCurrentMessage()) != null) {
+      V vertexName = (V) msg.getVertexId();
 
-      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
-        vertices.put(e.getKey(), e.getValue());
+      Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+      newVertex.setVertexID(vertexName);
+      newVertex.runner = this;
+      if (selfReference) {
+        newVertex.setEdges(Collections.singletonList(new Edge<V, E>(newVertex
+            .getVertexID(), null)));
+      } else {
+        newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
       }
-      tmp.clear();
+      newVertex.setup(conf);
+      tmp.put(vertexName, newVertex);
+      newVertex = null;
+
     }
 
-    LOG.debug("Starting Vertex processing!");
+    for (Vertex<V, E, M> e : vertices) {
+      if (tmp.containsKey((e.getVertexID()))) {
+        tmp.remove(e.getVertexID());
+      }
+    }
+
+    vertices.addAll(tmp.values());
+    tmp.clear();
   }
 
   /**



Mime
View raw message