hama-dev mailing list archives

From Thomas Jungblut <thomas.jungb...@gmail.com>
Subject Re: svn commit: r1412065 - /hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
Date Mon, 03 Dec 2012 20:51:24 GMT
This is certainly a nasty bug. Unlike the map we had before, the list does
permit duplicates.
I have already said that we should remove the repair capability, because it
adds too much complexity (an O(n) uniqueness check on a list versus O(1) on a
map). It will get especially slow once we have a disk-based graph or a random
access file backing this VertexInfo class, which is otherwise a nice abstraction.
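
To make the trade-off concrete, here is a minimal sketch in plain Java (not
Hama code, just the standard collections) of why the old map deduplicated
vertex IDs for free while the new list accepts the same ID twice, and why
keeping a list unique costs a linear contains() scan on every insert:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DuplicateDemo {
      public static void main(String[] args) {
        // Old layout: keyed by vertex ID, a second put() just overwrites.
        Map<String, String> byId = new HashMap<String, String>();
        byId.put("v1", "value-a");
        byId.put("v1", "value-b");
        System.out.println("map size = " + byId.size());    // prints 1

        // New layout: a plain list has no notion of a key, so the same
        // vertex ID can be added twice.
        List<String> asList = new ArrayList<String>();
        asList.add("v1");
        asList.add("v1");
        System.out.println("list size = " + asList.size()); // prints 2

        // Keeping the list unique needs an O(n) scan before every add,
        // which is exactly the cost the repair step would pile up.
        if (!asList.contains("v2")) {
          asList.add("v2");
        }
      }
    }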

So I would like to remove the repair step and leave data normalization up to
the user. That would solve the bug above as well, so it has few downsides
(the major one, of course, being that the feature has already shipped in two
releases).
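
As one hypothetical sketch of the VerticesInfo idea Suraj mentions below: the
interface name comes from his mail, but the method set and signatures are only
my assumption for illustration, not the actual Hama API. The point is that
GraphJobRunner would only ever add and iterate vertices through such an
interface, so whether the backing store is an ArrayList, a HashMap or a random
access file on disk stays an implementation detail:

    import java.io.IOException;
    import java.util.Iterator;

    // Hypothetical sketch only; VERTEX would be bound to Hama's
    // Vertex<V, E, M> by the implementing class.
    public interface VerticesInfo<VERTEX> extends Iterable<VERTEX> {

      // Adds a vertex; a disk-backed implementation could append to a file.
      void addVertex(VERTEX vertex) throws IOException;

      // Number of vertices currently stored.
      int size();

      // Sequential access for the superstep loops in GraphJobRunner.
      Iterator<VERTEX> iterator();
    }

With something like this in place, the loops in GraphJobRunner would stay
untouched because they only rely on iteration, and the storage could be
swapped without another change like the one below.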

2012/12/3 Suraj Menon <surajsmenon@apache.org>

> Hi, I made similar changes in HAMA-632.patch-v2 and found that my unit tests
> failed because there were cases where I added duplicate vertices to the
> list. Are you seeing this problem with this change? I don't see a
> uniqueness check before adding. I propose using the VerticesInfo class to
> hide the implementation details of the vertex collection. But we need to
> act if there is an issue here.
>
> -Suraj
>
> On Wed, Nov 21, 2012 at 6:39 AM, <edwardyoon@apache.org> wrote:
>
> > Author: edwardyoon
> > Date: Wed Nov 21 11:39:50 2012
> > New Revision: 1412065
> >
> > URL: http://svn.apache.org/viewvc?rev=1412065&view=rev
> > Log:
> > Optimize memory use.
> >
> > Modified:
> >
> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> >
> > Modified:
> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> > URL:
> >
> http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1412065&r1=1412064&r2=1412065&view=diff
> >
> >
> ==============================================================================
> > ---
> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> > (original)
> > +++
> > hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
> > Wed Nov 21 11:39:50 2012
> > @@ -24,7 +24,6 @@ import java.util.HashMap;
> >  import java.util.List;
> >  import java.util.Map;
> >  import java.util.Map.Entry;
> > -import java.util.Set;
> >
> >  import org.apache.commons.logging.Log;
> >  import org.apache.commons.logging.LogFactory;
> > @@ -72,7 +71,7 @@ public final class GraphJobRunner<V exte
> >    private Combiner<M> combiner;
> >    private Partitioner<V, M> partitioner;
> >
> > -  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
> > +  private List<Vertex<V, E, M>> vertices = new ArrayList<Vertex<V, E, M>>();
> >
> >    private boolean updated = true;
> >    private int globalUpdateCounts = 0;
> > @@ -144,8 +143,8 @@ public final class GraphJobRunner<V exte
> >    public final void cleanup(
> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
> > peer)
> >        throws IOException {
> > -    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
> > -      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
> > +    for (Vertex<V, E, M> e : vertices) {
> > +      peer.write(e.getVertexID(), e.getValue());
> >      }
> >    }
> >
> > @@ -180,7 +179,7 @@ public final class GraphJobRunner<V exte
> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
> > peer)
> >        throws IOException {
> >      int activeVertices = 0;
> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
> > +    for (Vertex<V, E, M> vertex : vertices) {
> >        List<M> msgs = messages.get(vertex.getVertexID());
> >        // If there are newly received messages, restart.
> >        if (vertex.isHalted() && msgs != null) {
> > @@ -216,7 +215,7 @@ public final class GraphJobRunner<V exte
> >    private void doInitialSuperstep(
> >        BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
> > peer)
> >        throws IOException {
> > -    for (Vertex<V, E, M> vertex : vertices.values()) {
> > +    for (Vertex<V, E, M> vertex : vertices) {
> >        List<M> singletonList =
> > Collections.singletonList(vertex.getValue());
> >        M lastValue = vertex.getValue();
> >        vertex.compute(singletonList.iterator());
> > @@ -341,7 +340,7 @@ public final class GraphJobRunner<V exte
> >          peer.send(peer.getPeerName(partition), new
> > GraphJobMessage(vertex));
> >        } else {
> >          vertex.setup(conf);
> > -        vertices.put(vertex.getVertexID(), vertex);
> > +        vertices.add(vertex);
> >        }
> >        vertex = newVertexInstance(vertexClass, conf);
> >        vertex.runner = this;
> > @@ -355,7 +354,7 @@ public final class GraphJobRunner<V exte
> >              Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>)
> > msg.getVertex();
> >              messagedVertex.runner = this;
> >              messagedVertex.setup(conf);
> > -            vertices.put(messagedVertex.getVertexID(), messagedVertex);
> > +            vertices.add(messagedVertex);
> >            }
> >            startPos = peer.getPos();
> >          }
> > @@ -370,7 +369,7 @@ public final class GraphJobRunner<V exte
> >          Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>)
> > msg.getVertex();
> >          messagedVertex.runner = this;
> >          messagedVertex.setup(conf);
> > -        vertices.put(messagedVertex.getVertexID(), messagedVertex);
> > +        vertices.add(messagedVertex);
> >        }
> >      }
> >      LOG.debug("Loading finished at " + peer.getSuperstepCount() + "
> > steps.");
> > @@ -384,89 +383,77 @@ public final class GraphJobRunner<V exte
> >       */
> >      if (repairNeeded) {
> >        LOG.debug("Starting repair of this graph!");
> > +      repair(peer, partitioningSteps, selfReference);
> > +    }
> >
> > -      int multiSteps = 0;
> > -      MapWritable ssize = new MapWritable();
> > -      ssize.put(new IntWritable(peer.getPeerIndex()),
> > -          new IntWritable(vertices.size()));
> > -      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
> > -      ssize = null;
> > -      peer.sync();
> > +    LOG.debug("Starting Vertex processing!");
> > +  }
> >
> > -      if (isMasterTask(peer)) {
> > -        int minVerticesSize = Integer.MAX_VALUE;
> > -        GraphJobMessage received = null;
> > -        while ((received = peer.getCurrentMessage()) != null) {
> > -          MapWritable x = received.getMap();
> > -          for (Entry<Writable, Writable> e : x.entrySet()) {
> > -            int curr = ((IntWritable) e.getValue()).get();
> > -            if (minVerticesSize > curr) {
> > -              minVerticesSize = curr;
> > -            }
> > -          }
> > -        }
> > +  @SuppressWarnings("unchecked")
> > +  private void repair(
> > +      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage>
> > peer,
> > +      int partitioningSteps, boolean selfReference) throws IOException,
> > +      SyncException, InterruptedException {
> >
> > -        if (minVerticesSize < (partitioningSteps * 2)) {
> > -          multiSteps = minVerticesSize;
> > -        } else {
> > -          multiSteps = (partitioningSteps * 2);
> > -        }
> > +    int multiSteps = 0;
> > +    MapWritable ssize = new MapWritable();
> > +    ssize.put(new IntWritable(peer.getPeerIndex()),
> > +        new IntWritable(vertices.size()));
> > +    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
> > +    ssize = null;
> > +    peer.sync();
> >
> > -        for (String peerName : peer.getAllPeerNames()) {
> > -          MapWritable temp = new MapWritable();
> > -          temp.put(new Text("steps"), new IntWritable(multiSteps));
> > -          peer.send(peerName, new GraphJobMessage(temp));
> > +    if (isMasterTask(peer)) {
> > +      int minVerticesSize = Integer.MAX_VALUE;
> > +      GraphJobMessage received = null;
> > +      while ((received = peer.getCurrentMessage()) != null) {
> > +        MapWritable x = received.getMap();
> > +        for (Entry<Writable, Writable> e : x.entrySet()) {
> > +          int curr = ((IntWritable) e.getValue()).get();
> > +          if (minVerticesSize > curr) {
> > +            minVerticesSize = curr;
> > +          }
> >          }
> >        }
> > -      peer.sync();
> >
> > -      GraphJobMessage received = peer.getCurrentMessage();
> > -      MapWritable x = received.getMap();
> > -      for (Entry<Writable, Writable> e : x.entrySet()) {
> > -        multiSteps = ((IntWritable) e.getValue()).get();
> > -      }
> > -
> > -      Set<V> keys = vertices.keySet();
> > -      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
> > -
> > -      int i = 0;
> > -      int syncs = 0;
> > -      for (V v : keys) {
> > -        Vertex<V, E, M> vertex2 = vertices.get(v);
> > -        for (Edge<V, E> e : vertices.get(v).getEdges()) {
> > -          peer.send(vertex2.getDestinationPeerName(e),
> > -              new GraphJobMessage(e.getDestinationVertexID()));
> > -        }
> > +      if (minVerticesSize < (partitioningSteps * 2)) {
> > +        multiSteps = minVerticesSize;
> > +      } else {
> > +        multiSteps = (partitioningSteps * 2);
> > +      }
> >
> > -        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
> > -          peer.sync();
> > -          syncs++;
> > -          GraphJobMessage msg = null;
> > -          while ((msg = peer.getCurrentMessage()) != null) {
> > -            V vertexName = (V) msg.getVertexId();
> > -            if (!vertices.containsKey(vertexName)) {
> > -              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass,
> > conf);
> > -              newVertex.setVertexID(vertexName);
> > -              newVertex.runner = this;
> > -              if (selfReference) {
> > -                newVertex.setEdges(Collections.singletonList(new Edge<V,
> > E>(
> > -                    newVertex.getVertexID(), null)));
> > -              } else {
> > -                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> > -              }
> > -              newVertex.setup(conf);
> > -              tmp.put(vertexName, newVertex);
> > -            }
> > -          }
> > -        }
> > -        i++;
> > +      for (String peerName : peer.getAllPeerNames()) {
> > +        MapWritable temp = new MapWritable();
> > +        temp.put(new Text("steps"), new IntWritable(multiSteps));
> > +        peer.send(peerName, new GraphJobMessage(temp));
> >        }
> > +    }
> > +    peer.sync();
> > +
> > +    GraphJobMessage received = peer.getCurrentMessage();
> > +    MapWritable x = received.getMap();
> > +    for (Entry<Writable, Writable> e : x.entrySet()) {
> > +      multiSteps = ((IntWritable) e.getValue()).get();
> > +    }
> > +
> > +    Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
> > +
> > +    int i = 0;
> > +    int syncs = 0;
> > +
> > +    for (Vertex<V, E, M> v : vertices) {
> > +      for (Edge<V, E> e : v.getEdges()) {
> > +        peer.send(v.getDestinationPeerName(e),
> > +            new GraphJobMessage(e.getDestinationVertexID()));
> > +      }
> > +
> > +      if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
> > +        peer.sync();
> > +        syncs++;
> > +        GraphJobMessage msg = null;
> > +        while ((msg = peer.getCurrentMessage()) != null) {
> > +          V vertexName = (V) msg.getVertexId();
> >
> > -      peer.sync();
> > -      GraphJobMessage msg = null;
> > -      while ((msg = peer.getCurrentMessage()) != null) {
> > -        V vertexName = (V) msg.getVertexId();
> > -        if (!vertices.containsKey(vertexName)) {
> >            Vertex<V, E, M> newVertex = newVertexInstance(vertexClass,
> > conf);
> >            newVertex.setVertexID(vertexName);
> >            newVertex.runner = this;
> > @@ -477,18 +464,41 @@ public final class GraphJobRunner<V exte
> >              newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> >            }
> >            newVertex.setup(conf);
> > -          vertices.put(vertexName, newVertex);
> > -          newVertex = null;
> > +          tmp.put(vertexName, newVertex);
> > +
> >          }
> >        }
> > +      i++;
> > +    }
> > +
> > +    peer.sync();
> > +    GraphJobMessage msg = null;
> > +    while ((msg = peer.getCurrentMessage()) != null) {
> > +      V vertexName = (V) msg.getVertexId();
> >
> > -      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
> > -        vertices.put(e.getKey(), e.getValue());
> > +      Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
> > +      newVertex.setVertexID(vertexName);
> > +      newVertex.runner = this;
> > +      if (selfReference) {
> > +        newVertex.setEdges(Collections.singletonList(new Edge<V,
> > E>(newVertex
> > +            .getVertexID(), null)));
> > +      } else {
> > +        newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
> >        }
> > -      tmp.clear();
> > +      newVertex.setup(conf);
> > +      tmp.put(vertexName, newVertex);
> > +      newVertex = null;
> > +
> >      }
> >
> > -    LOG.debug("Starting Vertex processing!");
> > +    for (Vertex<V, E, M> e : vertices) {
> > +      if (tmp.containsKey((e.getVertexID()))) {
> > +        tmp.remove(e.getVertexID());
> > +      }
> > +    }
> > +
> > +    vertices.addAll(tmp.values());
> > +    tmp.clear();
> >    }
> >
> >    /**
> >
> >
> >
>
