hama-commits mailing list archives

From: edwardy...@apache.org
Subject: svn commit: r1382652 - in /hama/trunk/graph/src/main/java/org/apache/hama/graph: GraphJobRunner.java GraphJobRunnerBase.java
Date: Mon, 10 Sep 2012 06:31:51 GMT
Author: edwardyoon
Date: Mon Sep 10 06:31:51 2012
New Revision: 1382652

URL: http://svn.apache.org/viewvc?rev=1382652&view=rev
Log:
Moved everything except bsp(), setup(), and cleanup() to the abstract class GraphJobRunnerBase.

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
Modified:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
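
The change below is a straightforward extract-superclass refactoring: the shared fields and helper methods (loadVertices(), computeMultiSteps(), runAggregators(), parseMessages(), the aggregator getters, and so on) move into the new abstract GraphJobRunnerBase, while GraphJobRunner keeps only the BSP lifecycle methods. The following minimal sketch illustrates the pattern only; its class, field, and method names are simplified stand-ins, not the real Hama types.

// Illustrative sketch: shared protected state and a helper live in an abstract
// base class, while the driver loop stays in the final subclass, mirroring the
// GraphJobRunnerBase / GraphJobRunner split in this commit.
abstract class RunnerBase {
  protected int maxIteration = -1; // -1 means "no cap", as in the real runner
  protected long iteration;

  // Shared helper, analogous to the protected methods moved into the base class.
  protected boolean keepRunning(boolean updated) {
    return updated && !((maxIteration > 0) && iteration > maxIteration);
  }
}

final class Runner extends RunnerBase {
  // The per-superstep driver stays in the concrete class, like bsp().
  void run() {
    while (keepRunning(true)) {
      iteration++;
    }
  }

  public static void main(String[] args) {
    Runner r = new Runner();
    r.maxIteration = 5;
    r.run();
    System.out.println("stopped after iteration " + r.iteration); // prints 6
  }
}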

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1382652&r1=1382651&r2=1382652&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Mon Sep 10 06:31:51 2012
@@ -18,30 +18,20 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.util.KeyValuePair;
 
 /**
  * Fully generic graph job runner.
@@ -51,51 +41,7 @@ import org.apache.hama.util.KeyValuePair
  * @param <M> the value type of a vertex.
  */
 public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
-    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
-
-  static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
-
-  // make sure that these values don't collide with the vertex names
-  private static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
-  private static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
-  private static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
-
-  private static final Text FLAG_MESSAGE_COUNTS = new Text(
-      S_FLAG_MESSAGE_COUNTS);
-
-  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
-  public static final String GRAPH_REPAIR = "hama.graph.repair";
-
-  private Configuration conf;
-  private Combiner<M> combiner;
-  private Partitioner<V, M> partitioner;
-
-  // multiple aggregator arrays
-  private Aggregator<M, Vertex<V, E, M>>[] aggregators;
-  private Writable[] globalAggregatorResult;
-  private IntWritable[] globalAggregatorIncrement;
-  private boolean[] isAbstractAggregator;
-  private String[] aggregatorClassNames;
-  private Text[] aggregatorValueFlag;
-  private Text[] aggregatorIncrementFlag;
-  // aggregator on the master side
-  private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
-
-  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
-
-  private String masterTask;
-  private boolean updated = true;
-  private int globalUpdateCounts = 0;
-
-  private long numberVertices = 0;
-  // -1 is deactivated
-  private int maxIteration = -1;
-  private long iteration;
-
-  private Class<V> vertexIdClass;
-  private Class<M> vertexValueClass;
-  private Class<E> edgeValueClass;
-  private Class<Vertex<V, E, M>> vertexClass;
+    extends GraphJobRunnerBase<V, E, M> {
 
   @Override
   @SuppressWarnings("unchecked")
@@ -105,6 +51,8 @@ public final class GraphJobRunner<V exte
     this.conf = peer.getConfiguration();
     // Choose one as a master to collect global updates
     this.masterTask = peer.getPeerName(0);
+    maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
+        -1);
 
     vertexIdClass = (Class<V>) conf.getClass(GraphJob.VERTEX_ID_CLASS_ATTR,
         Text.class, Writable.class);
@@ -168,12 +116,12 @@ public final class GraphJobRunner<V exte
         .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
             VertexInputReader.class), conf);
 
-    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader);
-   
+    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader, this);
+    
     for (String peerName : peer.getAllPeerNames()) {
       peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
     }
-    
+
     peer.sync();
 
     GraphJobMessage msg = null;
@@ -182,6 +130,7 @@ public final class GraphJobRunner<V exte
         numberVertices += msg.getVerticesSize().get();
       }
     }
+    
     // TODO refactor this to a single step
     for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
       LinkedList<M> msgIterator = new LinkedList<M>();
@@ -210,9 +159,6 @@ public final class GraphJobRunner<V exte
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
 
-    maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
-        -1);
-
     while (updated && !((maxIteration > 0) && iteration > maxIteration)) {
       globalUpdateCounts = 0;
       peer.sync();
@@ -315,352 +261,6 @@ public final class GraphJobRunner<V exte
     }
   }
 
-  private void runAggregators(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      int activeVertices) throws IOException {
-    // send msgCounts to the master task
-    MapWritable updatedCnt = new MapWritable();
-    updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
-    // also send aggregated values to the master
-    if (aggregators != null) {
-      for (int i = 0; i < this.aggregators.length; i++) {
-        updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
-        if (isAbstractAggregator[i]) {
-          updatedCnt.put(aggregatorIncrementFlag[i],
-              ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
-                  .getTimesAggregated());
-        }
-      }
-      for (int i = 0; i < aggregators.length; i++) {
-        // now create new aggregators for the next iteration
-        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
-        if (isMasterTask(peer)) {
-          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
-        }
-      }
-    }
-    peer.send(masterTask, new GraphJobMessage(updatedCnt));
-  }
-
-  @SuppressWarnings("unchecked")
-  private Map<V, LinkedList<M>> parseMessages(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
-      throws IOException {
-    GraphJobMessage msg = null;
-    final Map<V, LinkedList<M>> msgMap = new HashMap<V, LinkedList<M>>();
-    while ((msg = peer.getCurrentMessage()) != null) {
-      // either this is a vertex message or a directive that must be read
-      // as map
-      if (msg.isVertexMessage()) {
-        final V vertexID = (V) msg.getVertexId();
-        final M value = (M) msg.getVertexValue();
-        LinkedList<M> msgs = msgMap.get(vertexID);
-        if (msgs == null) {
-          msgs = new LinkedList<M>();
-          msgMap.put(vertexID, msgs);
-        }
-        msgs.add(value);
-      } else if (msg.isMapMessage()) {
-        for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
-          Text vertexID = (Text) e.getKey();
-          if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
-            if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
-              updated = false;
-            } else {
-              globalUpdateCounts += ((IntWritable) e.getValue()).get();
-            }
-          } else if (aggregators != null
-              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
-            int index = Integer.parseInt(vertexID.toString().split(";")[1]);
-            masterAggregator[index].aggregate(null, (M) e.getValue());
-          } else if (aggregators != null
-              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
-            int index = Integer.parseInt(vertexID.toString().split(";")[1]);
-            if (isAbstractAggregator[index]) {
-              ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
-                  .addTimesAggregated(((IntWritable) e.getValue()).get());
-            }
-          }
-        }
-      } else {
-        throw new UnsupportedOperationException("Unknown message type? " + msg);
-      }
-
-    }
-    return msgMap;
-  }
-
-  @SuppressWarnings("unchecked")
-  private void loadVertices(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      boolean repairNeeded, boolean runtimePartitioning,
-      Partitioner<V, M> partitioner,
-      VertexInputReader<Writable, Writable, V, E, M> reader)
-      throws IOException, SyncException, InterruptedException {
-
-    // //////////////////////////////////
-    long splitSize = peer.getSplitSize();
-    int partitioningSteps = computeMultiSteps(peer, splitSize);
-    long interval = splitSize / partitioningSteps;
-    // //////////////////////////////////
-
-    LOG.debug("vertex class: " + vertexClass);
-    boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
-    Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
-    vertex.setPeer(peer);
-    vertex.runner = this;
-
-    long startPos = peer.getPos();
-    if (startPos == 0)
-      startPos = 1L;
-
-    KeyValuePair<Writable, Writable> next = null;
-    int steps = 1;
-    while ((next = peer.readNext()) != null) {
-      boolean vertexFinished = reader.parseVertex(next.getKey(),
-          next.getValue(), vertex);
-      if (!vertexFinished) {
-        continue;
-      }
-
-      if (vertex.getEdges() == null) {
-        vertex.setEdges(new ArrayList<Edge<V, E>>(0));
-      }
-      if (selfReference) {
-        vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), peer.getPeerName(),
-            null));
-      }
-      if (runtimePartitioning) {
-        int partition = partitioner.getPartition(vertex.getVertexID(),
-            vertex.getValue(), peer.getNumPeers());
-        // set the destination name for the edge now
-        for (Edge<V, E> edge : vertex.getEdges()) {
-          int edgePartition = partitioner.getPartition(
-              edge.getDestinationVertexID(), (M) edge.getValue(),
-              peer.getNumPeers());
-          edge.destinationPeerName = peer.getPeerName(edgePartition);
-        }
-        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
-      } else {
-        // FIXME need to set destination names
-        vertex.setup(conf);
-        vertices.put(vertex.getVertexID(), vertex);
-      }
-      vertex = newVertexInstance(vertexClass, conf);
-      vertex.setPeer(peer);
-      vertex.runner = this;
-
-      if (runtimePartitioning) {
-        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
-          peer.sync();
-          steps++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-            messagedVertex.setPeer(peer);
-            messagedVertex.runner = this;
-            messagedVertex.setup(conf);
-            vertices.put(messagedVertex.getVertexID(), messagedVertex);
-          }
-          startPos = peer.getPos();
-        }
-      }
-    }
-
-    if (runtimePartitioning) {
-      peer.sync();
-
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
-        messagedVertex.setPeer(peer);
-        messagedVertex.runner = this;
-        messagedVertex.setup(conf);
-        vertices.put(messagedVertex.getVertexID(), messagedVertex);
-      }
-    }
-    LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
-
-    /*
-     * If the user wants to repair the graph, the runner should traverse its
-     * local chunk of the adjacency list and message the corresponding peer to
-     * check whether that vertex exists. In real life these may be dead-end
-     * vertices, since we have no information about outgoing edges. Mainly this
-     * procedure is to prevent NullPointerExceptions from happening.
-     */
-    if (repairNeeded) {
-      LOG.debug("Starting repair of this graph!");
-      
-      int multiSteps = 0;
-      MapWritable ssize = new MapWritable();
-      ssize
-          .put(new IntWritable(peer.getPeerIndex()), new IntWritable(vertices.size()));
-      peer.send(masterTask, new GraphJobMessage(ssize));
-      ssize = null;
-      peer.sync();
-
-      if (this.isMasterTask(peer)) {
-        int minVerticesSize = Integer.MAX_VALUE;
-        GraphJobMessage received = null;
-        while ((received = peer.getCurrentMessage()) != null) {
-          MapWritable x = received.getMap();
-          for (Entry<Writable, Writable> e : x.entrySet()) {
-            int curr = ((IntWritable) e.getValue()).get();
-            if (minVerticesSize > curr) {
-              minVerticesSize = curr;
-            }
-          }
-        }
-
-        if(minVerticesSize < (partitioningSteps * 2)) {
-          multiSteps = minVerticesSize;
-        } else {
-          multiSteps = (partitioningSteps * 2);
-        }
-
-        for (String peerName : peer.getAllPeerNames()) {
-          MapWritable temp = new MapWritable();
-          temp.put(new Text("steps"), new IntWritable(multiSteps));
-          peer.send(peerName, new GraphJobMessage(temp));
-        }
-      }
-      peer.sync();
-
-      GraphJobMessage received = peer.getCurrentMessage();
-      MapWritable x = received.getMap();
-      for (Entry<Writable, Writable> e : x.entrySet()) {
-        multiSteps = ((IntWritable) e.getValue()).get();
-      }
-      
-      Set<V> keys = vertices.keySet();
-      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
-
-      int i = 0;
-      int syncs = 0;
-      for (V v : keys) {
-        for (Edge<V, E> e : vertices.get(v).getEdges()) {
-          peer.send(e.getDestinationPeerName(),
-              new GraphJobMessage(e.getDestinationVertexID()));
-        }
-
-        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
-          peer.sync();
-          syncs++;
-          GraphJobMessage msg = null;
-          while ((msg = peer.getCurrentMessage()) != null) {
-            V vertexName = (V) msg.getVertexId();
-            if (!vertices.containsKey(vertexName)) {
-              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-              newVertex.setPeer(peer);
-              newVertex.setVertexID(vertexName);
-              newVertex.runner = this;
-              if (selfReference) {
-                int partition = partitioner.getPartition(
-                    newVertex.getVertexID(), newVertex.getValue(),
-                    peer.getNumPeers());
-                String target = peer.getPeerName(partition);
-                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                    newVertex.getVertexID(), target, null)));
-              } else {
-                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-              }
-              newVertex.setup(conf);
-              tmp.put(vertexName, newVertex);
-            }
-          }
-        }
-        i++;
-      }
-
-      peer.sync();
-      GraphJobMessage msg = null;
-      while ((msg = peer.getCurrentMessage()) != null) {
-        V vertexName = (V) msg.getVertexId();
-        if (!vertices.containsKey(vertexName)) {
-          Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
-          newVertex.setPeer(peer);
-          newVertex.setVertexID(vertexName);
-          newVertex.runner = this;
-          if (selfReference) {
-            int partition = partitioner.getPartition(newVertex.getVertexID(),
-                newVertex.getValue(), peer.getNumPeers());
-            String target = peer.getPeerName(partition);
-            newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
-                newVertex.getVertexID(), target, null)));
-          } else {
-            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
-          }
-          newVertex.setup(conf);
-          vertices.put(vertexName, newVertex);
-          newVertex = null;
-        }
-      }
-      
-      for(Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
-        vertices.put(e.getKey(), e.getValue());
-      }
-      tmp.clear();
-    }
-
-    LOG.debug("Starting Vertex processing!");
-  }
-
-  private int computeMultiSteps(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
-      long splitSize) throws IOException, SyncException, InterruptedException {
-    int multiSteps = 1;
-
-    MapWritable ssize = new MapWritable();
-    ssize
-        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
-    peer.send(masterTask, new GraphJobMessage(ssize));
-    ssize = null;
-    peer.sync();
-
-    if (this.isMasterTask(peer)) {
-      long maxSplitSize = 0L;
-      GraphJobMessage received = null;
-      while ((received = peer.getCurrentMessage()) != null) {
-        MapWritable x = received.getMap();
-        for (Entry<Writable, Writable> e : x.entrySet()) {
-          long curr = ((LongWritable) e.getValue()).get();
-          if (maxSplitSize < curr) {
-            maxSplitSize = curr;
-          }
-        }
-      }
-
-      int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
-          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
-
-      for (String peerName : peer.getAllPeerNames()) {
-        MapWritable temp = new MapWritable();
-        temp.put(new Text("max"), new IntWritable(steps));
-        peer.send(peerName, new GraphJobMessage(temp));
-      }
-    }
-    peer.sync();
-
-    GraphJobMessage received = peer.getCurrentMessage();
-    MapWritable x = received.getMap();
-    for (Entry<Writable, Writable> e : x.entrySet()) {
-      multiSteps = ((IntWritable) e.getValue()).get();
-    }
-    LOG.info(peer.getPeerName() + ": " + multiSteps);
-    return multiSteps;
-  }
-
-  /**
-   * @return a new vertex instance
-   */
-  public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(
-      Class<?> vertexClass, Configuration conf) {
-    @SuppressWarnings("unchecked")
-    Vertex<V, E, M> vertex = (Vertex<V, E, M>) ReflectionUtils.newInstance(
-        vertexClass, conf);
-    return vertex;
-  }
-
   /**
    * Just write <ID as Writable, Value as Writable> pair as a result. Note that
   * this will also be executed when a failure occurs.
@@ -674,45 +274,4 @@ public final class GraphJobRunner<V exte
     }
   }
 
-  @SuppressWarnings("unchecked")
-  private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
-    try {
-      return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(
-          conf.getClassByName(clsName), conf);
-    } catch (ClassNotFoundException e) {
-      e.printStackTrace();
-    }
-    throw new IllegalArgumentException("Aggregator class " + clsName
-        + " could not be found or instantiated!");
-  }
-
-  private boolean isMasterTask(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
-    return peer.getPeerName().equals(masterTask);
-  }
-
-  public final long getNumberVertices() {
-    return numberVertices;
-  }
-
-  public final long getNumberIterations() {
-    return iteration;
-  }
-
-  public final int getMaxIteration() {
-    return maxIteration;
-  }
-
-  public Partitioner<V, M> getPartitioner() {
-    return partitioner;
-  }
-
-  public final Writable getLastAggregatedValue(int index) {
-    return globalAggregatorResult[index];
-  }
-
-  public final IntWritable getNumLastAggregatedVertices(int index) {
-    return globalAggregatorIncrement[index];
-  }
-
 }
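
One behavioral note on the hunks above: the read of "hama.graph.max.iteration" moves from bsp() into setup(), but the knob itself is unchanged and -1 (the default) still disables the iteration cap. A hedged usage sketch, assuming Hama's HamaConfiguration is on the classpath and using an illustrative class name:

import org.apache.hama.HamaConfiguration;

// Shows the configuration key the runner now reads in setup(); values are examples.
public class MaxIterationExample {
  public static void main(String[] args) {
    HamaConfiguration conf = new HamaConfiguration();
    conf.setInt("hama.graph.max.iteration", 30);                      // cap the job at 30 iterations
    System.out.println(conf.getInt("hama.graph.max.iteration", -1));  // prints 30; -1 would mean "no cap"
  }
}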

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java?rev=1382652&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java Mon Sep 10 06:31:51 2012
@@ -0,0 +1,463 @@
+package org.apache.hama.graph;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSP;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.Combiner;
+import org.apache.hama.bsp.Partitioner;
+import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
+
+public abstract class GraphJobRunnerBase<V extends Writable, E extends Writable, M extends Writable>
+    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
+
+  static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+
+  // make sure that these values don't collide with the vertex names
+  protected static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
+  protected static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
+  protected static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+
+  protected static final Text FLAG_MESSAGE_COUNTS = new Text(
+      S_FLAG_MESSAGE_COUNTS);
+
+  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  public static final String GRAPH_REPAIR = "hama.graph.repair";
+
+  protected Configuration conf;
+  protected Combiner<M> combiner;
+  protected Partitioner<V, M> partitioner;
+
+  // multiple aggregator arrays
+  protected Aggregator<M, Vertex<V, E, M>>[] aggregators;
+  protected Writable[] globalAggregatorResult;
+  protected IntWritable[] globalAggregatorIncrement;
+  protected boolean[] isAbstractAggregator;
+  protected String[] aggregatorClassNames;
+  protected Text[] aggregatorValueFlag;
+  protected Text[] aggregatorIncrementFlag;
+  // aggregator on the master side
+  protected Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+
+  protected Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+
+  protected String masterTask;
+  protected boolean updated = true;
+  protected int globalUpdateCounts = 0;
+
+  protected long numberVertices = 0;
+  // -1 is deactivated
+  protected int maxIteration = -1;
+  protected long iteration;
+
+  protected Class<V> vertexIdClass;
+  protected Class<M> vertexValueClass;
+  protected Class<E> edgeValueClass;
+  protected Class<Vertex<V, E, M>> vertexClass;
+
+  @SuppressWarnings("unchecked")
+  protected void loadVertices(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      boolean repairNeeded, boolean runtimePartitioning,
+      Partitioner<V, M> partitioner,
+      VertexInputReader<Writable, Writable, V, E, M> reader,
+      GraphJobRunner<V, E, M> graphJobRunner) throws IOException,
+      SyncException, InterruptedException {
+
+    // //////////////////////////////////
+    long splitSize = peer.getSplitSize();
+    int partitioningSteps = computeMultiSteps(peer, splitSize);
+    long interval = splitSize / partitioningSteps;
+    // //////////////////////////////////
+
+    LOG.debug("vertex class: " + vertexClass);
+    boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
+    Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
+    vertex.setPeer(peer);
+    vertex.runner = graphJobRunner;
+
+    long startPos = peer.getPos();
+    if (startPos == 0)
+      startPos = 1L;
+
+    KeyValuePair<Writable, Writable> next = null;
+    int steps = 1;
+    while ((next = peer.readNext()) != null) {
+      boolean vertexFinished = reader.parseVertex(next.getKey(),
+          next.getValue(), vertex);
+      if (!vertexFinished) {
+        continue;
+      }
+
+      if (vertex.getEdges() == null) {
+        vertex.setEdges(new ArrayList<Edge<V, E>>(0));
+      }
+      if (selfReference) {
+        vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), peer.getPeerName(),
+            null));
+      }
+      if (runtimePartitioning) {
+        int partition = partitioner.getPartition(vertex.getVertexID(),
+            vertex.getValue(), peer.getNumPeers());
+        // set the destination name for the edge now
+        for (Edge<V, E> edge : vertex.getEdges()) {
+          int edgePartition = partitioner.getPartition(
+              edge.getDestinationVertexID(), (M) edge.getValue(),
+              peer.getNumPeers());
+          edge.destinationPeerName = peer.getPeerName(edgePartition);
+        }
+        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
+      } else {
+        // FIXME need to set destination names
+        vertex.setup(conf);
+        vertices.put(vertex.getVertexID(), vertex);
+      }
+      vertex = newVertexInstance(vertexClass, conf);
+      vertex.setPeer(peer);
+      vertex.runner = graphJobRunner;
+
+      if (runtimePartitioning) {
+        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
+          peer.sync();
+          steps++;
+          GraphJobMessage msg = null;
+          while ((msg = peer.getCurrentMessage()) != null) {
+            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+            messagedVertex.setPeer(peer);
+            messagedVertex.runner = graphJobRunner;
+            messagedVertex.setup(conf);
+            vertices.put(messagedVertex.getVertexID(), messagedVertex);
+          }
+          startPos = peer.getPos();
+        }
+      }
+    }
+
+    if (runtimePartitioning) {
+      peer.sync();
+
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+        messagedVertex.setPeer(peer);
+        messagedVertex.runner = graphJobRunner;
+        messagedVertex.setup(conf);
+        vertices.put(messagedVertex.getVertexID(), messagedVertex);
+      }
+    }
+    LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
+
+    /*
+     * If the user wants to repair the graph, the runner should traverse its
+     * local chunk of the adjacency list and message the corresponding peer to
+     * check whether that vertex exists. In real life these may be dead-end
+     * vertices, since we have no information about outgoing edges. Mainly this
+     * procedure is to prevent NullPointerExceptions from happening.
+     */
+    if (repairNeeded) {
+      LOG.debug("Starting repair of this graph!");
+
+      int multiSteps = 0;
+      MapWritable ssize = new MapWritable();
+      ssize.put(new IntWritable(peer.getPeerIndex()),
+          new IntWritable(vertices.size()));
+      peer.send(masterTask, new GraphJobMessage(ssize));
+      ssize = null;
+      peer.sync();
+
+      if (this.isMasterTask(peer)) {
+        int minVerticesSize = Integer.MAX_VALUE;
+        GraphJobMessage received = null;
+        while ((received = peer.getCurrentMessage()) != null) {
+          MapWritable x = received.getMap();
+          for (Entry<Writable, Writable> e : x.entrySet()) {
+            int curr = ((IntWritable) e.getValue()).get();
+            if (minVerticesSize > curr) {
+              minVerticesSize = curr;
+            }
+          }
+        }
+
+        if (minVerticesSize < (partitioningSteps * 2)) {
+          multiSteps = minVerticesSize;
+        } else {
+          multiSteps = (partitioningSteps * 2);
+        }
+
+        for (String peerName : peer.getAllPeerNames()) {
+          MapWritable temp = new MapWritable();
+          temp.put(new Text("steps"), new IntWritable(multiSteps));
+          peer.send(peerName, new GraphJobMessage(temp));
+        }
+      }
+      peer.sync();
+
+      GraphJobMessage received = peer.getCurrentMessage();
+      MapWritable x = received.getMap();
+      for (Entry<Writable, Writable> e : x.entrySet()) {
+        multiSteps = ((IntWritable) e.getValue()).get();
+      }
+
+      Set<V> keys = vertices.keySet();
+      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
+
+      int i = 0;
+      int syncs = 0;
+      for (V v : keys) {
+        for (Edge<V, E> e : vertices.get(v).getEdges()) {
+          peer.send(e.getDestinationPeerName(),
+              new GraphJobMessage(e.getDestinationVertexID()));
+        }
+
+        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+          peer.sync();
+          syncs++;
+          GraphJobMessage msg = null;
+          while ((msg = peer.getCurrentMessage()) != null) {
+            V vertexName = (V) msg.getVertexId();
+            if (!vertices.containsKey(vertexName)) {
+              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+              newVertex.setPeer(peer);
+              newVertex.setVertexID(vertexName);
+              newVertex.runner = graphJobRunner;
+              if (selfReference) {
+                int partition = partitioner.getPartition(
+                    newVertex.getVertexID(), newVertex.getValue(),
+                    peer.getNumPeers());
+                String target = peer.getPeerName(partition);
+                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+                    newVertex.getVertexID(), target, null)));
+              } else {
+                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
+              }
+              newVertex.setup(conf);
+              tmp.put(vertexName, newVertex);
+            }
+          }
+        }
+        i++;
+      }
+
+      peer.sync();
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        V vertexName = (V) msg.getVertexId();
+        if (!vertices.containsKey(vertexName)) {
+          Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+          newVertex.setPeer(peer);
+          newVertex.setVertexID(vertexName);
+          newVertex.runner = graphJobRunner;
+          if (selfReference) {
+            int partition = partitioner.getPartition(newVertex.getVertexID(),
+                newVertex.getValue(), peer.getNumPeers());
+            String target = peer.getPeerName(partition);
+            newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+                newVertex.getVertexID(), target, null)));
+          } else {
+            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
+          }
+          newVertex.setup(conf);
+          vertices.put(vertexName, newVertex);
+          newVertex = null;
+        }
+      }
+
+      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
+        vertices.put(e.getKey(), e.getValue());
+      }
+      tmp.clear();
+    }
+
+    LOG.debug("Starting Vertex processing!");
+  }
+
+  protected int computeMultiSteps(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      long splitSize) throws IOException, SyncException, InterruptedException {
+    int multiSteps = 1;
+
+    MapWritable ssize = new MapWritable();
+    ssize
+        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
+    peer.send(masterTask, new GraphJobMessage(ssize));
+    ssize = null;
+    peer.sync();
+
+    if (this.isMasterTask(peer)) {
+      long maxSplitSize = 0L;
+      GraphJobMessage received = null;
+      while ((received = peer.getCurrentMessage()) != null) {
+        MapWritable x = received.getMap();
+        for (Entry<Writable, Writable> e : x.entrySet()) {
+          long curr = ((LongWritable) e.getValue()).get();
+          if (maxSplitSize < curr) {
+            maxSplitSize = curr;
+          }
+        }
+      }
+
+      int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
+          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
+
+      for (String peerName : peer.getAllPeerNames()) {
+        MapWritable temp = new MapWritable();
+        temp.put(new Text("max"), new IntWritable(steps));
+        peer.send(peerName, new GraphJobMessage(temp));
+      }
+    }
+    peer.sync();
+
+    GraphJobMessage received = peer.getCurrentMessage();
+    MapWritable x = received.getMap();
+    for (Entry<Writable, Writable> e : x.entrySet()) {
+      multiSteps = ((IntWritable) e.getValue()).get();
+    }
+    LOG.info(peer.getPeerName() + ": " + multiSteps);
+    return multiSteps;
+  }
+
+  /**
+   * @return a new vertex instance
+   */
+  public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(
+      Class<?> vertexClass, Configuration conf) {
+    @SuppressWarnings("unchecked")
+    Vertex<V, E, M> vertex = (Vertex<V, E, M>) ReflectionUtils.newInstance(
+        vertexClass, conf);
+    return vertex;
+  }
+
+  protected void runAggregators(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      int activeVertices) throws IOException {
+    // send msgCounts to the master task
+    MapWritable updatedCnt = new MapWritable();
+    updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(activeVertices));
+    // also send aggregated values to the master
+    if (aggregators != null) {
+      for (int i = 0; i < this.aggregators.length; i++) {
+        updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+        if (isAbstractAggregator[i]) {
+          updatedCnt.put(aggregatorIncrementFlag[i],
+              ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+                  .getTimesAggregated());
+        }
+      }
+      for (int i = 0; i < aggregators.length; i++) {
+        // now create new aggregators for the next iteration
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        if (isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+        }
+      }
+    }
+    peer.send(masterTask, new GraphJobMessage(updatedCnt));
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Map<V, LinkedList<M>> parseMessages(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
+    GraphJobMessage msg = null;
+    final Map<V, LinkedList<M>> msgMap = new HashMap<V, LinkedList<M>>();
+    while ((msg = peer.getCurrentMessage()) != null) {
+      // either this is a vertex message or a directive that must be read
+      // as map
+      if (msg.isVertexMessage()) {
+        final V vertexID = (V) msg.getVertexId();
+        final M value = (M) msg.getVertexValue();
+        LinkedList<M> msgs = msgMap.get(vertexID);
+        if (msgs == null) {
+          msgs = new LinkedList<M>();
+          msgMap.put(vertexID, msgs);
+        }
+        msgs.add(value);
+      } else if (msg.isMapMessage()) {
+        for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
+          Text vertexID = (Text) e.getKey();
+          if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
+            if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+              updated = false;
+            } else {
+              globalUpdateCounts += ((IntWritable) e.getValue()).get();
+            }
+          } else if (aggregators != null
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+            int index = Integer.parseInt(vertexID.toString().split(";")[1]);
+            masterAggregator[index].aggregate(null, (M) e.getValue());
+          } else if (aggregators != null
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+            int index = Integer.parseInt(vertexID.toString().split(";")[1]);
+            if (isAbstractAggregator[index]) {
+              ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+                  .addTimesAggregated(((IntWritable) e.getValue()).get());
+            }
+          }
+        }
+      } else {
+        throw new UnsupportedOperationException("Unknown message type? " + msg);
+      }
+
+    }
+    return msgMap;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
+    try {
+      return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(
+          conf.getClassByName(clsName), conf);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    throw new IllegalArgumentException("Aggregator class " + clsName
+        + " could not be found or instantiated!");
+  }
+
+  protected boolean isMasterTask(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    return peer.getPeerName().equals(masterTask);
+  }
+
+  public final long getNumberVertices() {
+    return numberVertices;
+  }
+
+  public final long getNumberIterations() {
+    return iteration;
+  }
+
+  public final int getMaxIteration() {
+    return maxIteration;
+  }
+
+  public Partitioner<V, M> getPartitioner() {
+    return partitioner;
+  }
+
+  public final Writable getLastAggregatedValue(int index) {
+    return globalAggregatorResult[index];
+  }
+
+  public final IntWritable getNumLastAggregatedVertices(int index) {
+    return globalAggregatorIncrement[index];
+  }
+
+}
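
For readers skimming computeMultiSteps() above: the master takes the largest split size reported by any peer, divides it by the configured interval ("hama.graph.multi.step.partitioning.interval", 20 MB by default), adds one, and broadcasts the result to every peer. A small self-contained sketch of that arithmetic, with an assumed 64 MB split:

// Worked example of the computeMultiSteps() arithmetic; the 64 MB split size
// is an assumed input, not something stated in this commit.
public class MultiStepArithmetic {
  public static void main(String[] args) {
    long maxSplitSize = 64000000L; // largest split reported by any peer (assumed)
    int interval = 20000000;       // default of "hama.graph.multi.step.partitioning.interval"
    int steps = (int) (maxSplitSize / interval) + 1;
    System.out.println(steps);     // 4: vertex loading proceeds in 4 synchronized rounds
  }
}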


