giraph-commits mailing list archives

From ni...@apache.org
Subject [14/51] [partial] GIRAPH-457: update module names (nitay)
Date Thu, 20 Dec 2012 04:25:31 GMT
http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
new file mode 100644
index 0000000..6e97e6c
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspServiceWorker.java
@@ -0,0 +1,1392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+
+import org.apache.giraph.bsp.ApplicationState;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.ServerData;
+import org.apache.giraph.comm.WorkerClient;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.WorkerServer;
+import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerClient;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.comm.netty.NettyWorkerServer;
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionExchange;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.PartitionStore;
+import org.apache.giraph.graph.partition.WorkerGraphPartitioner;
+import org.apache.giraph.metrics.GiraphMetrics;
+import org.apache.giraph.metrics.GiraphTimer;
+import org.apache.giraph.metrics.GiraphTimerContext;
+import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
+import org.apache.giraph.metrics.SuperstepMetricsRegistry;
+import org.apache.giraph.metrics.WorkerSuperstepMetrics;
+import org.apache.giraph.utils.LoggerUtils;
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.ProgressableUtils;
+import org.apache.giraph.utils.WritableUtils;
+import org.apache.giraph.zk.BspEvent;
+import org.apache.giraph.zk.PredicateLock;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.json.JSONArray;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import net.iharder.Base64;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * ZooKeeper-based implementation of {@link CentralizedServiceWorker}.
+ *
+ * @param <I> Vertex id
+ * @param <V> Vertex data
+ * @param <E> Edge data
+ * @param <M> Message data
+ */
+@SuppressWarnings("rawtypes")
+public class BspServiceWorker<I extends WritableComparable,
+    V extends Writable, E extends Writable, M extends Writable>
+    extends BspService<I, V, E, M>
+    implements CentralizedServiceWorker<I, V, E, M>,
+    ResetSuperstepMetricsObserver {
+  /** Name of gauge for time spent waiting on other workers */
+  public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
+  /** Class logger */
+  private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
+  /** My process health znode */
+  private String myHealthZnode;
+  /** Worker info */
+  private final WorkerInfo workerInfo;
+  /** Worker graph partitioner */
+  private final WorkerGraphPartitioner<I, V, E, M> workerGraphPartitioner;
+
+  /** IPC Client */
+  private final WorkerClient<I, V, E, M> workerClient;
+  /** IPC Server */
+  private final WorkerServer<I, V, E, M> workerServer;
+  /** Request processor for aggregator requests */
+  private final WorkerAggregatorRequestProcessor
+  workerAggregatorRequestProcessor;
+  /** Master info */
+  private MasterInfo masterInfo = new MasterInfo();
+  /** List of workers */
+  private List<WorkerInfo> workerInfoList = Lists.newArrayList();
+  /** Have the partition exchange children (workers) changed? */
+  private final BspEvent partitionExchangeChildrenChanged;
+
+  /** Worker Context */
+  private final WorkerContext workerContext;
+
+  /** Handler for aggregators */
+  private final WorkerAggregatorHandler aggregatorHandler;
+
+  // Per-Superstep Metrics
+  /** Timer for WorkerContext#postSuperstep */
+  private GiraphTimer wcPostSuperstepTimer;
+  /** Time spent waiting on requests to finish */
+  private GiraphTimer waitRequestsTimer;
+
+  /**
+   * Constructor for setting up the worker.
+   *
+   * @param serverPortList ZooKeeper server port list
+   * @param sessionMsecTimeout Msecs to timeout connecting to ZooKeeper
+   * @param context Mapper context
+   * @param graphMapper Graph mapper
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public BspServiceWorker(
+    String serverPortList,
+    int sessionMsecTimeout,
+    Mapper<?, ?, ?, ?>.Context context,
+    GraphMapper<I, V, E, M> graphMapper)
+    throws IOException, InterruptedException {
+    super(serverPortList, sessionMsecTimeout, context, graphMapper);
+    partitionExchangeChildrenChanged = new PredicateLock(context);
+    registerBspEvent(partitionExchangeChildrenChanged);
+    workerGraphPartitioner =
+        getGraphPartitionerFactory().createWorkerGraphPartitioner();
+    workerInfo = new WorkerInfo();
+    workerServer =
+        new NettyWorkerServer<I, V, E, M>(getConfiguration(), this, context);
+    workerInfo.setInetSocketAddress(workerServer.getMyAddress());
+    workerInfo.setTaskId(getTaskPartition());
+    workerClient =
+        new NettyWorkerClient<I, V, E, M>(context, getConfiguration(), this);
+
+    workerAggregatorRequestProcessor =
+        new NettyWorkerAggregatorRequestProcessor(getContext(),
+            getConfiguration(), this);
+
+    this.workerContext = getConfiguration().createWorkerContext(null);
+
+    aggregatorHandler =
+        new WorkerAggregatorHandler(this, getConfiguration(), context);
+
+    GiraphMetrics.get().addSuperstepResetObserver(this);
+  }
+
+  @Override
+  public void newSuperstep(SuperstepMetricsRegistry superstepMetrics) {
+    waitRequestsTimer = new GiraphTimer(superstepMetrics,
+        TIMER_WAIT_REQUESTS, TimeUnit.MICROSECONDS);
+    wcPostSuperstepTimer = new GiraphTimer(superstepMetrics,
+        "worker-context-post-superstep", TimeUnit.MICROSECONDS);
+  }
+
+  @Override
+  public WorkerContext getWorkerContext() {
+    return workerContext;
+  }
+
+  @Override
+  public WorkerClient<I, V, E, M> getWorkerClient() {
+    return workerClient;
+  }
+
+  /**
+   * Intended to check the health of the node.  For instance, can it ssh,
+   * read dmesg, etc.  For now, it performs no checks and always reports
+   * healthy.
+   * TODO: Make this check configurable by the user (e.g. search dmesg for
+   * problems).
+   *
+   * @return True if healthy (always in this case).
+   */
+  public boolean isHealthy() {
+    return true;
+  }
+
+  /**
+   * Load the vertices/edges from input splits. Do this until all the
+   * InputSplits have been processed.
+   * All workers will try to do as many InputSplits as they can.  The master
+   * will monitor progress and stop this once all the InputSplits have been
+   * loaded and check-pointed.  Keep track of the last input split path to
+   * ensure the input split cache is flushed prior to marking the last input
+   * split complete.
+   *
+   * Use one or more threads to do the loading.
+   *
+   * @param inputSplitPathList List of input split paths
+   * @param inputSplitsCallableFactory Factory for {@link InputSplitsCallable}s
+   * @return Statistics of the vertices and edges loaded
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  private VertexEdgeCount loadInputSplits(
+      List<String> inputSplitPathList,
+      InputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory)
+    throws KeeperException, InterruptedException {
+    VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
+    // Determine how many threads to use based on the number of input splits
+    int maxInputSplitThreads =
+        Math.max(
+            inputSplitPathList.size() / getConfiguration().getMaxWorkers(), 1);
+    int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(),
+        maxInputSplitThreads);
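+    // For example, with 100 input splits and 10 max workers this allows at
+    // most 10 loading threads here, further capped by the configured
+    // getNumInputSplitsThreads() value.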
+    ExecutorService inputSplitsExecutor =
+        Executors.newFixedThreadPool(numThreads,
+            new ThreadFactoryBuilder().setNameFormat("load-%d").build());
+    List<Future<VertexEdgeCount>> threadsFutures =
+        Lists.newArrayListWithCapacity(numThreads);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadInputSplits: Using " + numThreads + " thread(s), " +
+          "originally " + getConfiguration().getNumInputSplitsThreads() +
+          " threads(s) for " + inputSplitPathList.size() + " total splits.");
+    }
+    for (int i = 0; i < numThreads; ++i) {
+      Callable<VertexEdgeCount> inputSplitsCallable =
+          inputSplitsCallableFactory.newCallable();
+      threadsFutures.add(inputSplitsExecutor.submit(inputSplitsCallable));
+    }
+
+    // Wait until all the threads are done to wait on all requests
+    for (Future<VertexEdgeCount> threadFuture : threadsFutures) {
+      VertexEdgeCount threadVertexEdgeCount =
+          ProgressableUtils.getFutureResult(threadFuture, getContext());
+      vertexEdgeCount =
+          vertexEdgeCount.incrVertexEdgeCount(threadVertexEdgeCount);
+    }
+
+    workerClient.waitAllRequests();
+    inputSplitsExecutor.shutdown();
+    return vertexEdgeCount;
+  }
+
+
+  /**
+   * Load the vertices from the user-defined {@link VertexReader}
+   *
+   * @return Count of vertices and edges loaded
+   */
+  private VertexEdgeCount loadVertices() throws KeeperException,
+      InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(vertexInputSplitsPaths.getPath(),
+            false, false, true);
+
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null, null);
+
+    VertexInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
+        new VertexInputSplitsCallableFactory<I, V, E, M>(
+            getContext(),
+            graphState,
+            getConfiguration(),
+            this,
+            inputSplitPathList,
+            getWorkerInfo(),
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory);
+  }
+
+  /**
+   * Load the edges from the user-defined {@link EdgeReader}.
+   *
+   * @return Number of edges loaded
+   */
+  private long loadEdges() throws KeeperException, InterruptedException {
+    List<String> inputSplitPathList =
+        getZkExt().getChildrenExt(edgeInputSplitsPaths.getPath(),
+            false, false, true);
+
+    GraphState<I, V, E, M> graphState = new GraphState<I, V, E, M>(
+        INPUT_SUPERSTEP, 0, 0, getContext(), getGraphMapper(),
+        null, null);
+
+    EdgeInputSplitsCallableFactory<I, V, E, M> inputSplitsCallableFactory =
+        new EdgeInputSplitsCallableFactory<I, V, E, M>(
+            getContext(),
+            graphState,
+            getConfiguration(),
+            this,
+            inputSplitPathList,
+            getWorkerInfo(),
+            getZkExt());
+
+    return loadInputSplits(inputSplitPathList, inputSplitsCallableFactory).
+        getEdgeCount();
+  }
+
+  @Override
+  public MasterInfo getMasterInfo() {
+    return masterInfo;
+  }
+
+  @Override
+  public List<WorkerInfo> getWorkerInfoList() {
+    return workerInfoList;
+  }
+
+  /**
+   * Ensure the input splits are ready for processing
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths,
+                                      InputSplitEvents inputSplitEvents) {
+    while (true) {
+      Stat inputSplitsReadyStat;
+      try {
+        inputSplitsReadyStat = getZkExt().exists(
+            inputSplitPaths.getAllReadyPath(), true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "KeeperException waiting on input splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("ensureInputSplitsReady: " +
+            "InterruptedException waiting on input splits", e);
+      }
+      if (inputSplitsReadyStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllReadyChanged().waitForever();
+      inputSplitEvents.getAllReadyChanged().reset();
+    }
+  }
+
+  /**
+   * Wait for all workers to finish processing input splits.
+   *
+   * @param inputSplitPaths Input split paths
+   * @param inputSplitEvents Input split events
+   */
+  private void waitForOtherWorkers(InputSplitPaths inputSplitPaths,
+                                   InputSplitEvents inputSplitEvents) {
+    String workerInputSplitsDonePath =
+        inputSplitPaths.getDonePath() + "/" +
+            getWorkerInfo().getHostnameId();
+    try {
+      getZkExt().createExt(workerInputSplitsDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "KeeperException creating worker done splits", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("waitForOtherWorkers: " +
+          "InterruptedException creating worker done splits", e);
+    }
+    while (true) {
+      Stat inputSplitsDoneStat;
+      try {
+        inputSplitsDoneStat =
+            getZkExt().exists(inputSplitPaths.getAllDonePath(),
+                true);
+      } catch (KeeperException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "KeeperException waiting on worker done splits", e);
+      } catch (InterruptedException e) {
+        throw new IllegalStateException("waitForOtherWorkers: " +
+            "InterruptedException waiting on worker done splits", e);
+      }
+      if (inputSplitsDoneStat != null) {
+        break;
+      }
+      inputSplitEvents.getAllDoneChanged().waitForever();
+      inputSplitEvents.getAllDoneChanged().reset();
+    }
+  }
+
+  @Override
+  public FinishedSuperstepStats setup() {
+    // Unless doing a restart, prepare for computation:
+    // 1. Start superstep INPUT_SUPERSTEP (no computation)
+    // 2. Wait until the INPUT_SPLIT_ALL_READY_PATH node has been created
+    // 3. Process input splits until there are no more.
+    // 4. Wait until the INPUT_SPLIT_ALL_DONE_PATH node has been created
+    // 5. Process any mutations deriving from add edge requests
+    // 6. Wait for superstep INPUT_SUPERSTEP to complete.
+    if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
+      setCachedSuperstep(getRestartedSuperstep());
+      return new FinishedSuperstepStats(false, -1, -1);
+    }
+
+    JSONObject jobState = getJobState();
+    if (jobState != null) {
+      try {
+        if ((ApplicationState.valueOf(jobState.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jobState.getLong(JSONOBJ_SUPERSTEP_KEY) ==
+            getSuperstep()) {
+          if (LOG.isInfoEnabled()) {
+            LOG.info("setup: Restarting from an automated " +
+                "checkpointed superstep " +
+                getSuperstep() + ", attempt " +
+                getApplicationAttempt());
+          }
+          setRestartedSuperstep(getSuperstep());
+          return new FinishedSuperstepStats(false, -1, -1);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "setup: Failed to get key-values from " +
+                jobState.toString(), e);
+      }
+    }
+
+    // Add the partitions that this worker owns
+    GraphState<I, V, E, M> graphState =
+        new GraphState<I, V, E, M>(INPUT_SUPERSTEP, 0, 0,
+            getContext(), getGraphMapper(), null, null);
+    Collection<? extends PartitionOwner> masterSetPartitionOwners =
+        startSuperstep(graphState);
+    workerGraphPartitioner.updatePartitionOwners(
+        getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+
+/*if[HADOOP_NON_SECURE]
+    workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
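+    // The if/else[HADOOP_NON_SECURE] markers above are build-time
+    // preprocessor directives: non-secure Hadoop builds compile the
+    // no-argument setup(), while secure builds pass whether SASL
+    // authentication should be used (presumably resolved by the Munge
+    // plugin during the build).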
+
+    VertexEdgeCount vertexEdgeCount;
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Ensure the vertex InputSplits are ready for processing
+      ensureInputSplitsReady(vertexInputSplitsPaths, vertexInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = loadVertices();
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadVertices failed with KeeperException", e);
+      }
+      getContext().progress();
+    } else {
+      vertexEdgeCount = new VertexEdgeCount();
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Ensure the edge InputSplits are ready for processing
+      ensureInputSplitsReady(edgeInputSplitsPaths, edgeInputSplitsEvents);
+      getContext().progress();
+      try {
+        vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0, loadEdges());
+      } catch (InterruptedException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with InterruptedException", e);
+      } catch (KeeperException e) {
+        throw new IllegalStateException(
+            "setup: loadEdges failed with KeeperException", e);
+      }
+      getContext().progress();
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
+    }
+
+    if (getConfiguration().hasVertexInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(vertexInputSplitsPaths, vertexInputSplitsEvents);
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Workers wait for each other to finish, coordinated by master
+      waitForOtherWorkers(edgeInputSplitsPaths, edgeInputSplitsEvents);
+    }
+
+    // Create remaining partitions owned by this worker.
+    for (PartitionOwner partitionOwner : masterSetPartitionOwners) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) &&
+          !getPartitionStore().hasPartition(
+              partitionOwner.getPartitionId())) {
+        Partition<I, V, E, M> partition =
+            getConfiguration().createPartition(
+                partitionOwner.getPartitionId(), getContext());
+        getPartitionStore().addPartition(partition);
+      }
+    }
+
+    if (getConfiguration().hasEdgeInputFormat()) {
+      // Create vertices from added edges via vertex resolver.
+      // Doing this at the beginning of superstep 0 is not enough,
+      // because we want the vertex/edge stats to be accurate.
+      workerServer.resolveMutations(graphState);
+    }
+
+    // Generate the partition stats for the input superstep and process
+    // if necessary
+    List<PartitionStats> partitionStatsList =
+        new ArrayList<PartitionStats>();
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      PartitionStats partitionStats =
+          new PartitionStats(partition.getId(),
+              partition.getVertexCount(),
+              0,
+              partition.getEdgeCount(),
+              0);
+      partitionStatsList.add(partitionStats);
+    }
+    workerGraphPartitioner.finalizePartitionStats(
+        partitionStatsList, getPartitionStore());
+
+    return finishSuperstep(graphState, partitionStatsList);
+  }
+
+  /**
+   * Register the health of this worker for a given superstep
+   *
+   * @param superstep Superstep to register health on
+   */
+  private void registerHealth(long superstep) {
+    JSONArray hostnamePort = new JSONArray();
+    hostnamePort.put(getHostname());
+
+    hostnamePort.put(workerInfo.getPort());
+
+    String myHealthPath = null;
+    if (isHealthy()) {
+      myHealthPath = getWorkerInfoHealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    } else {
+      myHealthPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(),
+          getSuperstep());
+    }
+    myHealthPath = myHealthPath + "/" + workerInfo.getHostnameId();
+    try {
+      myHealthZnode = getZkExt().createExt(
+          myHealthPath,
+          WritableUtils.writeToByteArray(workerInfo),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.EPHEMERAL,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("registerHealth: myHealthPath already exists (likely " +
+          "from previous failure): " + myHealthPath +
+          ".  Waiting for change in attempts " +
+          "to re-join the application");
+      getApplicationAttemptChangedEvent().waitForever();
+      if (LOG.isInfoEnabled()) {
+        LOG.info("registerHealth: Got application " +
+            "attempt changed event, killing self");
+      }
+      throw new IllegalStateException(
+          "registerHealth: Trying " +
+              "to get the new application attempt by killing self", e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + myHealthPath +
+          " failed with InterruptedException", e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("registerHealth: Created my health node for attempt=" +
+          getApplicationAttempt() + ", superstep=" +
+          getSuperstep() + " with " + myHealthZnode +
+          " and workerInfo= " + workerInfo);
+    }
+  }
+
+  /**
+   * Delete this worker's health znode to notify the master more quickly
+   * that this worker has failed.
+   */
+  private void unregisterHealth() {
+    LOG.error("unregisterHealth: Got failure, unregistering health on " +
+        myHealthZnode + " on superstep " + getSuperstep());
+    try {
+      getZkExt().deleteExt(myHealthZnode, -1, false);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: InterruptedException - Couldn't delete " +
+              myHealthZnode, e);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "unregisterHealth: KeeperException - Couldn't delete " +
+              myHealthZnode, e);
+    }
+  }
+
+  @Override
+  public void failureCleanup() {
+    unregisterHealth();
+  }
+
+  @Override
+  public Collection<? extends PartitionOwner> startSuperstep(
+      GraphState<I, V, E, M> graphState) {
+    // Algorithm:
+    // 1. Communication service will combine message from previous
+    //    superstep
+    // 2. Register my health for the next superstep.
+    // 3. Wait until the partition assignment is complete and get it
+    // 4. Get the aggregator values from the previous superstep
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      workerServer.prepareSuperstep(graphState);
+    }
+
+    registerHealth(getSuperstep());
+
+    String addressesAndPartitionsPath =
+        getAddressesAndPartitionsPath(getApplicationAttempt(),
+            getSuperstep());
+    AddressesAndPartitionsWritable addressesAndPartitions =
+        new AddressesAndPartitionsWritable(
+            workerGraphPartitioner.createPartitionOwner().getClass());
+    try {
+      while (getZkExt().exists(addressesAndPartitionsPath, true) ==
+          null) {
+        getAddressesAndPartitionsReadyChangedEvent().waitForever();
+        getAddressesAndPartitionsReadyChangedEvent().reset();
+      }
+      WritableUtils.readFieldsFromZnode(
+          getZkExt(),
+          addressesAndPartitionsPath,
+          false,
+          null,
+          addressesAndPartitions);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "startSuperstep: KeeperException getting assignments", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "startSuperstep: InterruptedException getting assignments", e);
+    }
+
+    workerInfoList.clear();
+    workerInfoList = addressesAndPartitions.getWorkerInfos();
+    masterInfo = addressesAndPartitions.getMasterInfo();
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("startSuperstep: " + masterInfo);
+      LOG.info("startSuperstep: Ready for computation on superstep " +
+          getSuperstep() + " since worker " +
+          "selection and vertex range assignments are done in " +
+          addressesAndPartitionsPath);
+    }
+
+    getContext().setStatus("startSuperstep: " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+    return addressesAndPartitions.getPartitionOwners();
+  }
+
+  @Override
+  public FinishedSuperstepStats finishSuperstep(
+      GraphState<I, V, E, M> graphState,
+      List<PartitionStats> partitionStatsList) {
+    // This barrier blocks until success (or the master signals it to
+    // restart).
+    //
+    // Master will coordinate the barriers and aggregate "doneness" of all
+    // the vertices.  Each worker will:
+    // 1. Ensure that the requests are complete
+    // 2. Execute user postSuperstep() if necessary.
+    // 3. Save aggregator values that are in use.
+    // 4. Report the statistics (vertices, edges, messages, etc.)
+    //    of this worker
+    // 5. Let the master know it is finished.
+    // 6. Wait for the master's global stats, and check if done
+    waitForRequestsToFinish();
+
+    graphState.getGraphMapper().notifyFinishedCommunication();
+
+    long workerSentMessages = 0;
+    for (PartitionStats partitionStats : partitionStatsList) {
+      workerSentMessages += partitionStats.getMessagesSentCount();
+    }
+
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      getWorkerContext().setGraphState(graphState);
+      GiraphTimerContext timerContext = wcPostSuperstepTimer.time();
+      getWorkerContext().postSuperstep();
+      timerContext.stop();
+      getContext().progress();
+    }
+
+    aggregatorHandler.finishSuperstep(workerAggregatorRequestProcessor);
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Superstep " + getSuperstep() +
+          ", messages = " + workerSentMessages + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+
+    writeFinshedSuperstepInfoToZK(partitionStatsList, workerSentMessages);
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "finishSuperstep: (waiting for rest " +
+            "of workers) " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    String superstepFinishedNode =
+        getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
+
+    waitForOtherWorkers(superstepFinishedNode);
+
+    GlobalStats globalStats = new GlobalStats();
+    WritableUtils.readFieldsFromZnode(
+        getZkExt(), superstepFinishedNode, false, null, globalStats);
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Completed superstep " + getSuperstep() +
+          " with global stats " + globalStats);
+    }
+    incrCachedSuperstep();
+    getContext().setStatus("finishSuperstep: (all workers done) " +
+        getGraphMapper().getMapFunctions().toString() +
+        " - Attempt=" + getApplicationAttempt() +
+        ", Superstep=" + getSuperstep());
+
+    return new FinishedSuperstepStats(
+        globalStats.getHaltComputation(),
+        globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
+  }
+
+  /**
+   * Wait for all the requests to finish.
+   */
+  private void waitForRequestsToFinish() {
+    if (LOG.isInfoEnabled()) {
+      LOG.info("finishSuperstep: Waiting on all requests, superstep " +
+          getSuperstep() + " " +
+          MemoryUtils.getRuntimeMemoryStats());
+    }
+    GiraphTimerContext timerContext = waitRequestsTimer.time();
+    workerClient.waitAllRequests();
+    timerContext.stop();
+  }
+
+  /**
+   * Wait for all the other Workers to finish the superstep.
+   *
+   * @param superstepFinishedNode ZooKeeper path to wait on.
+   */
+  private void waitForOtherWorkers(String superstepFinishedNode) {
+    try {
+      while (getZkExt().exists(superstepFinishedNode, true) == null) {
+        getSuperstepFinishedEvent().waitForever();
+        getSuperstepFinishedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "finishSuperstep: Failed while waiting for master to " +
+              "signal completion of superstep " + getSuperstep(), e);
+    }
+  }
+
+  /**
+   * Write finished superstep info to ZooKeeper.
+   *
+   * @param partitionStatsList List of partition stats from superstep.
+   * @param workerSentMessages Number of messages sent in superstep.
+   */
+  private void writeFinshedSuperstepInfoToZK(
+      List<PartitionStats> partitionStatsList, long workerSentMessages) {
+    Collection<PartitionStats> finalizedPartitionStats =
+        workerGraphPartitioner.finalizePartitionStats(
+            partitionStatsList, getPartitionStore());
+    List<PartitionStats> finalizedPartitionStatsList =
+        new ArrayList<PartitionStats>(finalizedPartitionStats);
+    byte[] partitionStatsBytes =
+        WritableUtils.writeListToByteArray(finalizedPartitionStatsList);
+    WorkerSuperstepMetrics metrics = new WorkerSuperstepMetrics();
+    metrics.readFromRegistry();
+    byte[] metricsBytes = WritableUtils.writeToByteArray(metrics);
+
+    JSONObject workerFinishedInfoObj = new JSONObject();
+    try {
+      workerFinishedInfoObj.put(JSONOBJ_PARTITION_STATS_KEY,
+          Base64.encodeBytes(partitionStatsBytes));
+      workerFinishedInfoObj.put(JSONOBJ_NUM_MESSAGES_KEY, workerSentMessages);
+      workerFinishedInfoObj.put(JSONOBJ_METRICS_KEY,
+          Base64.encodeBytes(metricsBytes));
+    } catch (JSONException e) {
+      throw new RuntimeException(e);
+    }
+
+    String finishedWorkerPath =
+        getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) +
+        "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(finishedWorkerPath,
+          workerFinishedInfoObj.toString().getBytes(),
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: finished worker path " +
+          finishedWorkerPath + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " + finishedWorkerPath +
+          " failed with InterruptedException", e);
+    }
+  }
+
+  /**
+   * Save the vertices using the user-defined {@link VertexOutputFormat},
+   * iterating over every partition owned by this worker.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private void saveVertices() throws IOException, InterruptedException {
+    if (getConfiguration().getVertexOutputFormatClass() == null) {
+      LOG.warn("saveVertices: " +
+          GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS +
+          " not specified -- there will be no saved output");
+      return;
+    }
+
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Starting to save vertices");
+    VertexOutputFormat<I, V, E> vertexOutputFormat =
+        getConfiguration().createVertexOutputFormat();
+    VertexWriter<I, V, E> vertexWriter =
+        vertexOutputFormat.createVertexWriter(getContext());
+    vertexWriter.initialize(getContext());
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      for (Vertex<I, V, E, M> vertex : partition) {
+        getContext().progress();
+        vertexWriter.writeVertex(vertex);
+      }
+      getContext().progress();
+    }
+    vertexWriter.close(getContext());
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "saveVertices: Done saving vertices");
+  }
+
+  @Override
+  public void cleanup() throws IOException, InterruptedException {
+    workerClient.closeConnections();
+    setCachedSuperstep(getSuperstep() - 1);
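+    // finishSuperstep() advanced the cached superstep past the last
+    // completed one (incrCachedSuperstep), so step it back here before
+    // saving output; presumably so logs and paths refer to the superstep
+    // that actually finished.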
+    saveVertices();
+    // All worker processes should denote they are done by adding a special
+    // znode.  Once the number of znodes equals the number of partitions
+    // for workers and masters, the master will clean up the ZooKeeper
+    // znodes associated with this job.
+    String workerCleanedUpPath = cleanedUpPath  + "/" +
+        getTaskPartition() + WORKER_SUFFIX;
+    try {
+      String finalFinishedPath =
+          getZkExt().createExt(workerCleanedUpPath,
+              null,
+              Ids.OPEN_ACL_UNSAFE,
+              CreateMode.PERSISTENT,
+              true);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Notifying master its okay to cleanup with " +
+            finalFinishedPath);
+      }
+    } catch (KeeperException.NodeExistsException e) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("cleanup: Couldn't create finished node '" +
+            workerCleanedUpPath + "'");
+      }
+    } catch (KeeperException e) {
+      // Cleaning up, it's okay to fail after cleanup is successful
+      LOG.error("cleanup: Got KeeperException on notification " +
+          "to master about cleanup", e);
+    } catch (InterruptedException e) {
+      // Cleaning up, it's okay to fail after cleanup is successful
+      LOG.error("cleanup: Got InterruptedException on notification " +
+          "to master about cleanup", e);
+    }
+    try {
+      getZkExt().close();
+    } catch (InterruptedException e) {
+      // cleanup phase -- just log the error
+      LOG.error("cleanup: Zookeeper failed to close with " + e);
+    }
+
+    if (getConfiguration().metricsEnabled()) {
+      GiraphMetrics.get().dumpToStdout();
+    }
+
+    // Preferably would shut down the service only after
+    // all clients have disconnected (or the exceptions on the
+    // client side ignored).
+    workerServer.close();
+  }
+
+  @Override
+  public void storeCheckpoint() throws IOException {
+    LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO,
+        "storeCheckpoint: Starting checkpoint " +
+            getGraphMapper().getMapFunctions().toString() +
+            " - Attempt=" + getApplicationAttempt() +
+            ", Superstep=" + getSuperstep());
+
+    // Algorithm:
+    // For each partition, dump vertices and messages
+    Path metadataFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_METADATA_POSTFIX);
+    Path verticesFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VERTICES_POSTFIX);
+    Path validFilePath =
+        new Path(getCheckpointBasePath(getSuperstep()) + "." +
+            getHostnamePartitionId() +
+            CHECKPOINT_VALID_POSTFIX);
+
+    // Remove these files if they already exist (shouldn't though, unless
+    // of previous failure of this worker)
+    if (getFs().delete(validFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed valid file " +
+          validFilePath);
+    }
+    if (getFs().delete(metadataFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed metadata file " +
+          metadataFilePath);
+    }
+    if (getFs().delete(verticesFilePath, false)) {
+      LOG.warn("storeCheckpoint: Removed file " + verticesFilePath);
+    }
+
+    FSDataOutputStream verticesOutputStream =
+        getFs().create(verticesFilePath);
+    ByteArrayOutputStream metadataByteStream = new ByteArrayOutputStream();
+    DataOutput metadataOutput = new DataOutputStream(metadataByteStream);
+    for (Partition<I, V, E, M> partition :
+        getPartitionStore().getPartitions()) {
+      long startPos = verticesOutputStream.getPos();
+      partition.write(verticesOutputStream);
+      // write messages
+      getServerData().getCurrentMessageStore().writePartition(
+          verticesOutputStream, partition.getId());
+      // Write the metadata for this partition
+      // Format:
+      // <index count>
+      //   <index 0 start pos><partition id>
+      //   <index 1 start pos><partition id>
+      metadataOutput.writeLong(startPos);
+      metadataOutput.writeInt(partition.getId());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("storeCheckpoint: Vertex file starting " +
+            "offset = " + startPos + ", length = " +
+            (verticesOutputStream.getPos() - startPos) +
+            ", partition = " + partition.toString());
+      }
+      getContext().progress();
+    }
+    // Metadata is buffered and written at the end since it's small and
+    // needs to know how many partitions this worker owns
+    FSDataOutputStream metadataOutputStream =
+        getFs().create(metadataFilePath);
+    metadataOutputStream.writeInt(getPartitionStore().getNumPartitions());
+    metadataOutputStream.write(metadataByteStream.toByteArray());
+    metadataOutputStream.close();
+    verticesOutputStream.close();
+    if (LOG.isInfoEnabled()) {
+      LOG.info("storeCheckpoint: Finished metadata (" +
+          metadataFilePath + ") and vertices (" + verticesFilePath + ").");
+    }
+
+    getFs().createNewFile(validFilePath);
+
+    // Notify master that checkpoint is stored
+    String workerWroteCheckpoint =
+        getWorkerWroteCheckpointPath(getApplicationAttempt(),
+            getSuperstep()) + "/" + getHostnamePartitionId();
+    try {
+      getZkExt().createExt(workerWroteCheckpoint,
+          new byte[0],
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException.NodeExistsException e) {
+      LOG.warn("finishSuperstep: wrote checkpoint worker path " +
+          workerWroteCheckpoint + " already exists!");
+    } catch (KeeperException e) {
+      throw new IllegalStateException("Creating " + workerWroteCheckpoint +
+          " failed with KeeperException", e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException("Creating " +
+          workerWroteCheckpoint +
+          " failed with InterruptedException", e);
+    }
+  }
+
+  @Override
+  public VertexEdgeCount loadCheckpoint(long superstep) {
+    try {
+      // clear old message stores
+      getServerData().getIncomingMessageStore().clearAll();
+      getServerData().getCurrentMessageStore().clearAll();
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "loadCheckpoint: Failed to clear message stores ", e);
+    }
+
+    // Algorithm:
+    // Examine all the partition owners and load the ones
+    // that match my hostname and id from the master designated checkpoint
+    // prefixes.
+    long startPos = 0;
+    int loadedPartitions = 0;
+    for (PartitionOwner partitionOwner :
+      workerGraphPartitioner.getPartitionOwners()) {
+      if (partitionOwner.getWorkerInfo().equals(getWorkerInfo())) {
+        String metadataFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_METADATA_POSTFIX;
+        String partitionsFile =
+            partitionOwner.getCheckpointFilesPrefix() +
+            CHECKPOINT_VERTICES_POSTFIX;
+        try {
+          int partitionId = -1;
+          DataInputStream metadataStream =
+              getFs().open(new Path(metadataFile));
+          int partitions = metadataStream.readInt();
+          for (int i = 0; i < partitions; ++i) {
+            startPos = metadataStream.readLong();
+            partitionId = metadataStream.readInt();
+            if (partitionId == partitionOwner.getPartitionId()) {
+              break;
+            }
+          }
+          if (partitionId != partitionOwner.getPartitionId()) {
+            throw new IllegalStateException(
+                "loadCheckpoint: " + partitionOwner +
+                " not found!");
+          }
+          metadataStream.close();
+          Partition<I, V, E, M> partition =
+              getConfiguration().createPartition(partitionId, getContext());
+          DataInputStream partitionsStream =
+              getFs().open(new Path(partitionsFile));
+          if (partitionsStream.skip(startPos) != startPos) {
+            throw new IllegalStateException(
+                "loadCheckpoint: Failed to skip " + startPos +
+                " on " + partitionsFile);
+          }
+          partition.readFields(partitionsStream);
+          if (partitionsStream.readBoolean()) {
+            getServerData().getCurrentMessageStore().readFieldsForPartition(
+                partitionsStream, partitionId);
+          }
+          partitionsStream.close();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("loadCheckpoint: Loaded partition " +
+                partition);
+          }
+          if (getPartitionStore().hasPartition(partitionId)) {
+            throw new IllegalStateException(
+                "loadCheckpoint: Already has partition owner " +
+                    partitionOwner);
+          }
+          getPartitionStore().addPartition(partition);
+          getContext().progress();
+          ++loadedPartitions;
+        } catch (IOException e) {
+          throw new RuntimeException(
+              "loadCheckpoint: Failed to get partition owner " +
+                  partitionOwner, e);
+        }
+      }
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("loadCheckpoint: Loaded " + loadedPartitions +
+          " partitions of out " +
+          workerGraphPartitioner.getPartitionOwners().size() +
+          " total.");
+    }
+
+    // Load global statistics
+    GlobalStats globalStats = null;
+    String finalizedCheckpointPath =
+        getCheckpointBasePath(superstep) + CHECKPOINT_FINALIZED_POSTFIX;
+    try {
+      DataInputStream finalizedStream =
+          getFs().open(new Path(finalizedCheckpointPath));
+      globalStats = new GlobalStats();
+      globalStats.readFields(finalizedStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(
+          "loadCheckpoint: Failed to load global statistics", e);
+    }
+
+    // Communication service needs to setup the connections prior to
+    // processing vertices
+/*if[HADOOP_NON_SECURE]
+    workerClient.setup();
+else[HADOOP_NON_SECURE]*/
+    workerClient.setup(getConfiguration().authenticate());
+/*end[HADOOP_NON_SECURE]*/
+    return new VertexEdgeCount(globalStats.getVertexCount(),
+        globalStats.getEdgeCount());
+  }
+
+  /**
+   * Send the worker partitions to their destination workers
+   *
+   * @param workerPartitionMap Map of worker info to the partitions stored
+   *        on this worker to be sent
+   */
+  private void sendWorkerPartitions(
+      Map<WorkerInfo, List<Integer>> workerPartitionMap) {
+    List<Entry<WorkerInfo, List<Integer>>> randomEntryList =
+        new ArrayList<Entry<WorkerInfo, List<Integer>>>(
+            workerPartitionMap.entrySet());
+    Collections.shuffle(randomEntryList);
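+    // Shuffle so partitions are sent to destination workers in a random
+    // order (matching the "random fashion" noted in
+    // exchangeVertexPartitions), presumably to avoid all senders hitting
+    // the same destination first.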
+    WorkerClientRequestProcessor<I, V, E, M> workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(getContext(),
+            getConfiguration(), this);
+    for (Entry<WorkerInfo, List<Integer>> workerPartitionList :
+      randomEntryList) {
+      for (Integer partitionId : workerPartitionList.getValue()) {
+        Partition<I, V, E, M> partition =
+            getPartitionStore().removePartition(partitionId);
+        if (partition == null) {
+          throw new IllegalStateException(
+              "sendWorkerPartitions: Couldn't find partition " +
+                  partitionId + " to send to " +
+                  workerPartitionList.getKey());
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("sendWorkerPartitions: Sending worker " +
+              workerPartitionList.getKey() + " partition " +
+              partitionId);
+        }
+        workerClientRequestProcessor.sendPartitionRequest(
+            workerPartitionList.getKey(),
+            partition);
+      }
+    }
+
+
+    try {
+      workerClientRequestProcessor.flush();
+      workerClient.waitAllRequests();
+    } catch (IOException e) {
+      throw new IllegalStateException("sendWorkerPartitions: Flush failed", e);
+    }
+    String myPartitionExchangeDonePath =
+        getPartitionExchangeWorkerPath(
+            getApplicationAttempt(), getSuperstep(), getWorkerInfo());
+    try {
+      getZkExt().createExt(myPartitionExchangeDonePath,
+          null,
+          Ids.OPEN_ACL_UNSAFE,
+          CreateMode.PERSISTENT,
+          true);
+    } catch (KeeperException e) {
+      throw new IllegalStateException(
+          "sendWorkerPartitions: KeeperException to create " +
+              myPartitionExchangeDonePath, e);
+    } catch (InterruptedException e) {
+      throw new IllegalStateException(
+          "sendWorkerPartitions: InterruptedException to create " +
+              myPartitionExchangeDonePath, e);
+    }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("sendWorkerPartitions: Done sending all my partitions.");
+    }
+  }
+
+  @Override
+  public final void exchangeVertexPartitions(
+      Collection<? extends PartitionOwner> masterSetPartitionOwners) {
+    // 1. Fix the addresses of the partition ids if they have changed.
+    // 2. Send all the partitions to their destination workers in a random
+    //    fashion.
+    // 3. Notify completion with a ZooKeeper stamp
+    // 4. Wait for all my dependencies to be done (if any)
+    // 5. Add the partitions to myself.
+    PartitionExchange partitionExchange =
+        workerGraphPartitioner.updatePartitionOwners(
+            getWorkerInfo(), masterSetPartitionOwners, getPartitionStore());
+    workerClient.openConnections();
+
+    Map<WorkerInfo, List<Integer>> sendWorkerPartitionMap =
+        partitionExchange.getSendWorkerPartitionMap();
+    if (!getPartitionStore().isEmpty()) {
+      sendWorkerPartitions(sendWorkerPartitionMap);
+    }
+
+    Set<WorkerInfo> myDependencyWorkerSet =
+        partitionExchange.getMyDependencyWorkerSet();
+    Set<String> workerIdSet = new HashSet<String>();
+    for (WorkerInfo tmpWorkerInfo : myDependencyWorkerSet) {
+      if (!workerIdSet.add(tmpWorkerInfo.getHostnameId())) {
+        throw new IllegalStateException(
+            "exchangeVertexPartitions: Duplicate entry " + tmpWorkerInfo);
+      }
+    }
+    if (myDependencyWorkerSet.isEmpty() && getPartitionStore().isEmpty()) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("exchangeVertexPartitions: Nothing to exchange, " +
+            "exiting early");
+      }
+      return;
+    }
+
+    String vertexExchangePath =
+        getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
+    List<String> workerDoneList;
+    try {
+      while (true) {
+        workerDoneList = getZkExt().getChildrenExt(
+            vertexExchangePath, true, false, false);
+        workerIdSet.removeAll(workerDoneList);
+        if (workerIdSet.isEmpty()) {
+          break;
+        }
+        if (LOG.isInfoEnabled()) {
+          LOG.info("exchangeVertexPartitions: Waiting for workers " +
+              workerIdSet);
+        }
+        getPartitionExchangeChildrenChangedEvent().waitForever();
+        getPartitionExchangeChildrenChangedEvent().reset();
+      }
+    } catch (KeeperException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (LOG.isInfoEnabled()) {
+      LOG.info("exchangeVertexPartitions: Done with exchange.");
+    }
+  }
+
+  /**
+   * Get the event that is signaled when the state of a partition exchange
+   * has changed.
+   *
+   * @return Event to check.
+   */
+  public final BspEvent getPartitionExchangeChildrenChangedEvent() {
+    return partitionExchangeChildrenChanged;
+  }
+
+  @Override
+  protected boolean processEvent(WatchedEvent event) {
+    boolean foundEvent = false;
+    if (event.getPath().startsWith(masterJobStatePath) &&
+        (event.getType() == EventType.NodeChildrenChanged)) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("processEvent: Job state changed, checking " +
+            "to see if it needs to restart");
+      }
+      JSONObject jsonObj = getJobState();
+      try {
+        if ((ApplicationState.valueOf(jsonObj.getString(JSONOBJ_STATE_KEY)) ==
+            ApplicationState.START_SUPERSTEP) &&
+            jsonObj.getLong(JSONOBJ_APPLICATION_ATTEMPT_KEY) !=
+            getApplicationAttempt()) {
+          LOG.fatal("processEvent: Worker will restart " +
+              "from command - " + jsonObj.toString());
+          System.exit(-1);
+        }
+      } catch (JSONException e) {
+        throw new RuntimeException(
+            "processEvent: Couldn't properly get job state from " +
+                jsonObj.toString());
+      }
+      foundEvent = true;
+    } else if (event.getPath().contains(PARTITION_EXCHANGE_DIR) &&
+        event.getType() == EventType.NodeChildrenChanged) {
+      if (LOG.isInfoEnabled()) {
+        LOG.info("processEvent : partitionExchangeChildrenChanged " +
+            "(at least one worker is done sending partitions)");
+      }
+      partitionExchangeChildrenChanged.signal();
+      foundEvent = true;
+    }
+
+    return foundEvent;
+  }
+
+  @Override
+  public WorkerInfo getWorkerInfo() {
+    return workerInfo;
+  }
+
+  @Override
+  public PartitionStore<I, V, E, M> getPartitionStore() {
+    return getServerData().getPartitionStore();
+  }
+
+  @Override
+  public PartitionOwner getVertexPartitionOwner(I vertexId) {
+    return workerGraphPartitioner.getPartitionOwner(vertexId);
+  }
+
+  @Override
+  public Iterable<? extends PartitionOwner> getPartitionOwners() {
+    return workerGraphPartitioner.getPartitionOwners();
+  }
+
+  @Override
+  public Partition<I, V, E, M> getPartition(I vertexId) {
+    return getPartitionStore().getPartition(getPartitionId(vertexId));
+  }
+
+  @Override
+  public Integer getPartitionId(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+    return partitionOwner.getPartitionId();
+  }
+
+  @Override
+  public boolean hasPartition(Integer partitionId) {
+    return getPartitionStore().hasPartition(partitionId);
+  }
+
+  @Override
+  public Vertex<I, V, E, M> getVertex(I vertexId) {
+    PartitionOwner partitionOwner = getVertexPartitionOwner(vertexId);
+    if (getPartitionStore().hasPartition(partitionOwner.getPartitionId())) {
+      return getPartitionStore().getPartition(
+          partitionOwner.getPartitionId()).getVertex(vertexId);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public ServerData<I, V, E, M> getServerData() {
+    return workerServer.getServerData();
+  }
+
+  @Override
+  public WorkerAggregatorHandler getAggregatorHandler() {
+    return aggregatorHandler;
+  }
+
+  @Override
+  public void prepareSuperstep() {
+    if (getSuperstep() != INPUT_SUPERSTEP) {
+      aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java b/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java
new file mode 100644
index 0000000..651290d
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/BspUtils.java
@@ -0,0 +1,523 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.giraph.conf.GiraphConstants;
+import org.apache.giraph.graph.partition.GraphPartitionerFactory;
+import org.apache.giraph.graph.partition.HashPartitionerFactory;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Helper methods that use the configuration to look up the appropriate
+ * classes or instantiate them.
+ */
+public class BspUtils {
+  /**
+   * Do not construct.
+   */
+  private BspUtils() { }
+
+  /**
+   * Get the user's subclassed {@link GraphPartitionerFactory}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's graph partitioner
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  Class<? extends GraphPartitionerFactory<I, V, E, M>>
+  getGraphPartitionerClass(Configuration conf) {
+    return (Class<? extends GraphPartitionerFactory<I, V, E, M>>)
+      conf.getClass(GiraphConstants.GRAPH_PARTITIONER_FACTORY_CLASS,
+        HashPartitionerFactory.class,
+        GraphPartitionerFactory.class);
+  }
+
+  /**
+   * Create a user graph partitioner class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user graph partitioner class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  GraphPartitionerFactory<I, V, E, M>
+  createGraphPartitioner(Configuration conf) {
+    Class<? extends GraphPartitionerFactory<I, V, E, M>>
+    graphPartitionerFactoryClass = getGraphPartitionerClass(conf);
+    return ReflectionUtils.newInstance(graphPartitionerFactoryClass, conf);
+  }
+
+  /**
+   * Create a partition stats object using the user's graph partitioner
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user graph partition stats class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  PartitionStats createGraphPartitionStats(Configuration conf) {
+    GraphPartitionerFactory<I, V, E, M> graphPartitioner =
+      createGraphPartitioner(conf);
+    return graphPartitioner.createMasterGraphPartitioner().
+      createPartitionStats();
+  }
+
+  /**
+   * Get the user's subclassed {@link VertexInputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex input format class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable,
+  V extends Writable,
+  E extends Writable,
+  M extends Writable>
+  Class<? extends VertexInputFormat<I, V, E, M>>
+  getVertexInputFormatClass(Configuration conf) {
+    return (Class<? extends VertexInputFormat<I, V, E, M>>)
+      conf.getClass(GiraphConstants.VERTEX_INPUT_FORMAT_CLASS,
+        null,
+        VertexInputFormat.class);
+  }
+
+  /**
+   * Create a user vertex input format class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex input format class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable,
+  V extends Writable,
+  E extends Writable,
+  M extends Writable> VertexInputFormat<I, V, E, M>
+  createVertexInputFormat(Configuration conf) {
+    Class<? extends VertexInputFormat<I, V, E, M>> vertexInputFormatClass =
+      getVertexInputFormatClass(conf);
+    VertexInputFormat<I, V, E, M> inputFormat =
+      ReflectionUtils.newInstance(vertexInputFormatClass, conf);
+    return inputFormat;
+  }
+
+  /**
+   * Get the user's subclassed {@link VertexOutputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return User's vertex output format class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable,
+  V extends Writable,
+  E extends Writable>
+  Class<? extends VertexOutputFormat<I, V, E>>
+  getVertexOutputFormatClass(Configuration conf) {
+    return (Class<? extends VertexOutputFormat<I, V, E>>)
+      conf.getClass(GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS,
+        null,
+        VertexOutputFormat.class);
+  }
+
+  /**
+   * Create a user vertex output format class
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex output format class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable> VertexOutputFormat<I, V, E>
+  createVertexOutputFormat(Configuration conf) {
+    Class<? extends VertexOutputFormat<I, V, E>> vertexOutputFormatClass =
+      getVertexOutputFormatClass(conf);
+    return ReflectionUtils.newInstance(vertexOutputFormatClass, conf);
+  }
+
+  /**
+   * Get the user's subclassed {@link EdgeInputFormat}.
+   *
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return User's edge input format class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, E extends Writable>
+  Class<? extends EdgeInputFormat<I, E>>
+  getEdgeInputFormatClass(Configuration conf) {
+    return (Class<? extends EdgeInputFormat<I, E>>)
+        conf.getClass(GiraphConstants.EDGE_INPUT_FORMAT_CLASS,
+            null,
+            EdgeInputFormat.class);
+  }
+
+  /**
+   * Create a user edge input format class
+   *
+   * @param <I> Vertex id
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return Instantiated user edge input format class
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, E extends Writable>
+  EdgeInputFormat<I, E> createEdgeInputFormat(Configuration conf) {
+    Class<? extends EdgeInputFormat<I, E>> edgeInputFormatClass =
+        getEdgeInputFormatClass(conf);
+    EdgeInputFormat<I, E> inputFormat =
+        ReflectionUtils.newInstance(edgeInputFormatClass, conf);
+    return inputFormat;
+  }
+
+  /**
+   * Get the user's subclassed {@link AggregatorWriter}.
+   *
+   * @param conf Configuration to check
+   * @return User's aggregator writer class
+   */
+  public static Class<? extends AggregatorWriter>
+  getAggregatorWriterClass(Configuration conf) {
+    return conf.getClass(GiraphConstants.AGGREGATOR_WRITER_CLASS,
+        TextAggregatorWriter.class,
+        AggregatorWriter.class);
+  }
+
+  /**
+   * Create a user aggregator writer
+   *
+   * @param conf Configuration to check
+   * @return Instantiated user aggregator writer class
+   */
+  public static AggregatorWriter createAggregatorWriter(Configuration conf) {
+    Class<? extends AggregatorWriter> aggregatorWriterClass =
+      getAggregatorWriterClass(conf);
+    return ReflectionUtils.newInstance(aggregatorWriterClass, conf);
+  }
+
+  /**
+   * Get the user's subclassed {@link Combiner}.
+   *
+   * @param <I> Vertex id
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex combiner class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, M extends Writable>
+  Class<? extends Combiner<I, M>> getCombinerClass(Configuration conf) {
+    return (Class<? extends Combiner<I, M>>)
+      conf.getClass(GiraphConstants.VERTEX_COMBINER_CLASS,
+        null,
+        Combiner.class);
+  }
+
+  /**
+   * Get the user's subclassed VertexResolver.
+   *
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex resolver class
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  Class<? extends VertexResolver<I, V, E, M>>
+  getVertexResolverClass(Configuration conf) {
+    return (Class<? extends VertexResolver<I, V, E, M>>)
+      conf.getClass(GiraphConstants.VERTEX_RESOLVER_CLASS,
+        DefaultVertexResolver.class,
+        VertexResolver.class);
+  }
+
+  /**
+   * Get the user's subclassed WorkerContext.
+   *
+   * @param conf Configuration to check
+   * @return User's worker context class
+   */
+  public static Class<? extends WorkerContext>
+  getWorkerContextClass(Configuration conf) {
+    return (Class<? extends WorkerContext>)
+      conf.getClass(GiraphConstants.WORKER_CONTEXT_CLASS,
+        DefaultWorkerContext.class,
+        WorkerContext.class);
+  }
+
+  /**
+   * Create a user worker context
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @param graphState State of the graph from the worker
+   * @return Instantiated user worker context
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  WorkerContext createWorkerContext(Configuration conf,
+    GraphState<I, V, E, M> graphState) {
+    Class<? extends WorkerContext> workerContextClass =
+      getWorkerContextClass(conf);
+    WorkerContext workerContext =
+      ReflectionUtils.newInstance(workerContextClass, conf);
+    workerContext.setGraphState(graphState);
+    return workerContext;
+  }
+
+  /**
+   * Get the user's subclassed {@link MasterCompute}
+   *
+   * @param conf Configuration to check
+   * @return User's master class
+   */
+  public static Class<? extends MasterCompute>
+  getMasterComputeClass(Configuration conf) {
+    return (Class<? extends MasterCompute>)
+      conf.getClass(GiraphConstants.MASTER_COMPUTE_CLASS,
+        DefaultMasterCompute.class,
+        MasterCompute.class);
+  }
+
+  /**
+   * Create a user master
+   *
+   * @param conf Configuration to check
+   * @return Instantiated user master
+   */
+  public static MasterCompute
+  createMasterCompute(Configuration conf) {
+    Class<? extends MasterCompute> masterComputeClass =
+        getMasterComputeClass(conf);
+    MasterCompute masterCompute =
+        ReflectionUtils.newInstance(masterComputeClass, conf);
+    return masterCompute;
+  }
+
+  /**
+   * Get the user's subclassed {@link Vertex}
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex class
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable>
+  Class<? extends Vertex<I, V, E, M>> getVertexClass(Configuration conf) {
+    return (Class<? extends Vertex<I, V, E, M>>)
+      conf.getClass(GiraphConstants.VERTEX_CLASS,
+        null,
+        Vertex.class);
+  }
+
+  /**
+   * Create a user vertex
+   *
+   * @param <I> Vertex id
+   * @param <V> Vertex data
+   * @param <E> Edge data
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable, V extends Writable,
+  E extends Writable, M extends Writable> Vertex<I, V, E, M>
+  createVertex(Configuration conf) {
+    Class<? extends Vertex<I, V, E, M>> vertexClass = getVertexClass(conf);
+    Vertex<I, V, E, M> vertex =
+      ReflectionUtils.newInstance(vertexClass, conf);
+    return vertex;
+  }
+
+  /**
+   * Get the user's subclassed vertex index class.
+   *
+   * @param <I> Vertex id
+   * @param conf Configuration to check
+   * @return User's vertex index class
+   */
+  @SuppressWarnings("unchecked")
+  public static <I extends Writable> Class<I>
+  getVertexIdClass(Configuration conf) {
+    return (Class<I>) conf.getClass(GiraphConstants.VERTEX_ID_CLASS,
+      WritableComparable.class);
+  }
+
+  /**
+   * Create a user vertex index
+   *
+   * @param <I> Vertex id
+   * @param conf Configuration to check
+   * @return Instantiated user vertex index
+   */
+  @SuppressWarnings("rawtypes")
+  public static <I extends WritableComparable>
+  I createVertexId(Configuration conf) {
+    Class<I> vertexIdClass = getVertexIdClass(conf);
+    try {
+      return vertexIdClass.newInstance();
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(
+        "createVertexId: Failed to instantiate", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(
+        "createVertexId: Illegally accessed", e);
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex value class.
+   *
+   * @param <V> Vertex data
+   * @param conf Configuration to check
+   * @return User's vertex value class
+   */
+  @SuppressWarnings("unchecked")
+  public static <V extends Writable> Class<V>
+  getVertexValueClass(Configuration conf) {
+    return (Class<V>) conf.getClass(GiraphConstants.VERTEX_VALUE_CLASS,
+      Writable.class);
+  }
+
+  /**
+   * Create a user vertex value
+   *
+   * @param <V> Vertex data
+   * @param conf Configuration to check
+   * @return Instantiated user vertex value
+   */
+  @SuppressWarnings("unchecked")
+  public static <V extends Writable> V
+  createVertexValue(Configuration conf) {
+    Class<V> vertexValueClass = getVertexValueClass(conf);
+    if (vertexValueClass == NullWritable.class) {
+      return (V) NullWritable.get();
+    } else {
+      try {
+        return vertexValueClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+          "createVertexValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+          "createVertexValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Get the user's subclassed edge value class.
+   *
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return User's edge value class
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Writable> Class<E>
+  getEdgeValueClass(Configuration conf) {
+    return (Class<E>) conf.getClass(GiraphConstants.EDGE_VALUE_CLASS,
+      Writable.class);
+  }
+
+  /**
+   * Create a user edge value
+   *
+   * @param <E> Edge data
+   * @param conf Configuration to check
+   * @return Instantiated user edge value
+   */
+  @SuppressWarnings("unchecked")
+  public static <E extends Writable> E
+  createEdgeValue(Configuration conf) {
+    Class<E> edgeValueClass = getEdgeValueClass(conf);
+    if (edgeValueClass == NullWritable.class) {
+      return (E) NullWritable.get();
+    } else {
+      try {
+        return edgeValueClass.newInstance();
+      } catch (InstantiationException e) {
+        throw new IllegalArgumentException(
+          "createEdgeValue: Failed to instantiate", e);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException(
+          "createEdgeValue: Illegally accessed", e);
+      }
+    }
+  }
+
+  /**
+   * Get the user's subclassed vertex message value class.
+   *
+   * @param <M> Message data
+   * @param conf Configuration to check
+   * @return User's vertex message value class
+   */
+  @SuppressWarnings("unchecked")
+  public static <M extends Writable> Class<M>
+  getMessageValueClass(Configuration conf) {
+    return (Class<M>) conf.getClass(GiraphConstants.MESSAGE_VALUE_CLASS,
+      Writable.class);
+  }
+}
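
Every helper in BspUtils above follows the same pattern: read a class name from
the Hadoop Configuration under a GiraphConstants key, fall back to a default
(or null), and instantiate the class through ReflectionUtils so the
configuration is injected into Configurable instances. The stand-alone sketch
below illustrates that round trip for the vertex id and vertex value classes;
the class name BspUtilsSketch is purely illustrative and not part of this
patch, and in a real job these keys are set when the job is configured rather
than by hand.

import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.graph.BspUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/** Illustrative only: round-trips user types through the configuration. */
public class BspUtilsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Register the id and value types under the keys BspUtils reads back.
    conf.setClass(GiraphConstants.VERTEX_ID_CLASS,
        LongWritable.class, WritableComparable.class);
    conf.setClass(GiraphConstants.VERTEX_VALUE_CLASS,
        DoubleWritable.class, Writable.class);

    // Each helper resolves the configured class and instantiates it, so
    // callers never need to hard-code the user's concrete types.
    LongWritable id = BspUtils.createVertexId(conf);
    DoubleWritable value = BspUtils.createVertexValue(conf);

    System.out.println("id: " + id.getClass().getName() +
        ", value: " + value.getClass().getName());
  }
}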

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java b/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java
new file mode 100644
index 0000000..20f0a6a
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/Combiner.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/**
+ * Abstract class to extend for combining messages sent to the same vertex.
+ * Use a combiner when any two messages destined for the same vertex can be
+ * combined into a single equivalent message.
+ *
+ * @param <I> Vertex id
+ * @param <M> Message data
+ */
+public abstract class Combiner<I extends WritableComparable,
+    M extends Writable> {
+  /**
+   * Combine messageToCombine into originalMessage by modifying
+   * originalMessage in place.
+   *
+   * @param vertexIndex Index of the vertex getting these messages
+   * @param originalMessage The first message which we want to combine;
+   *                        put the result of combining in this message
+   * @param messageToCombine The second message which we want to combine
+   */
+  public abstract void combine(I vertexIndex, M originalMessage,
+      M messageToCombine);
+
+  /**
+   * Get the initial message. When combined with any other message M,
+   * the result should be M.
+   *
+   * @return Initial message
+   */
+  public abstract M createInitialMessage();
+}
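
The two-method contract above is easiest to see in a concrete subclass. The
sketch below (a hypothetical DoubleSumCombiner, not part of this patch) sums
DoubleWritable messages addressed to the same LongWritable vertex id:
combine() folds the second message into the first, and createInitialMessage()
returns 0.0, the identity for addition, so combining it with any message M
yields M.

import org.apache.giraph.graph.Combiner;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;

/** Sketch of a combiner that sums all double messages sent to a vertex. */
public class DoubleSumCombiner
    extends Combiner<LongWritable, DoubleWritable> {
  @Override
  public void combine(LongWritable vertexIndex,
      DoubleWritable originalMessage, DoubleWritable messageToCombine) {
    // Fold the second message into the first; the first holds the result.
    originalMessage.set(originalMessage.get() + messageToCombine.get());
  }

  @Override
  public DoubleWritable createInitialMessage() {
    // 0.0 is the identity for addition: combining it with any message M
    // leaves M unchanged, as the contract requires.
    return new DoubleWritable(0);
  }
}

A combiner like this would be registered under
GiraphConstants.VERTEX_COMBINER_CLASS, the key that getCombinerClass in
BspUtils above reads.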

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
new file mode 100644
index 0000000..fa1bda3
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/ComputeCallable.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.giraph.graph;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.core.TimerContext;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
+import org.apache.giraph.bsp.CentralizedServiceWorker;
+import org.apache.giraph.comm.WorkerClientRequestProcessor;
+import org.apache.giraph.comm.messages.MessageStoreByPartition;
+import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
+import org.apache.giraph.graph.partition.Partition;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.metrics.GiraphMetrics;
+
+import org.apache.giraph.utils.MemoryUtils;
+import org.apache.giraph.utils.SystemTime;
+import org.apache.giraph.utils.Time;
+import org.apache.giraph.utils.TimedLogger;
+import org.apache.giraph.utils.Times;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.log4j.Logger;
+
+/**
+ * Compute as many vertex partitions as possible.  Every thread has its
+ * own instance of WorkerClientRequestProcessor to send requests.  Note that
+ * the partition ids are used in the partitionIdQueue rather than the actual
+ * partitions since that would cause the partitions to be loaded into memory
+ * when using the out-of-core graph partition store.  We should only load on
+ * demand.
+ *
+ * @param <I> Vertex index value
+ * @param <V> Vertex value
+ * @param <E> Edge value
+ * @param <M> Message data
+ */
+public class ComputeCallable<I extends WritableComparable, V extends Writable,
+    E extends Writable, M extends Writable> implements Callable {
+  /** Name of timer for compute call */
+  public static final String TIMER_COMPUTE_ONE = "compute-one";
+  /** Class logger */
+  private static final Logger LOG  = Logger.getLogger(ComputeCallable.class);
+  /** Class time object */
+  private static final Time TIME = SystemTime.get();
+  /** Context */
+  private final Mapper<?, ?, ?, ?>.Context context;
+  /** Graph state (note that it is recreated in call() for locality) */
+  private GraphState<I, V, E, M> graphState;
+  /** Thread-safe queue of all partition ids */
+  private final BlockingQueue<Integer> partitionIdQueue;
+  /** Message store */
+  private final MessageStoreByPartition<I, M> messageStore;
+  /** Configuration */
+  private final ImmutableClassesGiraphConfiguration<I, V, E, M> configuration;
+  /** Worker (for NettyWorkerClientRequestProcessor) */
+  private final CentralizedServiceWorker<I, V, E, M> serviceWorker;
+  /** Dump some progress every 30 seconds */
+  private final TimedLogger timedLogger = new TimedLogger(30 * 1000, LOG);
+  /** Sends the messages (unique per Callable) */
+  private WorkerClientRequestProcessor<I, V, E, M>
+  workerClientRequestProcessor;
+  /** Get the start time in nanos */
+  private final long startNanos = TIME.getNanoseconds();
+
+  // Per-Superstep Metrics
+  /** Timer for single compute() call */
+  private final Timer computeOneTimer;
+
+  /**
+   * Constructor
+   *
+   * @param context Context
+   * @param graphState Current graph state (use to create own graph state)
+   * @param messageStore Message store
+   * @param partitionIdQueue Queue of partition ids (thread-safe)
+   * @param configuration Configuration
+   * @param serviceWorker Service worker
+   */
+  public ComputeCallable(
+      Mapper<?, ?, ?, ?>.Context context, GraphState<I, V, E, M> graphState,
+      MessageStoreByPartition<I, M> messageStore,
+      BlockingQueue<Integer> partitionIdQueue,
+      ImmutableClassesGiraphConfiguration<I, V, E, M> configuration,
+      CentralizedServiceWorker<I, V, E, M> serviceWorker) {
+    this.context = context;
+    this.configuration = configuration;
+    this.partitionIdQueue = partitionIdQueue;
+    this.messageStore = messageStore;
+    this.serviceWorker = serviceWorker;
+    // Will be replaced later in call() for locality
+    this.graphState = graphState;
+
+    GiraphMetrics metrics = GiraphMetrics.get();
+    // Normally we would use ResetSuperstepMetricsObserver but this class is
+    // not long-lived, so just instantiating in the constructor is good enough.
+    computeOneTimer = metrics.perSuperstep().getTimer(TIMER_COMPUTE_ONE);
+  }
+
+  @Override
+  public Collection<PartitionStats> call() {
+    // Thread initialization (for locality)
+    this.workerClientRequestProcessor =
+        new NettyWorkerClientRequestProcessor<I, V, E, M>(
+            context, configuration, serviceWorker);
+    WorkerThreadAggregatorUsage aggregatorUsage =
+        serviceWorker.getAggregatorHandler().newThreadAggregatorUsage();
+
+    this.graphState = new GraphState<I, V, E, M>(graphState.getSuperstep(),
+        graphState.getTotalNumVertices(), graphState.getTotalNumEdges(),
+        context, graphState.getGraphMapper(), workerClientRequestProcessor,
+        aggregatorUsage);
+
+    List<PartitionStats> partitionStatsList = Lists.newArrayList();
+    while (!partitionIdQueue.isEmpty()) {
+      Integer partitionId = partitionIdQueue.poll();
+      if (partitionId == null) {
+        break;
+      }
+
+      Partition<I, V, E, M> partition =
+          serviceWorker.getPartitionStore().getPartition(partitionId);
+      try {
+        PartitionStats partitionStats = computePartition(partition);
+        partitionStatsList.add(partitionStats);
+        partitionStats.addMessagesSentCount(
+            workerClientRequestProcessor.resetMessageCount());
+        timedLogger.info("call: Completed " +
+            partitionStatsList.size() + " partitions, " +
+            partitionIdQueue.size() + " remaining " +
+            MemoryUtils.getRuntimeMemoryStats());
+      } catch (IOException e) {
+        throw new IllegalStateException("call: Caught unexpected IOException," +
+            " failing.", e);
+      }
+    }
+
+    if (LOG.isInfoEnabled()) {
+      float seconds = Times.getNanosSince(TIME, startNanos) /
+          Time.NS_PER_SECOND_AS_FLOAT;
+      LOG.info("call: Computation took " + seconds + " secs for "  +
+          partitionStatsList.size() + " partitions on superstep " +
+          graphState.getSuperstep() + ".  Flushing started");
+    }
+    try {
+      workerClientRequestProcessor.flush();
+      aggregatorUsage.finishThreadComputation();
+    } catch (IOException e) {
+      throw new IllegalStateException("call: Flushing failed.", e);
+    }
+    return partitionStatsList;
+  }
+
+  /**
+   * Compute a single partition
+   *
+   * @param partition Partition to compute
+   * @return Partition stats for this computed partition
+   */
+  private PartitionStats computePartition(Partition<I, V, E, M> partition)
+    throws IOException {
+    PartitionStats partitionStats =
+        new PartitionStats(partition.getId(), 0, 0, 0, 0);
+    // Make sure this is thread-safe across runs
+    synchronized (partition) {
+      for (Vertex<I, V, E, M> vertex : partition) {
+        // Make sure every vertex has this thread's
+        // graphState before computing
+        vertex.setGraphState(graphState);
+        Iterable<M> messages =
+            messageStore.getVertexMessages(vertex.getId());
+        if (vertex.isHalted() && !Iterables.isEmpty(messages)) {
+          vertex.wakeUp();
+        }
+        if (!vertex.isHalted()) {
+          context.progress();
+          TimerContext computeOneTimerContext = computeOneTimer.time();
+          try {
+            vertex.compute(messages);
+          } finally {
+            computeOneTimerContext.stop();
+          }
+          // Need to save the vertex changes (possibly)
+          partition.saveVertex(vertex);
+        }
+        if (vertex.isHalted()) {
+          partitionStats.incrFinishedVertexCount();
+        }
+        // Remove the messages now that the vertex has finished computation
+        messageStore.clearVertexMessages(vertex.getId());
+
+        // Add statistics for this vertex
+        partitionStats.incrVertexCount();
+        partitionStats.addEdgeCount(vertex.getNumEdges());
+      }
+
+      messageStore.clearPartition(partition.getId());
+    }
+    return partitionStats;
+  }
+}
+
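
The class Javadoc above describes the worker's threading model: partition ids,
not partitions, sit in a shared queue, and every compute thread drains that
queue so a partition is only loaded when it is actually processed. The
stand-alone sketch below shows just that queue-draining pattern with plain
java.util.concurrent types; it is illustrative only and does not construct the
Giraph classes, which need the worker's graph state, message store and
configuration.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Stand-alone sketch of the threading pattern described above: partition ids
 * are queued, each worker thread drains the queue, and a partition would only
 * be fetched from the partition store at the moment it is processed.
 */
public class PartitionQueueSketch {
  public static void main(String[] args) throws Exception {
    int numPartitions = 8;
    int numThreads = 2;

    final BlockingQueue<Integer> partitionIdQueue =
        new ArrayBlockingQueue<Integer>(numPartitions);
    for (int i = 0; i < numPartitions; ++i) {
      partitionIdQueue.add(i);
    }

    ExecutorService pool = Executors.newFixedThreadPool(numThreads);
    List<Future<Integer>> futures = new ArrayList<Future<Integer>>();
    for (int t = 0; t < numThreads; ++t) {
      futures.add(pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() {
          int computed = 0;
          // Same poll-until-empty idea as ComputeCallable.call(): poll()
          // returns null once the queue is drained, so the thread stops.
          for (Integer id = partitionIdQueue.poll(); id != null;
              id = partitionIdQueue.poll()) {
            // In the real code this is where the partition is fetched and
            // computed, keeping memory use bounded to partitions in flight.
            computed++;
          }
          return computed;
        }
      }));
    }

    int total = 0;
    for (Future<Integer> future : futures) {
      total += future.get();
    }
    pool.shutdown();
    System.out.println("Computed " + total + " of " + numPartitions);
  }
}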

http://git-wip-us.apache.org/repos/asf/giraph/blob/45851391/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
new file mode 100644
index 0000000..f0f2d0f
--- /dev/null
+++ b/giraph-core/src/main/java/org/apache/giraph/graph/DefaultMasterCompute.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.giraph.graph;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * A dumb implementation of {@link MasterCompute}. This is the default
+ * implementation when no MasterCompute is defined by the user. It does
+ * nothing.
+ */
+
+public class DefaultMasterCompute extends MasterCompute {
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+  }
+
+  @Override
+  public void compute() {
+  }
+
+  @Override
+  public void initialize() throws InstantiationException,
+      IllegalAccessException {
+  }
+
+}
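
For contrast with the no-op default above, the sketch below shows what a
user-supplied MasterCompute typically looks like. It is illustrative only: it
assumes the getSuperstep() and haltComputation() methods that MasterCompute
exposes elsewhere (they are not shown in this patch), and simply stops the job
after a fixed number of supersteps.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.giraph.graph.MasterCompute;

/**
 * Sketch of a user MasterCompute (an assumed example, not in this patch):
 * halts the whole computation once a fixed superstep count is reached.
 */
public class FixedSuperstepsMasterCompute extends MasterCompute {
  /** Number of supersteps to run before halting (illustrative). */
  private static final long MAX_SUPERSTEPS = 10;

  @Override
  public void initialize() throws InstantiationException,
      IllegalAccessException {
    // Aggregators would be registered here if the job needed any.
  }

  @Override
  public void compute() {
    // Runs on the master once per superstep, before the workers compute.
    if (getSuperstep() >= MAX_SUPERSTEPS) {
      haltComputation();
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    // No master state to restore.
  }

  @Override
  public void write(DataOutput out) throws IOException {
    // No master state to persist.
  }
}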

