incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1245205 [9/18] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples...
Date Thu, 16 Feb 2012 22:12:36 GMT
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java?rev=1245205&r1=1245204&r2=1245205&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceWorker.java Thu Feb 16 22:12:31 2012
@@ -59,7 +59,6 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -73,1412 +72,1470 @@ import java.util.TreeSet;
 
 /**
  * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
  */
 @SuppressWarnings("rawtypes")
-public class BspServiceWorker<
-        I extends WritableComparable,
-        V extends Writable,
-        E extends Writable,
-        M extends Writable>
-        extends BspService<I, V, E, M>
-        implements CentralizedServiceWorker<I, V, E, M> {
-    /** Number of input splits */
-    private int inputSplitCount = -1;
-    /** My process health znode */
-    private String myHealthZnode;
-    /** List of aggregators currently in use */
-    private Set<String> aggregatorInUse = new TreeSet<String>();
-    /** Worker info */
-    private final WorkerInfo workerInfo;
-    /** Worker graph partitioner */
-    private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
-    /** Input split vertex cache (only used when loading from input split) */
-    private final Map<PartitionOwner, Partition<I, V, E, M>>
-        inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
-    /** Communication service */
-    private final ServerInterface<I, V, E, M> commService;
-    /** Structure to store the partitions on this worker */
-    private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
-        new HashMap<Integer, Partition<I, V, E, M>>();
-    /** Have the partition exchange children (workers) changed? */
-    private final BspEvent partitionExchangeChildrenChanged =
-        new PredicateLock();
-    /** Max vertices per partition before sending */
-    private final int maxVerticesPerPartition;
-    /** Worker Context */
-    private final WorkerContext workerContext;
-    /** Total vertices loaded */
-    private long totalVerticesLoaded = 0;
-    /** Total edges loaded */
-    private long totalEdgesLoaded = 0;
-    /** Input split max vertices (-1 denotes all) */
-    private final long inputSplitMaxVertices;
-    /** Class logger */
-    private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
-
-    public BspServiceWorker(
-            String serverPortList,
-            int sessionMsecTimeout,
-            Mapper<?, ?, ?, ?>.Context context,
-            GraphMapper<I, V, E, M> graphMapper,
-            GraphState<I, V, E,M> graphState)
-            throws UnknownHostException, IOException, InterruptedException {
-        super(serverPortList, sessionMsecTimeout, context, graphMapper);
-        registerBspEvent(partitionExchangeChildrenChanged);
-        int finalRpcPort =
-            getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
-                                      GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
-                                      getTaskPartition();
-        maxVerticesPerPartition =
-            getConfiguration().getInt(
-                GiraphJob.MAX_VERTICES_PER_PARTITION,
-                GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
-        inputSplitMaxVertices =
-            getConfiguration().getLong(
-                GiraphJob.INPUT_SPLIT_MAX_VERTICES,
-                GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
-        workerInfo =
-            new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
-        workerGraphPartitioner =
-            getGraphPartitionerFactory().createWorkerGraphPartitioner();
-        commService = new RPCCommunications<I, V, E, M>(
-            context, this, graphState);
-        graphState.setWorkerCommunications(commService);
-        this.workerContext =
-            BspUtils.createWorkerContext(getConfiguration(),
-                                         graphMapper.getGraphState());
-    }
-
-    public WorkerContext getWorkerContext() {
-    	return workerContext;
-    }
-
-    /**
-     * Intended to check the health of the node.  For instance, can it ssh,
-     * dmesg, etc. For now, does nothing.
-     */
-    public boolean isHealthy() {
-        return true;
-    }
-
-    /**
-     * Use an aggregator in this superstep.
-     *
-     * @param name
-     * @return boolean (false when aggregator not registered)
-     */
-    public boolean useAggregator(String name) {
-        if (getAggregatorMap().get(name) == null) {
-            LOG.error("userAggregator: Aggregator=" + name + " not registered");
-            return false;
-        }
-        aggregatorInUse.add(name);
-        return true;
-    }
-
-    /**
-     * Try to reserve an InputSplit for loading.  While InputSplits exists that
-     * are not finished, wait until they are.
-     *
-     * @return reserved InputSplit or null if no unfinished InputSplits exist
-     */
-    private String reserveInputSplit() {
-        List<String> inputSplitPathList = null;
-        try {
-            inputSplitPathList =
-                getZkExt().getChildrenExt(INPUT_SPLIT_PATH, false, false, true);
-            if (inputSplitCount == -1) {
-                inputSplitCount = inputSplitPathList.size();
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        String reservedInputSplitPath = null;
-        Stat reservedStat = null;
-        while (true) {
-            int finishedInputSplits = 0;
-            for (int i = 0; i < inputSplitPathList.size(); ++i) {
-                String tmpInputSplitFinishedPath =
-                    inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
-                try {
-                    reservedStat =
-                        getZkExt().exists(tmpInputSplitFinishedPath, true);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                if (reservedStat != null) {
-                    ++finishedInputSplits;
-                    continue;
-                }
-
-                String tmpInputSplitReservedPath =
-                    inputSplitPathList.get(i) + INPUT_SPLIT_RESERVED_NODE;
-                try {
-                    reservedStat =
-                        getZkExt().exists(tmpInputSplitReservedPath, true);
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-                if (reservedStat == null) {
-                    try {
-                        // Attempt to reserve this InputSplit
-                        getZkExt().createExt(tmpInputSplitReservedPath,
-                                       null,
-                                       Ids.OPEN_ACL_UNSAFE,
-                                       CreateMode.EPHEMERAL,
-                                       false);
-                        reservedInputSplitPath = inputSplitPathList.get(i);
-                        if (LOG.isInfoEnabled()) {
-                            float percentFinished =
-                               finishedInputSplits * 100.0f /
-                               inputSplitPathList.size();
-                            LOG.info("reserveInputSplit: Reserved input " +
-                                     "split path " + reservedInputSplitPath +
-                                     ", overall roughly " +
-                                      + percentFinished +
-                                     "% input splits finished");
-                        }
-                        return reservedInputSplitPath;
-                    } catch (KeeperException.NodeExistsException e) {
-                        LOG.info("reserveInputSplit: Couldn't reserve " +
-                                 "(already reserved) inputSplit" +
-                                 " at " + tmpInputSplitReservedPath);
-                    } catch (KeeperException e) {
-                        throw new IllegalStateException(
-                            "reserveInputSplit: KeeperException on reserve", e);
-                    } catch (InterruptedException e) {
-                        throw new IllegalStateException(
-                            "reserveInputSplit: InterruptedException " +
-                            "on reserve", e);
-                    }
-                }
-            }
+public class BspServiceWorker<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BspService<I, V, E, M>
+    implements CentralizedServiceWorker<I, V, E, M> {
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
+  /** Number of input splits */
+  private int inputSplitCount = -1;
+  /** My process health znode */
+  private String myHealthZnode;
+  /** List of aggregators currently in use */
+  private Set<String> aggregatorInUse = new TreeSet<String>();
+  /** Worker info */
+  private final WorkerInfo workerInfo;
+  /** Worker graph partitioner */
+  private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+  /** Input split vertex cache (only used when loading from input split) */
+  private final Map<PartitionOwner, Partition<I, V, E, M>>
+  inputSplitCache = new HashMap<PartitionOwner, Partition<I, V, E, M>>();
+  /** Communication service */
+  private final ServerInterface<I, V, E, M> commService;
+  /** Structure to store the partitions on this worker */
+  private final Map<Integer, Partition<I, V, E, M>> workerPartitionMap =
+      new HashMap<Integer, Partition<I, V, E, M>>();
+  /** Have the partition exchange children (workers) changed? */
+  private final BspEvent partitionExchangeChildrenChanged =
+      new PredicateLock();
+  /** Max vertices per partition before sending */
+  private final int maxVerticesPerPartition;
+  /** Worker Context */
+  private final WorkerContext workerContext;
+  /** Total vertices loaded */
+  private long totalVerticesLoaded = 0;
+  /** Total edges loaded */
+  private long totalEdgesLoaded = 0;
+  /** Input split max vertices (-1 denotes all) */
+  private final long inputSplitMaxVertices;
+
+  /**
+   * Constructor for setting up the worker.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphMapper Graph mapper
+   * @param graphState Global graph state
+   * @throws UnknownHostException
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public BspServiceWorker(
+    String serverPortList,
+    int sessionMsecTimeout,
+    Mapper<?, ?, ?, ?>.Context context,
+    GraphMapper<I, V, E, M> graphMapper,
+    GraphState<I, V, E, M> graphState)
+    throws IOException, InterruptedException {
+    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    registerBspEvent(partitionExchangeChildrenChanged);
+    int finalRpcPort =
+        getConfiguration().getInt(GiraphJob.RPC_INITIAL_PORT,
+            GiraphJob.RPC_INITIAL_PORT_DEFAULT) +
+            getTaskPartition();
+    maxVerticesPerPartition =
+        getConfiguration().getInt(
+            GiraphJob.MAX_VERTICES_PER_PARTITION,
+            GiraphJob.MAX_VERTICES_PER_PARTITION_DEFAULT);
+    inputSplitMaxVertices =
+        getConfiguration().getLong(
+            GiraphJob.INPUT_SPLIT_MAX_VERTICES,
+            GiraphJob.INPUT_SPLIT_MAX_VERTICES_DEFAULT);
+    workerInfo =
+        new WorkerInfo(getHostname(), getTaskPartition(), finalRpcPort);
+    workerGraphPartitioner =
+        getGraphPartitionerFactory().createWorkerGraphPartitioner();
+    commService = new RPCCommunications<I, V, E, M>(
+        context, this, graphState);
+    graphState.setWorkerCommunications(commService);
+    this.workerContext =
+        BspUtils.createWorkerContext(getConfiguration(),
+            graphMapper.getGraphState());
+  }
+
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  /**
+   * Intended to check the health of the node.  For instance, can it ssh,
+   * dmesg, etc. For now, does nothing.
+   * TODO: Make this check configurable by the user (i.e. search dmesg for
+   * problems).
+   *
+   * @return True if healthy (always in this case).
+   */
+  public boolean isHealthy() {
+    return true;
+  }
+
+  /**
+   * Use an aggregator in this superstep.
+   *
+   * @param name Name of aggregator (should be unique)
+   * @return boolean (false when aggregator not registered)
+   */
+  public boolean useAggregator(String name) {
+    if (getAggregatorMap().get(name) == null) {
+      LOG.error("userAggregator: Aggregator=" + name + " not registered");
+      return false;
+    }
+    aggregatorInUse.add(name);
+    return true;
+  }
+
+  /**
+   * Try to reserve an InputSplit for loading.  While InputSplits exists that
+   * are not finished, wait until they are.
+   *
+   * @return reserved InputSplit or null if no unfinished InputSplits exist
+   * @throws KeeperException
+   * @throws InterruptedException
+   */
+  private String reserveInputSplit()
+    throws KeeperException, InterruptedException {
+    List<String> inputSplitPathList = null;
+    inputSplitPathList =
+        getZkExt().getChildrenExt(inputSplitsPath, false, false, true);
+    if (inputSplitCount == -1) {
+      inputSplitCount = inputSplitPathList.size();
+    }
+
+    String reservedInputSplitPath = null;
+    Stat reservedStat = null;
+    while (true) {
+      int finishedInputSplits = 0;
+      for (int i = 0; i < inputSplitPathList.size(); ++i) {
+        String tmpInputSplitFinishedPath =
+            inputSplitPathList.get(i) + INPUT_SPLIT_FINISHED_NODE;
+        reservedStat =
+            getZkExt().exists(tmpInputSplitFinishedPath, true);
+        if (reservedStat != null) {
+          ++finishedInputSplits;
+          continue;
+        }
+
+        String tmpInputSplitReservedPath =
+            inputSplitPathList.get(i) + INPUT_SPLIT_RESERVED_NODE;
+        reservedStat =
+            getZkExt().exists(tmpInputSplitReservedPath, true);
+        if (reservedStat == null) {
+          try {
+            // Attempt to reserve this InputSplit
+            getZkExt().createExt(tmpInputSplitReservedPath,
+                null,
+                Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL,
+                false);
+            reservedInputSplitPath = inputSplitPathList.get(i);
             if (LOG.isInfoEnabled()) {
-                LOG.info("reserveInputSplit: reservedPath = " +
-                         reservedInputSplitPath + ", " + finishedInputSplits +
-                         " of " + inputSplitPathList.size() +
-                         " InputSplits are finished.");
-            }
-            if (finishedInputSplits == inputSplitPathList.size()) {
-                return null;
-            }
-            // Wait for either a reservation to go away or a notification that
-            // an InputSplit has finished.
-            getInputSplitsStateChangedEvent().waitMsecs(60*1000);
-            getInputSplitsStateChangedEvent().reset();
-        }
-    }
-
-
-
-    /**
-     * Load the vertices from the user-defined VertexReader into our partitions
-     * of vertex ranges.  Do this until all the InputSplits have been processed.
-     * All workers will try to do as many InputSplits as they can.  The master
-     * will monitor progress and stop this once all the InputSplits have been
-     * loaded and check-pointed.  Keep track of the last input split path to
-     * ensure the input split cache is flushed prior to marking the last input
-     * split complete.
-     *
-     * @throws IOException
-     * @throws IllegalAccessException
-     * @throws InstantiationException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     */
-    private VertexEdgeCount loadVertices() throws IOException,
-            ClassNotFoundException,
-            InterruptedException, InstantiationException,
-            IllegalAccessException {
-        String inputSplitPath = null;
-        VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
-        while ((inputSplitPath = reserveInputSplit()) != null) {
-            vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
-                loadVerticesFromInputSplit(inputSplitPath));
-        }
-
-        // Flush the remaining cached vertices
-        for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
-                inputSplitCache.entrySet()) {
-            if (!entry.getValue().getVertices().isEmpty()) {
-                commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
-                                             entry.getValue());
-                entry.getValue().getVertices().clear();
-            }
-        }
-        inputSplitCache.clear();
-
-        return vertexEdgeCount;
-    }
-
-    /**
-     * Mark an input split path as completed by this worker.  This notifies
-     * the master and the other workers that this input split has not only
-     * been reserved, but also marked processed.
-     *
-     * @param inputSplitPath Path to the input split.
-     */
-    private void markInputSplitPathFinished(String inputSplitPath) {
-        String inputSplitFinishedPath =
-            inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
-        try {
-            getZkExt().createExt(inputSplitFinishedPath,
-                    null,
-                    Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT,
-                    true);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.warn("loadVertices: " + inputSplitFinishedPath +
-                    " already exists!");
-        } catch (KeeperException e) {
+              float percentFinished =
+                  finishedInputSplits * 100.0f /
+                  inputSplitPathList.size();
+              LOG.info("reserveInputSplit: Reserved input " +
+                  "split path " + reservedInputSplitPath +
+                  ", overall roughly " +
+                  + percentFinished +
+                  "% input splits finished");
+            }
+            return reservedInputSplitPath;
+          } catch (KeeperException.NodeExistsException e) {
+            LOG.info("reserveInputSplit: Couldn't reserve " +
+                "(already reserved) inputSplit" +
+                " at " + tmpInputSplitReservedPath);
+          } catch (KeeperException e) {
             throw new IllegalStateException(
-                "loadVertices: KeeperException on " +
-                inputSplitFinishedPath, e);
-        } catch (InterruptedException e) {
+                "reserveInputSplit: KeeperException on reserve", e);
+          } catch (InterruptedException e) {
             throw new IllegalStateException(
-                "loadVertices: InterruptedException on " +
-                inputSplitFinishedPath, e);
-        }
-    }
-
-    /**
-     * Extract vertices from input split, saving them into a mini cache of
-     * partitions.  Periodically flush the cache of vertices when a limit is
-     * reached in readVerticeFromInputSplit.
-     * Mark the input split finished when done.
-     *
-     * @param inputSplitPath ZK location of input split
-     * @return Mapping of vertex indices and statistics, or null if no data read
-     * @throws IOException
-     * @throws ClassNotFoundException
-     * @throws InterruptedException
-     * @throws InstantiationException
-     * @throws IllegalAccessException
-     */
-    private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
-        throws IOException, ClassNotFoundException, InterruptedException,
-               InstantiationException, IllegalAccessException {
-        InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
-        VertexEdgeCount vertexEdgeCount =
-            readVerticesFromInputSplit(inputSplit);
+                "reserveInputSplit: InterruptedException " +
+                    "on reserve", e);
+          }
+        }
+      }
+      if (LOG.isInfoEnabled()) {
+        LOG.info("reserveInputSplit: reservedPath = " +
+            reservedInputSplitPath + ", " + finishedInputSplits +
+            " of " + inputSplitPathList.size() +
+            " InputSplits are finished.");
+      }
+      if (finishedInputSplits == inputSplitPathList.size()) {
+        return null;
+      }
+      // Wait for either a reservation to go away or a notification that
+      // an InputSplit has finished.
+      getInputSplitsStateChangedEvent().waitMsecs(60 * 1000);
+      getInputSplitsStateChangedEvent().reset();
+    }
+  }
+
+  /**
+   * Load the vertices from the user-defined VertexReader into our partitions
+   * of vertex ranges.  Do this until all the InputSplits have been processed.
+   * All workers will try to do as many InputSplits as they can.  The master
+   * will monitor progress and stop this once all the InputSplits have been
+   * loaded and check-pointed.  Keep track of the last input split path to
+   * ensure the input split cache is flushed prior to marking the last input
+   * split complete.
+   *
+   * @return Statistics of the vertices loaded
+   * @throws IOException
+   * @throws IllegalAccessException
+   * @throws InstantiationException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private VertexEdgeCount loadVertices() throws IOException,
+    ClassNotFoundException, InterruptedException, InstantiationException,
+    IllegalAccessException, KeeperException {
+    String inputSplitPath = null;
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    while ((inputSplitPath = reserveInputSplit()) != null) {
+      vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(
+          loadVerticesFromInputSplit(inputSplitPath));
+    }
+
+    // Flush the remaining cached vertices
+    for (Entry<PartitionOwner, Partition<I, V, E, M>> entry :
+      inputSplitCache.entrySet()) {
+      if (!entry.getValue().getVertices().isEmpty()) {
+        commService.sendPartitionReq(entry.getKey().getWorkerInfo(),
+            entry.getValue());
+        entry.getValue().getVertices().clear();
+      }
+    }
+    inputSplitCache.clear();
+
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Mark an input split path as completed by this worker.  This notifies
+   * the master and the other workers that this input split has not only
+   * been reserved, but also marked processed.
+   *
+   * @param inputSplitPath Path to the input split.
+   */
+  private void markInputSplitPathFinished(String inputSplitPath) {
+    String inputSplitFinishedPath =
+        inputSplitPath + INPUT_SPLIT_FINISHED_NODE;
+    try {
+      getZkExt().createExt(inputSplitFinishedPath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("loadVertices: " + inputSplitFinishedPath +
+          " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "loadVertices: KeeperException on " +
+              inputSplitFinishedPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "loadVertices: InterruptedException on " +
+              inputSplitFinishedPath, e);
+    }
+  }
+
+  /**
+   * Extract vertices from input split, saving them into a mini cache of
+   * partitions.  Periodically flush the cache of vertices when a limit is
+   * reached in readVerticeFromInputSplit.
+   * Mark the input split finished when done.
+   *
+   * @param inputSplitPath ZK location of input split
+   * @return Mapping of vertex indices and statistics, or null if no data read
+   * @throws IOException
+   * @throws ClassNotFoundException
+   * @throws InterruptedException
+   * @throws InstantiationException
+   * @throws IllegalAccessException
+   */
+  private VertexEdgeCount loadVerticesFromInputSplit(String inputSplitPath)
+    throws IOException, ClassNotFoundException, InterruptedException,
+    InstantiationException, IllegalAccessException {
+    InputSplit inputSplit = getInputSplitForVertices(inputSplitPath);
+    VertexEdgeCount vertexEdgeCount =
+        readVerticesFromInputSplit(inputSplit);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadVerticesFromInputSplit: Finished loading " +
+          inputSplitPath + " " + vertexEdgeCount);
+    }
+    markInputSplitPathFinished(inputSplitPath);
+    return vertexEdgeCount;
+  }
+
+  /**
+   * Talk to ZooKeeper to convert the input split path to the actual
+   * InputSplit containing the vertices to read.
+   *
+   * @param inputSplitPath Location in ZK of input split
+   * @return instance of InputSplit containing vertices to read
+   * @throws IOException
+   * @throws ClassNotFoundException
+   */
+  private InputSplit getInputSplitForVertices(String inputSplitPath)
+    throws IOException, ClassNotFoundException {
+    byte[] splitList;
+    try {
+      splitList = getZkExt().getData(inputSplitPath, false, null);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "loadVertices: KeeperException on " + inputSplitPath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "loadVertices: IllegalStateException on " + inputSplitPath, e);
+    }
+    getContext().progress();
+
+    DataInputStream inputStream =
+        new DataInputStream(new ByteArrayInputStream(splitList));
+    String inputSplitClass = Text.readString(inputStream);
+    InputSplit inputSplit = (InputSplit)
+        ReflectionUtils.newInstance(
+            getConfiguration().getClassByName(inputSplitClass),
+            getConfiguration());
+    ((Writable) inputSplit).readFields(inputStream);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
+          " from ZooKeeper and got input split '" +
+          inputSplit.toString() + "'");
+    }
+    return inputSplit;
+  }
+
+  /**
+   * Read vertices from input split.  If testing, the user may request a
+   * maximum number of vertices to be read from an input split.
+   *
+   * @param inputSplit Input split to process with vertex reader
+   * @return List of vertices.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private VertexEdgeCount readVerticesFromInputSplit(
+      InputSplit inputSplit) throws IOException, InterruptedException {
+    VertexInputFormat<I, V, E, M> vertexInputFormat =
+        BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
+    VertexReader<I, V, E, M> vertexReader =
+        vertexInputFormat.createVertexReader(inputSplit, getContext());
+    vertexReader.initialize(inputSplit, getContext());
+    long vertexCount = 0;
+    long edgeCount = 0;
+    while (vertexReader.nextVertex()) {
+      BasicVertex<I, V, E, M> readerVertex =
+          vertexReader.getCurrentVertex();
+      if (readerVertex.getVertexId() == null) {
+        throw new IllegalArgumentException(
+            "loadVertices: Vertex reader returned a vertex " +
+                "without an id!  - " + readerVertex);
+      }
+      if (readerVertex.getVertexValue() == null) {
+        readerVertex.setVertexValue(
+            BspUtils.<V>createVertexValue(getConfiguration()));
+      }
+      PartitionOwner partitionOwner =
+          workerGraphPartitioner.getPartitionOwner(
+              readerVertex.getVertexId());
+      Partition<I, V, E, M> partition =
+          inputSplitCache.get(partitionOwner);
+      if (partition == null) {
+        partition = new Partition<I, V, E, M>(
+            getConfiguration(),
+            partitionOwner.getPartitionId());
+        inputSplitCache.put(partitionOwner, partition);
+      }
+      BasicVertex<I, V, E, M> oldVertex =
+          partition.putVertex(readerVertex);
+      if (oldVertex != null) {
+        LOG.warn("readVertices: Replacing vertex " + oldVertex +
+            " with " + readerVertex);
+      }
+      if (partition.getVertices().size() >= maxVerticesPerPartition) {
+        commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
+            partition);
+        partition.getVertices().clear();
+      }
+      ++vertexCount;
+      edgeCount += readerVertex.getNumOutEdges();
+      getContext().progress();
+
+      ++totalVerticesLoaded;
+      totalEdgesLoaded += readerVertex.getNumOutEdges();
+      // Update status every half a million vertices
+      if ((totalVerticesLoaded % 500000) == 0) {
+        String status = "readVerticesFromInputSplit: Loaded " +
+            totalVerticesLoaded + " vertices and " +
+            totalEdgesLoaded + " edges " +
+            MemoryUtils.getRuntimeMemoryStats() + " " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep();
         if (LOG.isInfoEnabled()) {
-            LOG.info("loadVerticesFromInputSplit: Finished loading " +
-                     inputSplitPath + " " + vertexEdgeCount);
-        }
-        markInputSplitPathFinished(inputSplitPath);
-        return vertexEdgeCount;
-    }
-
-    /**
-     * Talk to ZooKeeper to convert the input split path to the actual
-     * InputSplit containing the vertices to read.
-     *
-     * @param inputSplitPath Location in ZK of input split
-     * @return instance of InputSplit containing vertices to read
-     * @throws IOException
-     * @throws ClassNotFoundException
-     */
-    private InputSplit getInputSplitForVertices(String inputSplitPath)
-            throws IOException, ClassNotFoundException {
-        byte[] splitList;
-        try {
-            splitList = getZkExt().getData(inputSplitPath, false, null);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-               "loadVertices: KeeperException on " + inputSplitPath, e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "loadVertices: IllegalStateException on " + inputSplitPath, e);
+          LOG.info(status);
         }
