hama-dev mailing list archives

From Suraj Menon <surajsme...@apache.org>
Subject Re: svn commit: r1412065 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Mon, 03 Dec 2012 20:39:46 GMT
Hi, I made similar changes in HAMA-632.patch-v2 and found that my unit tests
failed: there were cases where duplicate vertices were added to the list. Are
you seeing this problem with this change? I don't see a uniqueness check
before the add. I propose introducing a VerticesInfo class that hides the
implementation details of the vertex collection (see the sketch below), but
either way we need to act if there is an issue here.
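
As a rough sketch of the VerticesInfo idea (the method names here are only
illustrative, not an agreed API):

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/**
 * Sketch only: hides whether vertices live in a Map, a List, or on disk,
 * and enforces uniqueness in a single place instead of all over the runner.
 */
public final class VerticesInfo<V extends WritableComparable, E extends Writable, M extends Writable>
    implements Iterable<Vertex<V, E, M>> {

  private final Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();

  /** Fails fast on a duplicate ID instead of silently keeping two copies. */
  public void addVertex(Vertex<V, E, M> vertex) {
    if (vertices.put(vertex.getVertexID(), vertex) != null) {
      throw new IllegalStateException("Duplicate vertex: " + vertex.getVertexID());
    }
  }

  public boolean containsVertex(V vertexID) {
    return vertices.containsKey(vertexID);
  }

  public int size() {
    return vertices.size();
  }

  @Override
  public Iterator<Vertex<V, E, M>> iterator() {
    return vertices.values().iterator();
  }
}

GraphJobRunner would then only call addVertex()/containsVertex() and iterate
through the interface, so we could swap the backing collection (or spill it
to disk) later without touching the runner.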

-Suraj

On Wed, Nov 21, 2012 at 6:39 AM, <edwardyoon@apache.org> wrote:

> Author: edwardyoon
> Date: Wed Nov 21 11:39:50 2012
> New Revision: 1412065
>
> URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
> Log:
> Optimize memory use.
>
> Modified:
>     hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
>
> Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
>
> ==============================================================================
> --- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
> +++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Wed Nov 21 11:39:50 2012
> @@ -24,7 +24,6 @@ import java.util.HashMap;
>  import java.util.List;
>  import java.util.Map;
>  import java.util.Map.Entry;
> -import java.util.Set;
>
>  import org.apache.commons.logging.Log;
>  import org.apache.commons.logging.LogFactory;
> @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
>    private Combiner<M> combiner;
>    private Partitioner<V, M> partitioner;
>
> -  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
> +  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
>
>    private boolean updated = true;
>    private int globalUpdateCounts = 0;
> @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
>    public final void cleanup(
>        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>        throws IOException {
> -    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
> -      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
> +    for (Vertex<V, E, M> e : vertices) {
> +      peer.write(e.getVertexID(), e.getValue());
>      }
>    }
>
> @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
>        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>        throws IOException {
>      int activeVertices = 0;
> -    for (Vertex<V, E, M> vertex : vertices.values()) {
> +    for (Vertex<V, E, M> vertex : vertices) {
>        List<M> msgs = messages.get(vertex.getVertexID());
>        // If there are newly received messages, restart.
>        if (vertex.isHalted() && msgs != null) {
> @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
>    private void doInitialSuperstep(
>        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
>        throws IOException {
> -    for (Vertex<V, E, M> vertex : vertices.values()) {
> +    for (Vertex<V, E, M> vertex : vertices) {
>        List<M> singletonList = Collections.singletonList(vertex.getValue());
>        M lastValue = vertex.getValue();
>        vertex.compute(singletonList.iterator());
> @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
>          peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
>        } else {
>          vertex.setup(conf);
> -        vertices.put(vertex.getVertexID(), vertex);
> +        vertices.add(vertex);
>        }
>        vertex = newVertexInstance(vertexClass, conf);
>        vertex.runner = this;
> @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
>              Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
>              messagedVertex.runner = this;
>              messagedVertex.setup(conf);
> -            vertices.put(messagedVertex.getVertexID(), messagedVertex);
> +            vertices.add(messagedVertex);
>            }
>            startPos = peer.getPos();
>          }
> @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
>          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
>          messagedVertex.runner = this;
>          messagedVertex.setup(conf);
> -        vertices.put(messagedVertex.getVertexID(), messagedVertex);
> +        vertices.add(messagedVertex);
>        }
>      }
>      LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
> @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
>       */
>      if (repairNeeded) {
>        LOG.debug("Starting repair of this graph!");
> +      repair(peer, partitioningSteps, selfReference);
> +    }
>
> -      int multiSteps = 0;
> -      MapWritable ssize = new MapWritable();
> -      ssize.put(new IntWritable(peer.getPeerIndex()),
> -          new IntWritable(vertices.size()));
> -      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
> -      ssize = null;
> -      peer.sync();
> +    LOG.debug("Starting Vertex processing!");
> +  }
>
> -      if (isMasterTask(peer)) {
> -        int minVerticesSize = Integer.MAX_VALUE;
> -        GraphJobMessage received = null;
> -        while ((received = peer.getCurrentMessage()) != null) {
> -          MapWritable x = received.getMap();
> -          for (Entry<Writable, Writable> e : x.entrySet()) {
> -            int curr = ((IntWritable) e.getValue()).get();
> -            if (minVerticesSize > curr) {
> -              minVerticesSize = curr;
> -            }
> -          }
> -        }
> +  @SuppressWarnings("unchecked")
> +  private void repair(
> +      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
> +      int partitioningSteps, boolean selfReference) throws IOException,
> +      SyncException, InterruptedException {
>
> -        if (minVerticesSize < (partitioningSteps * 2)) {
> -          multiSteps = minVerticesSize;
> -        } else {
> -          multiSteps = (partitioningSteps * 2);
> -        }
> +    int multiSteps = 0;
> +    MapWritable ssize = new MapWritable();
> +    ssize.put(new IntWritable(peer.getPeerIndex()),
> +        new IntWritable(vertices.size()));
> +    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
> +    ssize = null;
> +    peer.sync();
>
> -        for (String peerName : peer.getAllPeerNames()) {
> -          MapWritable temp = new MapWritable();
> -          temp.put(new Text("steps"), new IntWritable(multiSteps));
> -          peer.send(peerName, new GraphJobMessage(temp));
> +    if (isMasterTask(peer)) {
> +      int minVerticesSize = Integer.MAX_VALUE;
> +      GraphJobMessage received = null;
> +      while ((received = peer.getCurrentMessage()) != null) {
> +        MapWritable x = received.getMap();
> +        for (Entry<Writable, Writable> e : x.entrySet()) {
> +          int curr = ((IntWritable) e.getValue()).get();
> +          if (minVerticesSize > curr) {
> +            minVerticesSize = curr;
> +          }
>          }
>        }
> -      peer.sync();
>
> -      GraphJobMessage received = peer.getCurrentMessage();
> -      MapWritable x = received.getMap();
> -      for (Entry<Writable, Writable> e : x.entrySet()) {
> -        multiSteps = ((IntWritable) e.getValue()).get();
> -      }
> -
> -      Set<V> keys = vertices.keySet();
> -      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
> -
> -      int i = 0;
> -      int syncs = 0;
> -      for (V v : keys) {
> -        Vertex<V, E, M> vertex2 = vertices.get(v);
> -        for (Edge<V, E> e : vertices.get(v).getEdges()) {
> -          peer.send(vertex2.getDestinationPeerName(e),
> -              new GraphJobMessage(e.getDestinationVertexID()));
> -        }
> +      if (minVerticesSize < (partitioningSteps * 2)) {
> +        multiSteps = minVerticesSize;
> +      } else {
> +        multiSteps = (partitioningSteps * 2);
> +      }
>
> -        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
> -          peer.sync();
> -          syncs++;
> -          GraphJobMessage msg = null;
> -          while ((msg = peer.getCurrentMessage()) != null) {
> -            V vertexName = (V) msg.getVertexId();
> -            if (!vertices.containsKey(vertexName)) {
> -              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
> -              newVertex.setVertexID(vertexName);
> -              newVertex.runner = this;
> -              if (selfReference) {
> -                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
> -                    newVertex.getVertexID(), null)));
> -              } else {
> -                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> -              }
> -              newVertex.setup(conf);
> -              tmp.put(vertexName, newVertex);
> -            }
> -          }
> -        }
> -        i++;
> +      for (String peerName : peer.getAllPeerNames()) {
> +        MapWritable temp = new MapWritable();
> +        temp.put(new Text("steps"), new IntWritable(multiSteps));
> +        peer.send(peerName, new GraphJobMessage(temp));
>        }
> +    }
> +    peer.sync();
> +
> +    GraphJobMessage received = peer.getCurrentMessage();
> +    MapWritable x = received.getMap();
> +    for (Entry<Writable, Writable> e : x.entrySet()) {
> +      multiSteps = ((IntWritable) e.getValue()).get();
> +    }
> +
> +    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
> +
> +    int i = 0;
> +    int syncs = 0;
> +
> +    for (Vertex<V, E, M> v : vertices) {
> +      for (Edge<V, E> e : v.getEdges()) {
> +        peer.send(v.getDestinationPeerName(e),
> +            new GraphJobMessage(e.getDestinationVertexID()));
> +      }
> +
> +      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
> +        peer.sync();
> +        syncs++;
> +        GraphJobMessage msg = null;
> +        while ((msg = peer.getCurrentMessage()) != null) {
> +          V vertexName = (V) msg.getVertexId();
>
> -      peer.sync();
> -      GraphJobMessage msg = null;
> -      while ((msg = peer.getCurrentMessage()) != null) {
> -        V vertexName = (V) msg.getVertexId();
> -        if (!vertices.containsKey(vertexName)) {
>            Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
>            newVertex.setVertexID(vertexName);
>            newVertex.runner = this;
> @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
>              newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>            }
>            newVertex.setup(conf);
> -          vertices.put(vertexName, newVertex);
> -          newVertex = null;
> +          tmp.put(vertexName, newVertex);
> +
>          }
>        }
> +      i++;
> +    }
> +
> +    peer.sync();
> +    GraphJobMessage msg = null;
> +    while ((msg = peer.getCurrentMessage()) != null) {
> +      V vertexName = (V) msg.getVertexId();
>
> -      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
> -        vertices.put(e.getKey(), e.getValue());
> +      Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
> +      newVertex.setVertexID(vertexName);
> +      newVertex.runner = this;
> +      if (selfReference) {
> +        newVertex.setEdges(Collections.singletonList(new Edge<V, E>(newVertex
> +            .getVertexID(), null)));
> +      } else {
> +        newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
>        }
> -      tmp.clear();
> +      newVertex.setup(conf);
> +      tmp.put(vertexName, newVertex);
> +      newVertex = null;
> +
>      }
>
> -    LOG.debug("Starting Vertex processing!");
> +    for (Vertex<V, E, M> e : vertices) {
> +      if (tmp.containsKey((e.getVertexID()))) {
> +        tmp.remove(e.getVertexID());
> +      }
> +    }
> +
> +    vertices.addAll(tmp.values());
> +    tmp.clear();
>    }
>
>    /**
>
