hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tjungb...@apache.org
Subject svn commit: r1383375 - in /hama/trunk: ./ graph/src/main/java/org/apache/hama/graph/
Date Tue, 11 Sep 2012 12:15:22 GMT
Author: tjungblut
Date: Tue Sep 11 12:15:21 2012
New Revision: 1383375

URL: http://svn.apache.org/viewvc?rev=1383375&view=rev
Log:
[HAMA-597]: Split a GraphJobRunner into multiple classes Part 1

Added:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
Removed:
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunnerBase.java
Modified:
    hama/trunk/CHANGES.txt
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
    hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java

Modified: hama/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hama/trunk/CHANGES.txt?rev=1383375&r1=1383374&r2=1383375&view=diff
==============================================================================
--- hama/trunk/CHANGES.txt (original)
+++ hama/trunk/CHANGES.txt Tue Sep 11 12:15:21 2012
@@ -13,6 +13,7 @@ Release 0.6 (unreleased changes)
 
   IMPROVEMENTS
 
+   HAMA-597: Split a GraphJobRunner into multiple classes (edwardyoon & tjungblut) 
    HAMA-557: Implement Checkpointing service in Hama (surajmenon)
    HAMA-587: Synchronization Client should provide API's to store and retrieve information among peers and BSPMaster (surajmenon)
    HAMA-610: Provision to define the task allocation strategy as a feature of job. (surajmenon)