-        getContext().progress();
-
-        DataInputStream inputStream =
-            new DataInputStream(new ByteArrayInputStream(splitList));
-        String inputSplitClass = Text.readString(inputStream);
-        InputSplit inputSplit = (InputSplit)
-            ReflectionUtils.newInstance(
-                getConfiguration().getClassByName(inputSplitClass),
-                getConfiguration());
-        ((Writable) inputSplit).readFields(inputStream);
+        getContext().setStatus(status);
+      }
 
+      // For sampling, or to limit outlier input splits, the number of
+      // records per input split can be limited
+      if ((inputSplitMaxVertices > 0) &&
+          (vertexCount >= inputSplitMaxVertices)) {
         if (LOG.isInfoEnabled()) {
-            LOG.info("getInputSplitForVertices: Reserved " + inputSplitPath +
-                 " from ZooKeeper and got input split '" +
-                 inputSplit.toString() + "'");
-        }
-        return inputSplit;
-    }
-
-    /**
-     * Read vertices from input split.  If testing, the user may request a
-     * maximum number of vertices to be read from an input split.
-     *
-     * @param inputSplit Input split to process with vertex reader
-     * @return List of vertices.
-     * @throws IOException
-     * @throws InterruptedException
-     */
-    private VertexEdgeCount readVerticesFromInputSplit(
-            InputSplit inputSplit) throws IOException, InterruptedException {
-        VertexInputFormat<I, V, E, M> vertexInputFormat =
-            BspUtils.<I, V, E, M>createVertexInputFormat(getConfiguration());
-        VertexReader<I, V, E, M> vertexReader =
-            vertexInputFormat.createVertexReader(inputSplit, getContext());
-        vertexReader.initialize(inputSplit, getContext());
-        long vertexCount = 0;
-        long edgeCount = 0;
-        while (vertexReader.nextVertex()) {
-            BasicVertex<I, V, E, M> readerVertex =
-                vertexReader.getCurrentVertex();
-            if (readerVertex.getVertexId() == null) {
-                throw new IllegalArgumentException(
-                    "loadVertices: Vertex reader returned a vertex " +
-                    "without an id!  - " + readerVertex);
-            }
-            if (readerVertex.getVertexValue() == null) {
-                readerVertex.setVertexValue(
-                    BspUtils.<V>createVertexValue(getConfiguration()));
-            }
-            PartitionOwner partitionOwner =
-                workerGraphPartitioner.getPartitionOwner(
-                    readerVertex.getVertexId());
-            Partition<I, V, E, M> partition =
-                inputSplitCache.get(partitionOwner);
-            if (partition == null) {
-                partition = new Partition<I, V, E, M>(
-                    getConfiguration(),
-                    partitionOwner.getPartitionId());
-                inputSplitCache.put(partitionOwner, partition);
-            }
-            BasicVertex<I, V, E, M> oldVertex =
-                partition.putVertex(readerVertex);
-            if (oldVertex != null) {
-                LOG.warn("readVertices: Replacing vertex " + oldVertex +
-                        " with " + readerVertex);
-            }
-            if (partition.getVertices().size() >= maxVerticesPerPartition) {
-                commService.sendPartitionReq(partitionOwner.getWorkerInfo(),
-                                             partition);
-                partition.getVertices().clear();
-            }
-            ++vertexCount;
-            edgeCount += readerVertex.getNumOutEdges();
-            getContext().progress();
-
-            ++totalVerticesLoaded;
-            totalEdgesLoaded += readerVertex.getNumOutEdges();
-            // Update status every half a million vertices
-            if ((totalVerticesLoaded % 500000) == 0) {
-                String status = "readVerticesFromInputSplit: Loaded " +
-                    totalVerticesLoaded + " vertices and " +
-                    totalEdgesLoaded + " edges " +
-                    MemoryUtils.getRuntimeMemoryStats() + " " +
-                    getGraphMapper().getMapFunctions().toString() +
-                    " - Attempt=" + getApplicationAttempt() +
-                    ", Superstep=" + getSuperstep();
-                if (LOG.isInfoEnabled()) {
-                    LOG.info(status);
-                }
-                getContext().setStatus(status);
-            }
-
-            // For sampling, or to limit outlier input splits, the number of
-            // records per input split can be limited
-            if ((inputSplitMaxVertices > 0) &&
-                    (vertexCount >= inputSplitMaxVertices)) {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("readVerticesFromInputSplit: Leaving the input " +
-                            "split early, reached maximum vertices " +
-                            vertexCount);
-                }
-                break;
-            }
-        }
-        vertexReader.close();
-
-        return new VertexEdgeCount(vertexCount, edgeCount);
-    }
-
-    @Override
-    public void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
-            Iterable<M> messageIterator) {
-        vertex.putMessages(messageIterator);
-    }
-
-    @Override
-    public void setup() {
-        // Unless doing a restart, prepare for computation:
-        // 1. Start superstep INPUT_SUPERSTEP (no computation)
-        // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
-        // 3. Process input splits until there are no more.
-        // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
-        // 5. Wait for superstep INPUT_SUPERSTEP to complete.
-        if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
-            setCachedSuperstep(getRestartedSuperstep());
-            return;
-        }
-
-        JSONObject jobState = getJobState();
-        if (jobState != null) {
-            try {
-                if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
-                        ApplicationState.START_SUPERSTEP) &&
-                        jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
-                        getSuperstep()) {
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("setup: Restarting from an automated " +
-                                 "checkpointed superstep " +
-                                 getSuperstep() + ", attempt " +
-                                 getApplicationAttempt());
-                    }
-                    setRestartedSuperstep(getSuperstep());
-                    return;
-                }
-            } catch (JSONException e) {
-                throw new RuntimeException(
-                    "setup: Failed to get key-values from " +
-                    jobState.toString(), e);
-            }
-        }
-
-        // Add the partitions for that this worker owns
-        Collection<? extends PartitionOwner> masterSetPartitionOwners =
-            startSuperstep();
-        workerGraphPartitioner.updatePartitionOwners(
-            getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
-
-        commService.setup();
-
-        // Ensure the InputSplits are ready for processing before processing
-        while (true) {
-            Stat inputSplitsReadyStat;
-            try {
-                inputSplitsReadyStat =
-                    getZkExt().exists(INPUT_SPLITS_ALL_READY_PATH, true);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "setup: KeeperException waiting on input splits", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "setup: InterruptedException waiting on input splits", e);
-            }
-            if (inputSplitsReadyStat != null) {
-                break;
-            }
-            getInputSplitsAllReadyEvent().waitForever();
-            getInputSplitsAllReadyEvent().reset();
-        }
-
-        getContext().progress();
-
-        try {
-            VertexEdgeCount vertexEdgeCount = loadVertices();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("setup: Finally loaded a total of " +
-                         vertexEdgeCount);
-            }
-        } catch (Exception e) {
-            LOG.error("setup: loadVertices failed - ", e);
-            throw new IllegalStateException("setup: loadVertices failed", e);
-        }
-        getContext().progress();
-
-        // Workers wait for each other to finish, coordinated by master
-        String workerDonePath =
-            INPUT_SPLIT_DONE_PATH + "/" + getWorkerInfo().getHostnameId();
-        try {
-            getZkExt().createExt(workerDonePath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "setup: KeeperException creating worker done splits", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "setup: InterruptedException creating worker done splits", e);
-        }
-        while (true) {
-            Stat inputSplitsDoneStat;
-            try {
-                inputSplitsDoneStat =
-                    getZkExt().exists(INPUT_SPLITS_ALL_DONE_PATH, true);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "setup: KeeperException waiting on worker done splits", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "setup: InterruptedException waiting on worker " +
-                    "done splits", e);
-            }
-            if (inputSplitsDoneStat != null) {
-                break;
-            }
-            getInputSplitsAllDoneEvent().waitForever();
-            getInputSplitsAllDoneEvent().reset();
-        }
-
-        // At this point all vertices have been sent to their destinations.
-        // Move them to the worker, creating creating the empty partitions
-        movePartitionsToWorker(commService);
-        for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
-            if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
-                !getPartitionMap().containsKey(
-                    partitionOwner.getPartitionId())) {
-                Partition<I, V, E, M> partition =
-                    new Partition<I, V, E, M>(getConfiguration(),
-                                              partitionOwner.getPartitionId());
-                getPartitionMap().put(partitionOwner.getPartitionId(),
-                                      partition);
-            }
-        }
-
-        // Generate the partition stats for the input superstep and process
-        // if necessary
-        List<PartitionStats> partitionStatsList =
-            new ArrayList<PartitionStats>();
-        for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
-            PartitionStats partitionStats =
-                new PartitionStats(partition.getPartitionId(),
-                                   partition.getVertices().size(),
-                                   0,
-                                   partition.getEdgeCount());
-            partitionStatsList.add(partitionStats);
-        }
+          LOG.info("readVerticesFromInputSplit: Leaving the input " +
+              "split early, reached maximum vertices " +
+              vertexCount);
+        }
+        break;
+      }
+    }
+    vertexReader.close();
+
+    return new VertexEdgeCount(vertexCount, edgeCount);
+  }
+
+  @Override
+  public void assignMessagesToVertex(BasicVertex<I, V, E, M> vertex,
+      Iterable<M> messageIterator) {
+    vertex.putMessages(messageIterator);
+  }
+
+  @Override
+  public void setup() {
+    // Unless doing a restart, prepare for computation:
+    // 1. Start superstep INPUT_SUPERSTEP (no computation)
+    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
+    // 3. Process input splits until there are no more.
+    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
+    // 5. Wait for superstep INPUT_SUPERSTEP to complete.
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      setCachedSuperstep(getRestartedSuperstep());
+      return;
+    }
+
+    JSONObject jobState = getJobState();
+    if (jobState != null) {
+      try {
+        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
+            getSuperstep()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("setup: Restarting from an automated " +
+                "checkpointed superstep " +
+                getSuperstep() + ", attempt " +
+                getApplicationAttempt());
+          }
+          setRestartedSuperstep(getSuperstep());
+          return;
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "setup: Failed to get key-values from " +
+                jobState.toString(), e);
+      }
+    }
+
+    // Add the partitions that this worker owns
+    Collection<? extends PartitionOwner> masterSetPartitionOwners =
+        startSuperstep();
+    workerGraphPartitioner.updatePartitionOwners(
+        getWorkerInfo(), masterSetPartitionOwners, getPartitionMap());
+
+    commService.setup();
+
+    // Ensure the InputSplits are ready for processing before processing
+    while (true) {
+      Stat inputSplitsReadyStat;
+      try {
+        inputSplitsReadyStat =
+            getZkExt().exists(inputSplitsAllReadyPath, true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: KeeperException waiting on input splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: InterruptedException waiting on input splits", e);
+      }
+      if (inputSplitsReadyStat != null) {
+        break;
+      }
+      getInputSplitsAllReadyEvent().waitForever();
+      getInputSplitsAllReadyEvent().reset();
+    }
+
+    getContext().progress();
+
+    try {
+      VertexEdgeCount vertexEdgeCount = loadVertices();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("setup: Finally loaded a total of " +
+            vertexEdgeCount);
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException("setup: loadVertices failed due to " +
+          "IOException", e);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("setup: loadVertices failed due to " +
+          "ClassNotFoundException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("setup: loadVertices failed due to " +
+          "InterruptedException", e);
+    } catch (InstantiationException e) {
+      throw new IllegalStateException("setup: loadVertices failed due to " +
+          "InstantiationException", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalStateException("setup: loadVertices failed due to " +
+          "IllegalAccessException", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("setup: loadVertices failed due to " +
+          "KeeperException", e);
+    }
+    getContext().progress();
+
+    // Workers wait for each other to finish, coordinated by master
+    String workerDonePath =
+        inputSplitsDonePath + "/" + getWorkerInfo().getHostnameId();
+    try {
+      getZkExt().createExt(workerDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "setup: KeeperException creating worker done splits", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "setup: InterruptedException creating worker done splits", e);
+    }
+    while (true) {
+      Stat inputSplitsDoneStat;
+      try {
+        inputSplitsDoneStat =
+            getZkExt().exists(inputSplitsAllDonePath, true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: KeeperException waiting on worker done splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: InterruptedException waiting on worker " +
+                "done splits", e);
+      }
+      if (inputSplitsDoneStat != null) {
+        break;
+      }
+      getInputSplitsAllDoneEvent().waitForever();
+      getInputSplitsAllDoneEvent().reset();
+    }
+
+    // At this point all vertices have been sent to their destinations.
+    // Move them to the worker, creating the empty partitions
+    movePartitionsToWorker(commService);
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+          !getPartitionMap().containsKey(
+              partitionOwner.getPartitionId())) {
+        Partition<I, V, E, M> partition =
+            new Partition<I, V, E, M>(getConfiguration(),
+                partitionOwner.getPartitionId());
+        getPartitionMap().put(partitionOwner.getPartitionId(),
+            partition);
+      }
+    }
+
+    // Generate the partition stats for the input superstep and process
+    // if necessary
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+    for (Partition<I, V, E, M> partition : getPartitionMap().values()) {
+      PartitionStats partitionStats =
+          new PartitionStats(partition.getPartitionId(),
+              partition.getVertices().size(),
+              0,
+              partition.getEdgeCount());
+      partitionStatsList.add(partitionStats);
+    }
+    workerGraphPartitioner.finalizePartitionStats(
+        partitionStatsList, workerPartitionMap);
+
+    finishSuperstep(partitionStatsList);
+  }
+
+  /**
+   *  Marshal the aggregator values to a JSONArray that will later be
+   *  aggregated by master.  Reset the 'use' of aggregators in the next
+   *  superstep.
+   *
+   * @param superstep Superstep to marshal on
+   * @return JSON array of the aggregator values
+   */
+  private JSONArray marshalAggregatorValues(long superstep) {
+    JSONArray aggregatorArray = new JSONArray();
+    if ((superstep == INPUT_SUPERSTEP) || (aggregatorInUse.size() == 0)) {
+      return aggregatorArray;
+    }
+
+    for (String name : aggregatorInUse) {
+      try {
+        Aggregator<Writable> aggregator = getAggregatorMap().get(name);
+        ByteArrayOutputStream outputStream =
+            new ByteArrayOutputStream();
+        DataOutput output = new DataOutputStream(outputStream);
+        aggregator.getAggregatedValue().write(output);
+
+        JSONObject aggregatorObj = new JSONObject();
+        aggregatorObj.put(AGGREGATOR_NAME_KEY, name);
+        aggregatorObj.put(AGGREGATOR_CLASS_NAME_KEY,
+            aggregator.getClass().getName());
+        aggregatorObj.put(
+            AGGREGATOR_VALUE_KEY,
+            Base64.encodeBytes(outputStream.toByteArray()));
+        aggregatorArray.put(aggregatorObj);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("marshalAggregatorValues: " +
+              "Found aggregatorObj " +
+              aggregatorObj + ", value (" +
+              aggregator.getAggregatedValue() + ")");
+        }
+      } catch (JSONException e) {
+        throw new IllegalStateException("Failed to marshall aggregator " +
+            "with JSONException " + name, e);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to marshall aggregator " +
+            "with IOException " + name, e);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("marshalAggregatorValues: Finished assembling " +
+          "aggregator values in JSONArray - " + aggregatorArray);
+    }
+    aggregatorInUse.clear();
+    return aggregatorArray;
+  }
+
+  /**
+   * Get values of aggregators aggregated by master in previous superstep.
+   *
+   * @param superstep Superstep to get the aggregated values from
+   */
+  private void getAggregatorValues(long superstep) {
+    if (superstep <= (INPUT_SUPERSTEP + 1)) {
+      return;
+    }
+    String mergedAggregatorPath =
+        getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
+    JSONArray aggregatorArray = null;
+    try {
+      byte[] zkData =
+          getZkExt().getData(mergedAggregatorPath, false, null);
+      aggregatorArray = new JSONArray(new String(zkData));
+    } catch (KeeperException.NoNodeException e) {
+      LOG.info("getAggregatorValues: no aggregators in " +
+          mergedAggregatorPath + " on superstep " + superstep);
+      return;
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Failed to get data for " +
+          mergedAggregatorPath + " with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Failed to get data for " +
+          mergedAggregatorPath + " with InterruptedException", e);
+    } catch (JSONException e) {
+      throw new IllegalStateException("Failed to get data for " +
+          mergedAggregatorPath + " with JSONException", e);
+    }
+    for (int i = 0; i < aggregatorArray.length(); ++i) {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("getAggregatorValues: " +
+              "Getting aggregators from " +
+              aggregatorArray.getJSONObject(i));
+        }
+        String aggregatorName = aggregatorArray.getJSONObject(i).
+            getString(AGGREGATOR_NAME_KEY);
+        Aggregator<Writable> aggregator =
+            getAggregatorMap().get(aggregatorName);
+        if (aggregator == null) {
+          continue;
+        }
+        Writable aggregatorValue = aggregator.getAggregatedValue();
+        InputStream input =
+            new ByteArrayInputStream(
+                Base64.decode(aggregatorArray.getJSONObject(i).
+                    getString(AGGREGATOR_VALUE_KEY)));
+        aggregatorValue.readFields(
+            new DataInputStream(input));
+        aggregator.setAggregatedValue(aggregatorValue);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("getAggregatorValues: " +
+              "Got aggregator=" + aggregatorName + " value=" +
+              aggregatorValue);
+        }
+      } catch (JSONException e) {
+        throw new IllegalStateException("Failed to decode data for index " +
+            i + " with KeeperException", e);
+      } catch (IOException e) {
+        throw new IllegalStateException("Failed to decode data for index " +
+            i + " with KeeperException", e);
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("getAggregatorValues: Finished loading " +
+          mergedAggregatorPath + " with aggregator values " +
+          aggregatorArray);
+    }
+  }
+
+  /**
+   * Register the health of this worker for a given superstep
+   *
+   * @param superstep Superstep to register health on
+   */
+  private void registerHealth(long superstep) {
+    JSONArray hostnamePort = new JSONArray();
+    hostnamePort.put(getHostname());
+
+    hostnamePort.put(workerInfo.getPort());
+
+    String myHealthPath = null;
+    if (isHealthy()) {
+      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    } else {
+      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    }
+    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+    try {
+      myHealthZnode = getZkExt().createExt(
+          myHealthPath,
+          WritableUtils.writeToByteArray(workerInfo),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("registerHealth: myHealthPath already exists (likely " +
+          "from previous failure): " + myHealthPath +
+          ".  Waiting for change in attempts " +
+          "to re-join the application");
+      getApplicationAttemptChangedEvent().waitForever();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("registerHealth: Got application " +
+            "attempt changed event, killing self");
+      }
+      throw new IllegalStateException(
+          "registerHealth: Trying " +
+              "to get the new application attempt by killing self", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with InterruptedException", e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("registerHealth: Created my health node for attempt=" +
+          getApplicationAttempt() + ", superstep=" +
+          getSuperstep() + " with " + myHealthZnode +
+          " and workerInfo= " + workerInfo);
+    }
+  }
+
+  /**
+   * Do this to help notify the master quicker that this worker has failed.
+   */
+  private void unregisterHealth() {
+    LOG.error("unregisterHealth: Got failure, unregistering health on " +
+        myHealthZnode + " on superstep " + getSuperstep());
+    try {
+      getZkExt().delete(myHealthZnode, -1);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: InterruptedException - Couldn't delete " +
+              myHealthZnode, e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: KeeperException - Couldn't delete " +
+              myHealthZnode, e);
+    }
+  }
+
+  @Override
+  public void failureCleanup() {
+    unregisterHealth();
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> startSuperstep() {
+    // Algorithm:
+    // 1. Communication service will combine message from previous
+    //    superstep
+    // 2. Register my health for the next superstep.
+    // 3. Wait until the partition assignment is complete and get it
+    // 4. Get the aggregator values from the previous superstep
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      commService.prepareSuperstep();
+    }
+
+    registerHealth(getSuperstep());
+
+    String partitionAssignmentsNode =
+        getPartitionAssignmentsPath(getApplicationAttempt(),
+            getSuperstep());
+    Collection<? extends PartitionOwner> masterSetPartitionOwners;
+    try {
+      while (getZkExt().exists(partitionAssignmentsNode, true) ==
+          null) {
+        getPartitionAssignmentsReadyChangedEvent().waitForever();
+        getPartitionAssignmentsReadyChangedEvent().reset();
+      }
+      List<? extends Writable> writableList =
+          WritableUtils.readListFieldsFromZnode(
+              getZkExt(),
+              partitionAssignmentsNode,
+              false,
+              null,
+              workerGraphPartitioner.createPartitionOwner().getClass(),
+              getConfiguration());
+
+      @SuppressWarnings("unchecked")
+      Collection<? extends PartitionOwner> castedWritableList =
+        (Collection<? extends PartitionOwner>) writableList;
+      masterSetPartitionOwners = castedWritableList;
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "startSuperstep: KeeperException getting assignments", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "startSuperstep: InterruptedException getting assignments", e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("startSuperstep: Ready for computation on superstep " +
+          getSuperstep() + " since worker " +
+          "selection and vertex range assignments are done in " +
+          partitionAssignmentsNode);
+    }
+
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      getAggregatorValues(getSuperstep());
+    }
+    getContext().setStatus("startSuperstep: " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    return masterSetPartitionOwners;
+  }
+
+  @Override
+  public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
+    // This barrier blocks until success (or the master signals it to
+    // restart).
+    //
+    // Master will coordinate the barriers and aggregate "doneness" of all
+    // the vertices.  Each worker will:
+    // 1. Flush the unsent messages
+    // 2. Execute user postSuperstep() if necessary.
+    // 3. Save aggregator values that are in use.
+    // 4. Report the statistics (vertices, edges, messages, etc.)
+    //    of this worker
+    // 5. Let the master know it is finished.
+    // 6. Wait for the master's global stats, and check if done
+    long workerSentMessages = 0;
+    try {
+      workerSentMessages = commService.flush(getContext());
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: flush failed", e);
+    }
+
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      getWorkerContext().postSuperstep();
+      getContext().progress();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+
+    JSONArray aggregatorValueArray =
+        marshalAggregatorValues(getSuperstep());
+    Collection<PartitionStats> finalizedPartitionStats =
         workerGraphPartitioner.finalizePartitionStats(
             partitionStatsList, workerPartitionMap);
-
-        finishSuperstep(partitionStatsList);
-    }
-
-    /**
-     *  Marshal the aggregator values of to a JSONArray that will later be
-     *  aggregated by master.  Reset the 'use' of aggregators in the next
-     *  superstep
-     *
-     * @param superstep
-     */
-    private JSONArray marshalAggregatorValues(long superstep) {
-        JSONArray aggregatorArray = new JSONArray();
-        if ((superstep == INPUT_SUPERSTEP) || (aggregatorInUse.size() == 0)) {
-            return aggregatorArray;
-        }
-
-        for (String name : aggregatorInUse) {
-            try {
-                Aggregator<Writable> aggregator = getAggregatorMap().get(name);
-                ByteArrayOutputStream outputStream =
-                    new ByteArrayOutputStream();
-                DataOutput output = new DataOutputStream(outputStream);
-                aggregator.getAggregatedValue().write(output);
-
-                JSONObject aggregatorObj = new JSONObject();
-                aggregatorObj.put(AGGREGATOR_NAME_KEY, name);
-                aggregatorObj.put(AGGREGATOR_CLASS_NAME_KEY,
-                                  aggregator.getClass().getName());
-                aggregatorObj.put(
-                    AGGREGATOR_VALUE_KEY,
-                    Base64.encodeBytes(outputStream.toByteArray()));
-                aggregatorArray.put(aggregatorObj);
-                LOG.info("marshalAggregatorValues: " +
-                         "Found aggregatorObj " +
-                         aggregatorObj + ", value (" +
-                         aggregator.getAggregatedValue() + ")");
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-
-        if (LOG.isInfoEnabled()) {
-	        LOG.info("marshalAggregatorValues: Finished assembling " +
-	                 "aggregator values in JSONArray - " + aggregatorArray);
-        }
-        aggregatorInUse.clear();
-        return aggregatorArray;
-    }
-
-    /**
-     * Get values of aggregators aggregated by master in previous superstep.
-     *
-     * @param superstep Superstep to get the aggregated values from
-     */
-    private void getAggregatorValues(long superstep) {
-        if (superstep <= (INPUT_SUPERSTEP + 1)) {
-            return;
-        }
-        String mergedAggregatorPath =
-            getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
-        JSONArray aggregatorArray = null;
-        try {
-            byte[] zkData =
-                getZkExt().getData(mergedAggregatorPath, false, null);
-            aggregatorArray = new JSONArray(new String(zkData));
-        } catch (KeeperException.NoNodeException e) {
-            LOG.info("getAggregatorValues: no aggregators in " +
-                     mergedAggregatorPath + " on superstep " + superstep);
-            return;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        for (int i = 0; i < aggregatorArray.length(); ++i) {
-            try {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("getAggregatorValues: " +
-                              "Getting aggregators from " +
-                              aggregatorArray.getJSONObject(i));
-                }
-                String aggregatorName = aggregatorArray.getJSONObject(i).
-                    getString(AGGREGATOR_NAME_KEY);
-                Aggregator<Writable> aggregator =
-                    getAggregatorMap().get(aggregatorName);
-                if (aggregator == null) {
-                    continue;
-                }
-                Writable aggregatorValue = aggregator.getAggregatedValue();
-                InputStream input =
-                    new ByteArrayInputStream(
-                        Base64.decode(aggregatorArray.getJSONObject(i).
-                            getString(AGGREGATOR_VALUE_KEY)));
-                aggregatorValue.readFields(
-                    new DataInputStream(input));
-                aggregator.setAggregatedValue(aggregatorValue);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("getAggregatorValues: " +
-                              "Got aggregator=" + aggregatorName + " value=" +
-                               aggregatorValue);
-                }
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-        if (LOG.isInfoEnabled()) {
-            LOG.info("getAggregatorValues: Finished loading " +
-                     mergedAggregatorPath + " with aggregator values " +
-                     aggregatorArray);
-        }
-    }
-
-    /**
-     * Register the health of this worker for a given superstep
-     *
-     * @param superstep Superstep to register health on
-     */
-    private void registerHealth(long superstep) {
-        JSONArray hostnamePort = new JSONArray();
-        hostnamePort.put(getHostname());
-
-        hostnamePort.put(workerInfo.getPort());
-
-        String myHealthPath = null;
-        if (isHealthy()) {
-            myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
-                                                    getSuperstep());
-        }
-        else {
-            myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
-                                                      getSuperstep());
-        }
-        myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+    List<PartitionStats> finalizedPartitionStatsList =
+        new ArrayList<PartitionStats>(finalizedPartitionStats);
+    byte [] partitionStatsBytes =
+        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+    JSONObject workerFinishedInfoObj = new JSONObject();
+    try {
+      workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
+          aggregatorValueArray);
+      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+          Base64.encodeBytes(partitionStatsBytes));
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
+          workerSentMessages);
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+    String finishedWorkerPath =
+        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
+        "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(finishedWorkerPath,
+          workerFinishedInfoObj.toString().getBytes(),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: finished worker path " +
+          finishedWorkerPath + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with InterruptedException", e);
+    }
+    getContext().setStatus("finishSuperstep: (waiting for rest " +
+        "of workers) " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+    try {
+      while (getZkExt().exists(superstepFinishedNode, true) == null) {
+        getSuperstepFinishedEvent().waitForever();
+        getSuperstepFinishedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    }
+    GlobalStats globalStats = new GlobalStats();
+    WritableUtils.readFieldsFromZnode(
+        getZkExt(), superstepFinishedNode, false, null, globalStats);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
+          " with global stats " + globalStats);
+    }
+    incrCachedSuperstep();
+    getContext().setStatus("finishSuperstep: (all workers done) " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    getGraphMapper().getGraphState().
+    setNumEdges(globalStats.getEdgeCount()).
+    setNumVertices(globalStats.getVertexCount());
+    return (globalStats.getFinishedVertexCount() ==
+        globalStats.getVertexCount()) &&
+        (globalStats.getMessageCount() == 0);
+  }
+
+  /**
+   * Save the vertices using the user-defined VertexOutputFormat from our
+   * vertexArray based on the split.
+   * @throws InterruptedException
+   */
+  private void saveVertices() throws IOException, InterruptedException {
+    if (getConfiguration().get(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS) ==
+        null) {
+      LOG.warn("saveVertices: " + GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS +
+          " not specified -- there will be no saved output");
+      return;
+    }
+
+    VertexOutputFormat<I, V, E> vertexOutputFormat =
+        BspUtils.<I, V, E>createVertexOutputFormat(getConfiguration());
+    VertexWriter<I, V, E> vertexWriter =
+        vertexOutputFormat.createVertexWriter(getContext());
+    vertexWriter.initialize(getContext());
+    for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+      for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
+        vertexWriter.writeVertex(vertex);
+      }
+    }
+    vertexWriter.close(getContext());
+  }
+
+  @Override
+  public void cleanup() throws IOException, InterruptedException {
+    commService.closeConnections();
+    setCachedSuperstep(getSuperstep() - 1);
+    saveVertices();
+    // All worker processes should denote they are done by adding special
+    // znode.  Once the number of znodes equals the number of partitions
+    // for workers and masters, the master will clean up the ZooKeeper
+    // znodes associated with this job.
+    String workerCleanedUpPath = cleanedUpPath  + "/" +
+        getTaskPartition() + WORKER_SUFFIX;
+    try {
+      String finalFinishedPath =
+          getZkExt().createExt(workerCleanedUpPath,
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT,
+              true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Notifying master its okay to cleanup with " +
+            finalFinishedPath);
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Couldn't create finished node '" +
+            workerCleanedUpPath);
+      }
+    } catch (KeeperException e) {
+      // Cleaning up, it's okay to fail after cleanup is successful
+      LOG.error("cleanup: Got KeeperException on notifcation " +
+          "to master about cleanup", e);
+    } catch (InterruptedException e) {
+      // Cleaning up, it's okay to fail after cleanup is successful
+      LOG.error("cleanup: Got InterruptedException on notifcation " +
+          "to master about cleanup", e);
+    }
+    try {
+      getZkExt().close();
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Zookeeper failed to close with " + e);
+    }
+
+    // Preferably would shut down the service only after
+    // all clients have disconnected (or the exceptions on the
+    // client side ignored).
+    commService.close();
+  }
+
+  @Override
+  public void storeCheckpoint() throws IOException {
+    getContext().setStatus("storeCheckpoint: Starting checkpoint " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+
+    // Algorithm:
+    // For each partition, dump vertices and messages
+    Path metadataFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_METADATA_POSTFIX);
+    Path verticesFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VERTICES_POSTFIX);
+    Path validFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VALID_POSTFIX);
+
+    // Remove these files if they already exist (shouldn't though, unless
+    // of previous failure of this worker)
+    if (getFs().delete(validFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed valid file " +
+          validFilePath);
+    }
+    if (getFs().delete(metadataFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed metadata file " +
+          metadataFilePath);
+    }
+    if (getFs().delete(verticesFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
+    }
+
+    FSDataOutputStream verticesOutputStream =
+        getFs().create(verticesFilePath);
+    ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
+    DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
+    for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
+      long startPos = verticesOutputStream.getPos();
+      partition.write(verticesOutputStream);
+      // Write the metadata for this partition
+      // Format:
+      // <index count>
+      //   <index 0 start pos><partition id>
+      //   <index 1 start pos><partition id>
+      metadataOutput.writeLong(startPos);
+      metadataOutput.writeInt(partition.getPartitionId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("storeCheckpoint: Vertex file starting " +
+            "offset = " + startPos + ", length = " +
+            (verticesOutputStream.getPos() - startPos) +
+            ", partition = " + partition.toString());
+      }
+    }
+    // Metadata is buffered and written at the end since it's small and
+    // needs to know how many partitions this worker owns
+    FSDataOutputStream metadataOutputStream =
+        getFs().create(metadataFilePath);
+    metadataOutputStream.writeInt(workerPartitionMap.size());
+    metadataOutputStream.write(metadataByteStream.toByteArray());
+    metadataOutputStream.close();
+    verticesOutputStream.close();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("storeCheckpoint: Finished metadata (" +
+          metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+    }
+
+    getFs().createNewFile(validFilePath);
+  }
+
+  @Override
+  public void loadCheckpoint(long superstep) {
+    // Algorithm:
+    // Examine all the partition owners and load the ones
+    // that match my hostname and id from the master designated checkpoint
+    // prefixes.
+    long startPos = 0;
+    int loadedPartitions = 0;
+    for (PartitionOwner partitionOwner :
+      workerGraphPartitioner.getPartitionOwners()) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+        String metadataFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_METADATA_POSTFIX;
+        String partitionsFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_VERTICES_POSTFIX;
         try {
-            myHealthZnode = getZkExt().createExt(
-                myHealthPath,
-                WritableUtils.writeToByteArray(workerInfo),
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.EPHEMERAL,
-                true);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.warn("registerHealth: myHealthPath already exists (likely " +
-                     "from previous failure): " + myHealthPath +
-                     ".  Waiting for change in attempts " +
-                     "to re-join the application");
-            getApplicationAttemptChangedEvent().waitForever();
-            if (LOG.isInfoEnabled()) {
-                LOG.info("registerHealth: Got application " +
-                         "attempt changed event, killing self");
+          int partitionId = -1;
+          DataInputStream metadataStream =
+              getFs().open(new Path(metadataFile));
+          int partitions = metadataStream.readInt();
+          for (int i = 0; i < partitions; ++i) {
+            startPos = metadataStream.readLong();
+            partitionId = metadataStream.readInt();
+            if (partitionId == partitionOwner.getPartitionId()) {
+              break;
             }
-            throw new RuntimeException(
-                "registerHealth: Trying " +
-                "to get the new application attempt by killing self", e);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        if (LOG.isInfoEnabled()) {
-            LOG.info("registerHealth: Created my health node for attempt=" +
-                     getApplicationAttempt() + ", superstep=" +
-                     getSuperstep() + " with " + myHealthZnode +
-                     " and workerInfo= " + workerInfo);
-        }
-    }
-
-    /**
-     * Do this to help notify the master quicker that this worker has failed.
-     */
-    private void unregisterHealth() {
-        LOG.error("unregisterHealth: Got failure, unregistering health on " +
-                  myHealthZnode + " on superstep " + getSuperstep());
-        try {
-            getZkExt().delete(myHealthZnode, -1);
-        } catch (InterruptedException e) {
+          }
+          if (partitionId != partitionOwner.getPartitionId()) {
             throw new IllegalStateException(
-                "unregisterHealth: InterruptedException - Couldn't delete " +
-                myHealthZnode, e);
-        } catch (KeeperException e) {
+                "loadCheckpoint: " + partitionOwner +
+                " not found!");
+          }
+          metadataStream.close();
+          Partition<I, V, E, M> partition =
+              new Partition<I, V, E, M>(
+                  getConfiguration(),
+                  partitionId);
+          DataInputStream partitionsStream =
+              getFs().open(new Path(partitionsFile));
+          if (partitionsStream.skip(startPos) != startPos) {
             throw new IllegalStateException(
-                "unregisterHealth: KeeperException - Couldn't delete " +
-                myHealthZnode, e);
-        }
-    }
-
-    @Override
-    public void failureCleanup() {
-        unregisterHealth();
-    }
-
-    @Override
-    public Collection<? extends PartitionOwner> startSuperstep() {
-        // Algorithm:
-        // 1. Communication service will combine message from previous
-        //    superstep
-        // 2. Register my health for the next superstep.
-        // 3. Wait until the partition assignment is complete and get it
-        // 4. Get the aggregator values from the previous superstep
-        if (getSuperstep() != INPUT_SUPERSTEP) {
-            commService.prepareSuperstep();
-        }
-
-        registerHealth(getSuperstep());
-
-        String partitionAssignmentsNode =
-            getPartitionAssignmentsPath(getApplicationAttempt(),
-                                        getSuperstep());
-        Collection<? extends PartitionOwner> masterSetPartitionOwners;
-        try {
-            while (getZkExt().exists(partitionAssignmentsNode, true) ==
-                    null) {
-                getPartitionAssignmentsReadyChangedEvent().waitForever();
-                getPartitionAssignmentsReadyChangedEvent().reset();
-            }
-            List<? extends Writable> writableList =
-                WritableUtils.readListFieldsFromZnode(
-                    getZkExt(),
-                    partitionAssignmentsNode,
-                    false,
-                    null,
-                    workerGraphPartitioner.createPartitionOwner().getClass(),
-                    getConfiguration());
-
-            @SuppressWarnings("unchecked")
-            Collection<? extends PartitionOwner> castedWritableList =
-                (Collection<? extends PartitionOwner>) writableList;
-            masterSetPartitionOwners = castedWritableList;
-        } catch (KeeperException e) {
+                "loadCheckpoint: Failed to skip " + startPos +
+                " on " + partitionsFile);
+          }
+          partition.readFields(partitionsStream);
+          partitionsStream.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("loadCheckpoint: Loaded partition " +
+                partition);
+          }
+          if (getPartitionMap().put(partitionId, partition) != null) {
             throw new IllegalStateException(
-                "startSuperstep: KeeperException getting assignments", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "startSuperstep: InterruptedException getting assignments", e);
-        }
-
-        if (LOG.isInfoEnabled()) {
-            LOG.info("startSuperstep: Ready for computation on superstep " +
-                     getSuperstep() + " since worker " +
-                     "selection and vertex range assignments are done in " +
-                     partitionAssignmentsNode);
-        }
-
-        if (getSuperstep() != INPUT_SUPERSTEP) {
-            getAggregatorValues(getSuperstep());
-        }
-        getContext().setStatus("startSuperstep: " +
-                               getGraphMapper().getMapFunctions().toString() +
-                               " - Attempt=" + getApplicationAttempt() +
-                               ", Superstep=" + getSuperstep());
-        return masterSetPartitionOwners;
-    }
-
-    @Override
-    public boolean finishSuperstep(List<PartitionStats> partitionStatsList) {
-        // This barrier blocks until success (or the master signals it to
-        // restart).
-        //
-        // Master will coordinate the barriers and aggregate "doneness" of all
-        // the vertices.  Each worker will:
-        // 1. Flush the unsent messages
-        // 2. Execute user postSuperstep() if necessary.
-        // 3. Save aggregator values that are in use.
-        // 4. Report the statistics (vertices, edges, messages, etc.)
-        //    of this worker
-        // 5. Let the master know it is finished.
-        // 6. Wait for the master's global stats, and check if done
-        long workerSentMessages = 0;
-        try {
-            workerSentMessages = commService.flush(getContext());
+                "loadCheckpoint: Already has partition owner " +
+                    partitionOwner);
+          }
+          ++loadedPartitions;
         } catch (IOException e) {
-            throw new IllegalStateException(
-                "finishSuperstep: flush failed", e);
-        }
-
-        if (getSuperstep() != INPUT_SUPERSTEP) {
-            getWorkerContext().postSuperstep();
-            getContext().progress();
-        }
-
-        if (LOG.isInfoEnabled()) {
-            LOG.info("finishSuperstep: Superstep " + getSuperstep() + " " +
-                      MemoryUtils.getRuntimeMemoryStats());
-        }
-
-        JSONArray aggregatorValueArray =
-            marshalAggregatorValues(getSuperstep());
-        Collection<PartitionStats> finalizedPartitionStats =
-            workerGraphPartitioner.finalizePartitionStats(
-                partitionStatsList, workerPartitionMap);
-        List<PartitionStats> finalizedPartitionStatsList =
-            new ArrayList<PartitionStats>(finalizedPartitionStats);
-        byte [] partitionStatsBytes =
-            WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
-        JSONObject workerFinishedInfoObj = new JSONObject();
-        try {
-            workerFinishedInfoObj.put(JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY,
-                                      aggregatorValueArray);
-            workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
-                                      Base64.encodeBytes(partitionStatsBytes));
-            workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY,
-                                      workerSentMessages);
-        } catch (JSONException e) {
-            throw new RuntimeException(e);
-        }
-        String finishedWorkerPath =
-            getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
-            "/" + getHostnamePartitionId();
-        try {
-            getZkExt().createExt(finishedWorkerPath,
-                                 workerFinishedInfoObj.toString().getBytes(),
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            LOG.warn("finishSuperstep: finished worker path " +
-                     finishedWorkerPath + " already exists!");
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        getContext().setStatus("finishSuperstep: (waiting for rest " +
-                               "of workers) " +
-                               getGraphMapper().getMapFunctions().toString() +
-                               " - Attempt=" + getApplicationAttempt() +
-                               ", Superstep=" + getSuperstep());
-
-        String superstepFinishedNode =
-            getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
-        try {
-            while (getZkExt().exists(superstepFinishedNode, true) == null) {
-                getSuperstepFinishedEvent().waitForever();
-                getSuperstepFinishedEvent().reset();
-            }
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "finishSuperstep: Failed while waiting for master to " +
-                "signal completion of superstep " + getSuperstep(), e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "finishSuperstep: Failed while waiting for master to " +
-                "signal completion of superstep " + getSuperstep(), e);
-        }
-        GlobalStats globalStats = new GlobalStats();
-        WritableUtils.readFieldsFromZnode(
-            getZkExt(), superstepFinishedNode, false, null, globalStats);
-        if (LOG.isInfoEnabled()) {
-            LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
-                     " with global stats " + globalStats);
-        }
-        incrCachedSuperstep();
-        getContext().setStatus("finishSuperstep: (all workers done) " +
-                               getGraphMapper().getMapFunctions().toString() +
-                               " - Attempt=" + getApplicationAttempt() +
-                               ", Superstep=" + getSuperstep());
-        getGraphMapper().getGraphState().
-            setNumEdges(globalStats.getEdgeCount()).
-            setNumVertices(globalStats.getVertexCount());
-        return ((globalStats.getFinishedVertexCount() ==
-                globalStats.getVertexCount()) &&
-                (globalStats.getMessageCount() == 0));
-    }
-
-    /**
-     * Save the vertices using the user-defined VertexOutputFormat from our
-     * vertexArray based on the split.
-     * @throws InterruptedException
-     */
-    private void saveVertices() throws IOException, InterruptedException {
-        if (getConfiguration().get(GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS)
-                == null) {
-            LOG.warn("saveVertices: " + GiraphJob.VERTEX_OUTPUT_FORMAT_CLASS +
-                     " not specified -- there will be no saved output");
-            return;
-        }
-
-        VertexOutputFormat<I, V, E> vertexOutputFormat =
-            BspUtils.<I, V, E>createVertexOutputFormat(getConfiguration());
-        VertexWriter<I, V, E> vertexWriter =
-            vertexOutputFormat.createVertexWriter(getContext());
-        vertexWriter.initialize(getContext());
-        for (Partition<I, V, E, M> partition : workerPartitionMap.values()) {
-            for (BasicVertex<I, V, E, M> vertex : partition.getVertices()) {
-                vertexWriter.writeVertex(vertex);
-            }
-        }
-        vertexWriter.close(getContext());
-    }
-
-    @Override
-    public void cleanup() throws IOException, InterruptedException {
-        commService.closeConnections();
-        setCachedSuperstep(getSuperstep() - 1);
-        saveVertices();
-         // All worker processes should denote they are done by adding special
-         // znode.  Once the number of znodes equals the number of partitions

[... 710 lines stripped ...]


Mime
View raw message