Added: hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java?rev=1383375&view=auto
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java (added)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/AggregationRunner.java Tue Sep 11 12:15:21 2012
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.graph;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.MapWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSPPeer;
+import org.apache.hama.bsp.sync.SyncException;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Runner class to do the tasks that need to be done if aggregation was
+ * configured.
+ * 
+ */
+public final class AggregationRunner<V extends Writable, E extends Writable, M extends Writable> {
+
+  // multiple aggregator arrays
+  private Aggregator<M, Vertex<V, E, M>>[] aggregators;
+  private Writable[] globalAggregatorResult;
+  private IntWritable[] globalAggregatorIncrement;
+  private boolean[] isAbstractAggregator;
+  private String[] aggregatorClassNames;
+  private Text[] aggregatorValueFlag;
+  private Text[] aggregatorIncrementFlag;
+  // aggregator on the master side
+  private Aggregator<M, Vertex<V, E, M>>[] masterAggregator;
+
+  private boolean enabled = false;
+  private Configuration conf;
+
+  @SuppressWarnings("unchecked")
+  public void setupAggregators(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    this.conf = peer.getConfiguration();
+    String aggregatorClasses = peer.getConfiguration().get(
+        GraphJob.AGGREGATOR_CLASS_ATTR);
+    if (aggregatorClasses != null) {
+      enabled = true;
+      aggregatorClassNames = aggregatorClasses.split(";");
+      // init to the split size
+      aggregators = new Aggregator[aggregatorClassNames.length];
+      globalAggregatorResult = new Writable[aggregatorClassNames.length];
+      globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
+      isAbstractAggregator = new boolean[aggregatorClassNames.length];
+      aggregatorValueFlag = new Text[aggregatorClassNames.length];
+      aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
+      if (GraphJobRunner.isMasterTask(peer)) {
+        masterAggregator = new Aggregator[aggregatorClassNames.length];
+      }
+      for (int i = 0; i < aggregatorClassNames.length; i++) {
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        aggregatorValueFlag[i] = new Text(
+            GraphJobRunner.S_FLAG_AGGREGATOR_VALUE + ";" + i);
+        aggregatorIncrementFlag[i] = new Text(
+            GraphJobRunner.S_FLAG_AGGREGATOR_INCREMENT + ";" + i);
+        if (aggregators[i] instanceof AbstractAggregator) {
+          isAbstractAggregator[i] = true;
+        }
+        if (GraphJobRunner.isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs the aggregators by sending their values to the master task.
+   */
+  public void sendAggregatorValues(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      int activeVertices) throws IOException {
+    // send msgCounts to the master task
+    MapWritable updatedCnt = new MapWritable();
+    updatedCnt.put(GraphJobRunner.FLAG_MESSAGE_COUNTS, new IntWritable(
+        activeVertices));
+    // also send aggregated values to the master
+    if (aggregators != null) {
+      for (int i = 0; i < this.aggregators.length; i++) {
+        updatedCnt.put(aggregatorValueFlag[i], aggregators[i].getValue());
+        if (isAbstractAggregator[i]) {
+          updatedCnt.put(aggregatorIncrementFlag[i],
+              ((AbstractAggregator<M, Vertex<V, E, M>>) aggregators[i])
+                  .getTimesAggregated());
+        }
+      }
+      for (int i = 0; i < aggregators.length; i++) {
+        // now create new aggregators for the next iteration
+        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
+        if (GraphJobRunner.isMasterTask(peer)) {
+          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
+        }
+      }
+    }
+    peer.send(GraphJobRunner.getMasterTask(peer), new GraphJobMessage(
+        updatedCnt));
+  }
+
+  /**
+   * Aggregates the last value before computation and the value after the
+   * computation.
+   * 
+   * @param lastValue the value before compute().
+   * @param v the vertex.
+   */
+  public void aggregateVertex(M lastValue, Vertex<V, E, M> v) {
+    if (isEnabled()) {
+      for (int i = 0; i < this.aggregators.length; i++) {
+        Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
+        aggregator.aggregate(v, v.getValue());
+        if (isAbstractAggregator[i]) {
+          AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
+          intern.aggregate(v, lastValue, v.getValue());
+          intern.aggregateInternal();
+        }
+      }
+    }
+  }
+
+  /**
+   * The method the master task does, it globally aggregates the values of each
+   * peer and updates the given map accordingly.
+   */
+  public void doMasterAggregation(MapWritable updatedCnt) {
+    if (isEnabled()) {
+      // work through the master aggregators
+      for (int i = 0; i < masterAggregator.length; i++) {
+        Writable lastAggregatedValue = masterAggregator[i].getValue();
+        if (isAbstractAggregator[i]) {
+          final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
+          final Writable finalizeAggregation = intern.finalizeAggregation();
+          if (intern.finalizeAggregation() != null) {
+            lastAggregatedValue = finalizeAggregation;
+          }
+          // this count is usually the times of active
+          // vertices in the graph
+          updatedCnt.put(aggregatorIncrementFlag[i],
+              intern.getTimesAggregated());
+        }
+        updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
+      }
+    }
+  }
+
+  /**
+   * Receives aggregated values from a master task, by doing an additional
+   * barrier sync and parsing the messages.
+   * 
+   * @return always true if no aggregators are defined, false if aggregators say
+   *         we haven't seen any messages anymore.
+   */
+  public boolean receiveAggregatedValues(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      long iteration) throws IOException, SyncException, InterruptedException {
+    // if we have an aggregator defined, we must make an additional sync
+    // to have the updated values available on all our peers.
+    if (isEnabled() && iteration > 1) {
+      peer.sync();
+
+      MapWritable updatedValues = peer.getCurrentMessage().getMap();
+      for (int i = 0; i < aggregators.length; i++) {
+        globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
+        globalAggregatorIncrement[i] = (IntWritable) updatedValues
+            .get(aggregatorIncrementFlag[i]);
+      }
+      IntWritable count = (IntWritable) updatedValues
+          .get(GraphJobRunner.FLAG_MESSAGE_COUNTS);
+      if (count != null && count.get() == Integer.MIN_VALUE) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @return true if aggregators were defined. Normally used by the internal
+   *         stateful methods, outside shouldn't use it too extensively.
+   */
+  public boolean isEnabled() {
+    return enabled;
+  }
+
+  /**
+   * Method to let the master read messages from peers and aggregate a value.
+   */
+  public void masterReadAggregatedValue(Text textIndex, M value) {
+    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+    masterAggregator[index].aggregate(null, value);
+  }
+
+  /**
+   * Method to let the master read messages from peers and aggregate the
+   * incremental value.
+   */
+  public void masterReadAggregatedIncrementalValue(Text textIndex, M value) {
+    int index = Integer.parseInt(textIndex.toString().split(";")[1]);
+    if (isAbstractAggregator[index]) {
+      ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[index])
+          .addTimesAggregated(((IntWritable) value).get());
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private Aggregator<M, Vertex<V, E, M>> getNewAggregator(String clsName) {
+    try {
+      return (Aggregator<M, Vertex<V, E, M>>) ReflectionUtils.newInstance(
+          conf.getClassByName(clsName), conf);
+    } catch (ClassNotFoundException e) {
+      e.printStackTrace();
+    }
+    throw new IllegalArgumentException("Aggregator class " + clsName
+        + " could not be found or instantiated!");
+  }
+
+  public final Writable getLastAggregatedValue(int index) {
+    return globalAggregatorResult[Preconditions.checkPositionIndex(index,
+        globalAggregatorResult.length)];
+  }
+
+  public final IntWritable getNumLastAggregatedVertices(int index) {
+    return globalAggregatorIncrement[Preconditions.checkPositionIndex(index,
+        globalAggregatorIncrement.length)];
+  }
+}

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java?rev=1383375&r1=1383374&r2=1383375&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobMessage.java Tue Sep 11 12:15:21 2012
@@ -135,7 +135,7 @@ public final class GraphJobMessage imple
       map = new MapWritable();
       map.readFields(in);
     } else if (isPartitioningMessage()) {
-      Vertex<Writable, Writable, Writable> vertex = GraphJobRunnerBase
+      Vertex<Writable, Writable, Writable> vertex = GraphJobRunner
           .newVertexInstance(VERTEX_CLASS, null);
       Writable vertexId = ReflectionUtils.newInstance(VERTEX_ID_CLASS, null);
       vertexId.readFields(in);

Modified: hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java
URL: http://svn.apache.org/viewvc/hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java?rev=1383375&r1=1383374&r2=1383375&view=diff
==============================================================================
--- hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java (original)
+++ hama/trunk/graph/src/main/java/org/apache/hama/graph/GraphJobRunner.java Tue Sep 11 12:15:21 2012
@@ -18,20 +18,30 @@
 package org.apache.hama.graph;
 
 import java.io.IOException;
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hama.bsp.BSP;
 import org.apache.hama.bsp.BSPPeer;
 import org.apache.hama.bsp.Combiner;
 import org.apache.hama.bsp.HashPartitioner;
 import org.apache.hama.bsp.Partitioner;
 import org.apache.hama.bsp.sync.SyncException;
+import org.apache.hama.util.KeyValuePair;
 
 /**
  * Fully generic graph job runner.
@@ -41,17 +51,177 @@ import org.apache.hama.bsp.sync.SyncExce
  * @param <M> the value type of a vertex.
  */
 public final class GraphJobRunner<V extends Writable, E extends Writable, M extends Writable>
-    extends GraphJobRunnerBase<V, E, M> {
+    extends BSP<Writable, Writable, Writable, Writable, GraphJobMessage> {
+
+  private static final Log LOG = LogFactory.getLog(GraphJobRunner.class);
+
+  // make sure that these values don't collide with the vertex names
+  public static final String S_FLAG_MESSAGE_COUNTS = "hama.0";
+  public static final String S_FLAG_AGGREGATOR_VALUE = "hama.1";
+  public static final String S_FLAG_AGGREGATOR_INCREMENT = "hama.2";
+  public static final Text FLAG_MESSAGE_COUNTS = new Text(S_FLAG_MESSAGE_COUNTS);
+
+  public static final String MESSAGE_COMBINER_CLASS = "hama.vertex.message.combiner.class";
+  public static final String GRAPH_REPAIR = "hama.graph.repair";
+
+  private Configuration conf;
+  private Combiner<M> combiner;
+  private Partitioner<V, M> partitioner;
+
+  private Map<V, Vertex<V, E, M>> vertices = new HashMap<V, Vertex<V, E, M>>();
+
+  private boolean updated = true;
+  private int globalUpdateCounts = 0;
+
+  private long numberVertices = 0;
+  // -1 is deactivated
+  private int maxIteration = -1;
+  private long iteration;
+
+  private Class<V> vertexIdClass;
+  private Class<M> vertexValueClass;
+  private Class<E> edgeValueClass;
+  private Class<Vertex<V, E, M>> vertexClass;
+
+  private AggregationRunner<V, E, M> aggregationRunner;
+
+  private BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer;
 
   @Override
-  @SuppressWarnings("unchecked")
   public final void setup(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException, SyncException, InterruptedException {
+
+    setupFields(peer);
+
+    loadVertices(peer);
+
+    countGlobalVertexCount(peer);
+
+    doInitialSuperstep(peer);
+
+  }
+
+  @Override
+  public final void bsp(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+
+    // we do supersteps while we still have updates and have not reached our
+    // maximum iterations yet
+    while (updated && !((maxIteration > 0) && iteration > maxIteration)) {
+      // reset the global update counter from our master in every superstep
+      globalUpdateCounts = 0;
+      peer.sync();
+
+      // note that the messages must be parsed here
+      final Map<V, List<M>> messages = parseMessages(peer);
+      // master needs to update
+      doMasterUpdates(peer);
+      // if aggregators say we don't have updates anymore, break
+      if (!aggregationRunner.receiveAggregatedValues(peer, iteration)) {
+        break;
+      }
+      // loop over vertices and do their computation
+      doSuperstep(messages, peer);
+    }
+  }
+
+  /**
+   * Just write <ID as Writable, Value as Writable> pair as a result. Note that
+   * this will also be executed when failure happened.
+   */
+  @Override
+  public final void cleanup(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
+    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
+      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+    }
+  }
+
+  /**
+   * The master task is going to check the number of updated vertices and do
+   * master aggregation. In case of no aggregators defined, we save a sync by
+   * reading multiple typed messages.
+   */
+  private void doMasterUpdates(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
+    if (isMasterTask(peer) && iteration > 1) {
+      MapWritable updatedCnt = new MapWritable();
+      // exit if there's no update made
+      if (globalUpdateCounts == 0) {
+        updatedCnt.put(FLAG_MESSAGE_COUNTS, new IntWritable(Integer.MIN_VALUE));
+      } else {
+        aggregationRunner.doMasterAggregation(updatedCnt);
+      }
+      // send the updates from the mater tasks back to the slaves
+      for (String peerName : peer.getAllPeerNames()) {
+        peer.send(peerName, new GraphJobMessage(updatedCnt));
+      }
+    }
+  }
+
+  /**
+   * Do the main logic of a superstep, namely checking if vertices are active,
+   * feeding compute with messages and controlling combiners/aggregators.
+   */
+  private void doSuperstep(Map<V, List<M>> messages,
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
+    int activeVertices = 0;
+    for (Vertex<V, E, M> vertex : vertices.values()) {
+      List<M> msgs = messages.get(vertex.getVertexID());
+      // If there are newly received messages, restart.
+      if (vertex.isHalted() && msgs != null) {
+        vertex.setActive();
+      }
+      if (msgs == null) {
+        msgs = Collections.emptyList();
+      }
+
+      if (!vertex.isHalted()) {
+        if (combiner != null) {
+          M combined = combiner.combine(msgs);
+          msgs = new ArrayList<M>();
+          msgs.add(combined);
+        }
+        M lastValue = vertex.getValue();
+        vertex.compute(msgs.iterator());
+        aggregationRunner.aggregateVertex(lastValue, vertex);
+        if (!vertex.isHalted()) {
+          activeVertices++;
+        }
+      }
+    }
+
+    aggregationRunner.sendAggregatorValues(peer, activeVertices);
+    iteration++;
+  }
+
+  /**
+   * Seed the vertices first with their own values in compute. This is the first
+   * superstep after the vertices have been loaded.
+   */
+  private void doInitialSuperstep(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException {
+    for (Vertex<V, E, M> vertex : vertices.values()) {
+      List<M> singletonList = Collections.singletonList(vertex.getValue());
+      M lastValue = vertex.getValue();
+      vertex.compute(singletonList.iterator());
+      aggregationRunner.aggregateVertex(lastValue, vertex);
+    }
+    aggregationRunner.sendAggregatorValues(peer, 1);
+    iteration++;
+  }
+
+  @SuppressWarnings("unchecked")
+  private void setupFields(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
     this.peer = peer;
     this.conf = peer.getConfiguration();
-    // Choose one as a master to collect global updates
-    this.masterTask = peer.getPeerName(0);
     maxIteration = peer.getConfiguration().getInt("hama.graph.max.iteration",
         -1);
 
@@ -65,14 +235,12 @@ public final class GraphJobRunner<V exte
     vertexClass = (Class<Vertex<V, E, M>>) conf.getClass(
         "hama.graph.vertex.class", Vertex.class);
 
+    // set the classes statically, so we can save memory per message
     GraphJobMessage.VERTEX_ID_CLASS = vertexIdClass;
     GraphJobMessage.VERTEX_VALUE_CLASS = vertexValueClass;
     GraphJobMessage.VERTEX_CLASS = vertexClass;
     GraphJobMessage.EDGE_VALUE_CLASS = edgeValueClass;
 
-    boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
-    boolean runtimePartitioning = conf.getBoolean(
-        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
     partitioner = (Partitioner<V, M>) ReflectionUtils.newInstance(
         conf.getClass("bsp.input.partitioner.class", HashPartitioner.class),
         conf);
@@ -85,195 +253,429 @@ public final class GraphJobRunner<V exte
           conf.getClass("hama.vertex.message.combiner.class", Combiner.class),
           conf);
     }
-    String aggregatorClasses = conf.get(GraphJob.AGGREGATOR_CLASS_ATTR);
-    if (aggregatorClasses != null) {
-      LOG.debug("Aggregator classes: " + aggregatorClasses);
-      aggregatorClassNames = aggregatorClasses.split(";");
-      // init to the split size
-      aggregators = new Aggregator[aggregatorClassNames.length];
-      globalAggregatorResult = new Writable[aggregatorClassNames.length];
-      globalAggregatorIncrement = new IntWritable[aggregatorClassNames.length];
-      isAbstractAggregator = new boolean[aggregatorClassNames.length];
-      aggregatorValueFlag = new Text[aggregatorClassNames.length];
-      aggregatorIncrementFlag = new Text[aggregatorClassNames.length];
-      if (isMasterTask(peer)) {
-        masterAggregator = new Aggregator[aggregatorClassNames.length];
-      }
-      for (int i = 0; i < aggregatorClassNames.length; i++) {
-        aggregators[i] = getNewAggregator(aggregatorClassNames[i]);
-        aggregatorValueFlag[i] = new Text(S_FLAG_AGGREGATOR_VALUE + ";" + i);
-        aggregatorIncrementFlag[i] = new Text(S_FLAG_AGGREGATOR_INCREMENT + ";"
-            + i);
-        if (aggregators[i] instanceof AbstractAggregator) {
-          isAbstractAggregator[i] = true;
-        }
-        if (isMasterTask(peer)) {
-          masterAggregator[i] = getNewAggregator(aggregatorClassNames[i]);
-        }
-      }
-    }
 
-    VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
+    aggregationRunner = new AggregationRunner<V, E, M>();
+    aggregationRunner.setupAggregators(peer);
+  }
+
+  /**
+   * Loads vertices into memory of each peer. TODO this needs to be simplified.
+   */
+  @SuppressWarnings("unchecked")
+  private void loadVertices(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+
+    /*
+     * Several partitioning constants begin
+     */
+
+    final VertexInputReader<Writable, Writable, V, E, M> reader = (VertexInputReader<Writable, Writable, V, E, M>) ReflectionUtils
         .newInstance(conf.getClass(GraphJob.VERTEX_GRAPH_INPUT_READER,
             VertexInputReader.class), conf);
 
-    loadVertices(peer, repairNeeded, runtimePartitioning, partitioner, reader,
-        this);
+    final boolean repairNeeded = conf.getBoolean(GRAPH_REPAIR, false);
+    final boolean runtimePartitioning = conf.getBoolean(
+        GraphJob.VERTEX_GRAPH_RUNTIME_PARTIONING, true);
 
-    for (String peerName : peer.getAllPeerNames()) {
-      peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
-    }
+    final long splitSize = peer.getSplitSize();
+    final int partitioningSteps = partitionMultiSteps(peer, splitSize);
+    final long interval = splitSize / partitioningSteps;
+
+    final boolean selfReference = conf.getBoolean("hama.graph.self.ref", false);
+
+    /*
+     * Several partitioning constants end
+     */
+
+    LOG.debug("vertex class: " + vertexClass);
+    Vertex<V, E, M> vertex = newVertexInstance(vertexClass, conf);
+    vertex.runner = this;
+
+    long startPos = peer.getPos();
+    if (startPos == 0)
+      startPos = 1L;
+
+    KeyValuePair<Writable, Writable> next = null;
+    int steps = 1;
+    while ((next = peer.readNext()) != null) {
+      boolean vertexFinished = reader.parseVertex(next.getKey(),
+          next.getValue(), vertex);
+      if (!vertexFinished) {
+        continue;
+      }
 
-    peer.sync();
+      if (vertex.getEdges() == null) {
+        if (selfReference) {
+          vertex.setEdges(Collections.singletonList(new Edge<V, E>(vertex
+              .getVertexID(), null)));
+        } else {
+          vertex.setEdges(Collections.EMPTY_LIST);
+        }
+      }
 
-    GraphJobMessage msg = null;
-    while ((msg = peer.getCurrentMessage()) != null) {
-      if (msg.isVerticesSizeMessage()) {
-        numberVertices += msg.getVerticesSize().get();
+      if (selfReference) {
+        vertex.addEdge(new Edge<V, E>(vertex.getVertexID(), null));
       }
-    }
 
-    // TODO refactor this to a single step
-    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
-      LinkedList<M> msgIterator = new LinkedList<M>();
-      Vertex<V, E, M> v = e.getValue();
-      msgIterator.add(v.getValue());
-      M lastValue = v.getValue();
-      v.compute(msgIterator.iterator());
-      if (this.aggregators != null) {
-        for (int i = 0; i < this.aggregators.length; i++) {
-          Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-          aggregator.aggregate(v, v.getValue());
-          if (isAbstractAggregator[i]) {
-            AbstractAggregator<M, Vertex<V, E, M>> intern = (AbstractAggregator<M, Vertex<V, E, M>>) aggregator;
-            intern.aggregate(v, lastValue, v.getValue());
-            intern.aggregateInternal();
+      if (runtimePartitioning) {
+        int partition = partitioner.getPartition(vertex.getVertexID(),
+            vertex.getValue(), peer.getNumPeers());
+        peer.send(peer.getPeerName(partition), new GraphJobMessage(vertex));
+      } else {
+        vertex.setup(conf);
+        vertices.put(vertex.getVertexID(), vertex);
+      }
+      vertex = newVertexInstance(vertexClass, conf);
+      vertex.runner = this;
+
+      if (runtimePartitioning) {
+        if (steps < partitioningSteps && (peer.getPos() - startPos) >= interval) {
+          peer.sync();
+          steps++;
+          GraphJobMessage msg = null;
+          while ((msg = peer.getCurrentMessage()) != null) {
+            Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+            messagedVertex.runner = this;
+            messagedVertex.setup(conf);
+            vertices.put(messagedVertex.getVertexID(), messagedVertex);
           }
+          startPos = peer.getPos();
         }
       }
     }
-    runAggregators(peer, 1);
-    iteration++;
-  }
 
-  @Override
-  public final void bsp(
-      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
-      throws IOException, SyncException, InterruptedException {
+    if (runtimePartitioning) {
+      peer.sync();
 
-    while (updated && !((maxIteration > 0) && iteration > maxIteration)) {
-      globalUpdateCounts = 0;
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        Vertex<V, E, M> messagedVertex = (Vertex<V, E, M>) msg.getVertex();
+        messagedVertex.runner = this;
+        messagedVertex.setup(conf);
+        vertices.put(messagedVertex.getVertexID(), messagedVertex);
+      }
+    }
+    LOG.debug("Loading finished at " + peer.getSuperstepCount() + " steps.");
+
+    /*
+     * If the user want to repair the graph, it should traverse through that
+     * local chunk of adjancency list and message the corresponding peer to
+     * check whether that vertex exists. In real-life this may be dead-ending
+     * vertices, since we have no information about outgoing edges. Mainly this
+     * procedure is to prevent NullPointerExceptions from happening.
+     */
+    if (repairNeeded) {
+      LOG.debug("Starting repair of this graph!");
+
+      int multiSteps = 0;
+      MapWritable ssize = new MapWritable();
+      ssize.put(new IntWritable(peer.getPeerIndex()),
+          new IntWritable(vertices.size()));
+      peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
+      ssize = null;
       peer.sync();
 
-      // Map <vertexID, messages>
-      final Map<V, LinkedList<M>> messages = parseMessages(peer);
-      // use iterations here, since repair can skew the number of
-      // supersteps
-      if (isMasterTask(peer) && iteration > 1) {
-        MapWritable updatedCnt = new MapWritable();
-        // exit if there's no update made
-        if (globalUpdateCounts == 0) {
-          updatedCnt.put(FLAG_MESSAGE_COUNTS,
-              new IntWritable(Integer.MIN_VALUE));
-        } else {
-          if (aggregators != null) {
-            // work through the master aggregators
-            for (int i = 0; i < masterAggregator.length; i++) {
-              Writable lastAggregatedValue = masterAggregator[i].getValue();
-              if (isAbstractAggregator[i]) {
-                final AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) masterAggregator[i]);
-                final Writable finalizeAggregation = intern
-                    .finalizeAggregation();
-                if (intern.finalizeAggregation() != null) {
-                  lastAggregatedValue = finalizeAggregation;
-                }
-                // this count is usually the times of active
-                // vertices in the graph
-                updatedCnt.put(aggregatorIncrementFlag[i],
-                    intern.getTimesAggregated());
-              }
-              updatedCnt.put(aggregatorValueFlag[i], lastAggregatedValue);
+      if (isMasterTask(peer)) {
+        int minVerticesSize = Integer.MAX_VALUE;
+        GraphJobMessage received = null;
+        while ((received = peer.getCurrentMessage()) != null) {
+          MapWritable x = received.getMap();
+          for (Entry<Writable, Writable> e : x.entrySet()) {
+            int curr = ((IntWritable) e.getValue()).get();
+            if (minVerticesSize > curr) {
+              minVerticesSize = curr;
             }
           }
         }
+
+        if (minVerticesSize < (partitioningSteps * 2)) {
+          multiSteps = minVerticesSize;
+        } else {
+          multiSteps = (partitioningSteps * 2);
+        }
+
         for (String peerName : peer.getAllPeerNames()) {
-          peer.send(peerName, new GraphJobMessage(updatedCnt));
+          MapWritable temp = new MapWritable();
+          temp.put(new Text("steps"), new IntWritable(multiSteps));
+          peer.send(peerName, new GraphJobMessage(temp));
         }
       }
-      // if we have an aggregator defined, we must make an additional sync
-      // to have the updated values available on all our peers.
-      if (aggregators != null && iteration > 1) {
-        peer.sync();
-
-        MapWritable updatedValues = peer.getCurrentMessage().getMap();
-        for (int i = 0; i < aggregators.length; i++) {
-          globalAggregatorResult[i] = updatedValues.get(aggregatorValueFlag[i]);
-          globalAggregatorIncrement[i] = (IntWritable) updatedValues
-              .get(aggregatorIncrementFlag[i]);
-        }
-        IntWritable count = (IntWritable) updatedValues
-            .get(FLAG_MESSAGE_COUNTS);
-        if (count != null && count.get() == Integer.MIN_VALUE) {
-          updated = false;
-          break;
-        }
+      peer.sync();
+
+      GraphJobMessage received = peer.getCurrentMessage();
+      MapWritable x = received.getMap();
+      for (Entry<Writable, Writable> e : x.entrySet()) {
+        multiSteps = ((IntWritable) e.getValue()).get();
       }
 
-      int activeVertices = 0;
-      for (Vertex<V, E, M> vertex : vertices.values()) {
-        LinkedList<M> msgs = messages.get(vertex.getVertexID());
-        // If there are newly received messages, restart.
-        if (vertex.isHalted() && msgs != null) {
-          vertex.setActive();
-        }
-        if (msgs == null) {
-          msgs = new LinkedList<M>();
-        }
+      Set<V> keys = vertices.keySet();
+      Map<V, Vertex<V, E, M>> tmp = new HashMap<V, Vertex<V, E, M>>();
 
-        if (!vertex.isHalted()) {
-          if (combiner != null) {
-            M combined = combiner.combine(msgs);
-            msgs = new LinkedList<M>();
-            msgs.add(combined);
-          }
-          M lastValue = vertex.getValue();
-          vertex.compute(msgs.iterator());
+      int i = 0;
+      int syncs = 0;
+      for (V v : keys) {
+        Vertex<V, E, M> vertex2 = vertices.get(v);
+        for (Edge<V, E> e : vertices.get(v).getEdges()) {
+          peer.send(vertex2.getDestinationPeerName(e),
+              new GraphJobMessage(e.getDestinationVertexID()));
+        }
 
-          if (aggregators != null) {
-            if (this.aggregators != null) {
-              for (int i = 0; i < this.aggregators.length; i++) {
-                Aggregator<M, Vertex<V, E, M>> aggregator = this.aggregators[i];
-                aggregator.aggregate(vertex, vertex.getValue());
-                if (isAbstractAggregator[i]) {
-                  AbstractAggregator<M, Vertex<V, E, M>> intern = ((AbstractAggregator<M, Vertex<V, E, M>>) aggregator);
-                  intern.aggregate(vertex, lastValue, vertex.getValue());
-                  intern.aggregateInternal();
-                }
+        if (syncs < multiSteps && (i % (vertices.size() / multiSteps)) == 0) {
+          peer.sync();
+          syncs++;
+          GraphJobMessage msg = null;
+          while ((msg = peer.getCurrentMessage()) != null) {
+            V vertexName = (V) msg.getVertexId();
+            if (!vertices.containsKey(vertexName)) {
+              Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+              newVertex.setVertexID(vertexName);
+              newVertex.runner = this;
+              if (selfReference) {
+                newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+                    newVertex.getVertexID(), null)));
+              } else {
+                newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
               }
+              newVertex.setup(conf);
+              tmp.put(vertexName, newVertex);
             }
           }
-          if (!vertex.isHalted()) {
-            activeVertices++;
+        }
+        i++;
+      }
+
+      peer.sync();
+      GraphJobMessage msg = null;
+      while ((msg = peer.getCurrentMessage()) != null) {
+        V vertexName = (V) msg.getVertexId();
+        if (!vertices.containsKey(vertexName)) {
+          Vertex<V, E, M> newVertex = newVertexInstance(vertexClass, conf);
+          newVertex.setVertexID(vertexName);
+          newVertex.runner = this;
+          if (selfReference) {
+            newVertex.setEdges(Collections.singletonList(new Edge<V, E>(
+                newVertex.getVertexID(), null)));
+          } else {
+            newVertex.setEdges(new ArrayList<Edge<V, E>>(0));
           }
+          newVertex.setup(conf);
+          vertices.put(vertexName, newVertex);
+          newVertex = null;
         }
       }
 
-      runAggregators(peer, activeVertices);
-      iteration++;
+      for (Map.Entry<V, Vertex<V, E, M>> e : tmp.entrySet()) {
+        vertices.put(e.getKey(), e.getValue());
+      }
+      tmp.clear();
     }
+
+    LOG.debug("Starting Vertex processing!");
   }
 
   /**
-   * Just write <ID as Writable, Value as Writable> pair as a result. Note that
-   * this will also be executed when failure happened.
+   * Partitions our vertices through multiple supersteps to save memory.
    */
-  @Override
-  public final void cleanup(
+  private int partitionMultiSteps(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer,
+      long splitSize) throws IOException, SyncException, InterruptedException {
+    int multiSteps = 1;
+
+    MapWritable ssize = new MapWritable();
+    ssize
+        .put(new IntWritable(peer.getPeerIndex()), new LongWritable(splitSize));
+    peer.send(getMasterTask(peer), new GraphJobMessage(ssize));
+    ssize = null;
+    peer.sync();
+
+    if (isMasterTask(peer)) {
+      long maxSplitSize = 0L;
+      GraphJobMessage received = null;
+      while ((received = peer.getCurrentMessage()) != null) {
+        MapWritable x = received.getMap();
+        for (Entry<Writable, Writable> e : x.entrySet()) {
+          long curr = ((LongWritable) e.getValue()).get();
+          if (maxSplitSize < curr) {
+            maxSplitSize = curr;
+          }
+        }
+      }
+
+      int steps = (int) (maxSplitSize / conf.getInt( // 20 mb
+          "hama.graph.multi.step.partitioning.interval", 20000000)) + 1;
+
+      for (String peerName : peer.getAllPeerNames()) {
+        MapWritable temp = new MapWritable();
+        temp.put(new Text("max"), new IntWritable(steps));
+        peer.send(peerName, new GraphJobMessage(temp));
+      }
+    }
+    peer.sync();
+
+    GraphJobMessage received = peer.getCurrentMessage();
+    MapWritable x = received.getMap();
+    for (Entry<Writable, Writable> e : x.entrySet()) {
+      multiSteps = ((IntWritable) e.getValue()).get();
+    }
+    LOG.info(peer.getPeerName() + ": " + multiSteps);
+    return multiSteps;
+  }
+
+  /**
+   * Counts vertices globally by sending the count of vertices in the map to the
+   * other peers.
+   */
+  private void countGlobalVertexCount(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
+      throws IOException, SyncException, InterruptedException {
+    for (String peerName : peer.getAllPeerNames()) {
+      peer.send(peerName, new GraphJobMessage(new IntWritable(vertices.size())));
+    }
+
+    peer.sync();
+
+    GraphJobMessage msg = null;
+    while ((msg = peer.getCurrentMessage()) != null) {
+      if (msg.isVerticesSizeMessage()) {
+        numberVertices += msg.getVerticesSize().get();
+      }
+    }
+  }
+
+  /**
+   * Parses the messages in every superstep and does actions according to flags
+   * in the messages.
+   * 
+   * @return a map that contains messages pro vertex.
+   */
+  @SuppressWarnings("unchecked")
+  private Map<V, List<M>> parseMessages(
       BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer)
       throws IOException {
-    for (Entry<V, Vertex<V, E, M>> e : vertices.entrySet()) {
-      peer.write(e.getValue().getVertexID(), e.getValue().getValue());
+    GraphJobMessage msg = null;
+    final Map<V, List<M>> msgMap = new HashMap<V, List<M>>();
+    while ((msg = peer.getCurrentMessage()) != null) {
+      // either this is a vertex message or a directive that must be read
+      // as map
+      if (msg.isVertexMessage()) {
+        final V vertexID = (V) msg.getVertexId();
+        final M value = (M) msg.getVertexValue();
+        List<M> msgs = msgMap.get(vertexID);
+        if (msgs == null) {
+          msgs = new ArrayList<M>();
+          msgMap.put(vertexID, msgs);
+        }
+        msgs.add(value);
+      } else if (msg.isMapMessage()) {
+        for (Entry<Writable, Writable> e : msg.getMap().entrySet()) {
+          Text vertexID = (Text) e.getKey();
+          if (FLAG_MESSAGE_COUNTS.equals(vertexID)) {
+            if (((IntWritable) e.getValue()).get() == Integer.MIN_VALUE) {
+              updated = false;
+            } else {
+              globalUpdateCounts += ((IntWritable) e.getValue()).get();
+            }
+          } else if (aggregationRunner.isEnabled()
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_VALUE)) {
+            aggregationRunner.masterReadAggregatedValue(vertexID,
+                (M) e.getValue());
+          } else if (aggregationRunner.isEnabled()
+              && vertexID.toString().startsWith(S_FLAG_AGGREGATOR_INCREMENT)) {
+            aggregationRunner.masterReadAggregatedIncrementalValue(vertexID,
+                (M) e.getValue());
+          }
+        }
+      } else {
+        throw new UnsupportedOperationException("Unknown message type: " + msg);
+      }
+
     }
+    return msgMap;
+  }
+
+  /**
+   * @return the number of vertices, globally accumulated.
+   */
+  public final long getNumberVertices() {
+    return numberVertices;
+  }
+
+  /**
+   * @return the current number of iterations.
+   */
+  public final long getNumberIterations() {
+    return iteration;
+  }
+
+  /**
+   * @return the defined number of maximum iterations, -1 if not defined.
+   */
+  public final int getMaxIteration() {
+    return maxIteration;
+  }
+
+  /**
+   * @return the defined partitioner instance.
+   */
+  public final Partitioner<V, M> getPartitioner() {
+    return partitioner;
+  }
+
+  /**
+   * Gets the last aggregated value at the given index. The index is dependend
+   * on how the aggregators were configured during job setup phase.
+   * 
+   * @return the value of the aggregator, or null if none was defined.
+   */
+  public final Writable getLastAggregatedValue(int index) {
+    return aggregationRunner.getLastAggregatedValue(index);
+  }
+
+  /**
+   * Gets the last aggregated number of vertices at the given index. The index
+   * is dependend on how the aggregators were configured during job setup phase.
+   * 
+   * @return the value of the aggregator, or null if none was defined.
+   */
+  public final IntWritable getNumLastAggregatedVertices(int index) {
+    return aggregationRunner.getNumLastAggregatedVertices(index);
+  }
+
+  /**
+   * @return the peer instance.
+   */
+  public final BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> getPeer() {
+    return peer;
+  }
+
+  /**
+   * Checks if this is a master task. The master task is the first peer in the
+   * peer array.
+   */
+  public static boolean isMasterTask(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    return peer.getPeerName().equals(getMasterTask(peer));
+  }
+
+  /**
+   * @return the name of the master peer, the name at the first index of the
+   *         peers.
+   */
+  public static String getMasterTask(
+      BSPPeer<Writable, Writable, Writable, Writable, GraphJobMessage> peer) {
+    return peer.getPeerName(0);
+  }
+
+  /**
+   * @return a new vertex instance
+   */
+  public static <V extends Writable, E extends Writable, M extends Writable> Vertex<V, E, M> newVertexInstance(
+      Class<?> vertexClass, Configuration conf) {
+    @SuppressWarnings("unchecked")
+    Vertex<V, E, M> vertex = (Vertex<V, E, M>) ReflectionUtils.newInstance(
+        vertexClass, conf);
+    return vertex;
   }
 
 }



Mime
View raw message