incubator-giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ach...@apache.org
Subject svn commit: r1201987 [2/5] - in /incubator/giraph/trunk: ./ src/main/java/org/apache/giraph/benchmark/ src/main/java/org/apache/giraph/bsp/ src/main/java/org/apache/giraph/comm/ src/main/java/org/apache/giraph/examples/ src/main/java/org/apache/giraph/...
Date Tue, 15 Nov 2011 00:54:22 GMT
Modified: incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java
URL: http://svn.apache.org/viewvc/incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java?rev=1201987&r1=1201986&r2=1201987&view=diff
==============================================================================
--- incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java (original)
+++ incubator/giraph/trunk/src/main/java/org/apache/giraph/graph/BspServiceMaster.java Tue Nov 15 00:54:20 2011
@@ -56,21 +56,25 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.NavigableMap;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import org.apache.giraph.graph.partition.MasterGraphPartitioner;
+import org.apache.giraph.graph.partition.PartitionOwner;
+import org.apache.giraph.graph.partition.PartitionStats;
+import org.apache.giraph.graph.partition.PartitionUtils;
+import org.apache.giraph.utils.WritableUtils;
+
 /**
- * Zookeeper-based implementation of {@link CentralizedService}.
+ * ZooKeeper-based implementation of {@link CentralizedService}.
  */
 @SuppressWarnings("rawtypes")
 public class BspServiceMaster<
@@ -81,8 +85,6 @@ public class BspServiceMaster<
         implements CentralizedServiceMaster<I, V, E, M> {
     /** Class logger */
     private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
-    /** Synchronizes vertex ranges */
-    private Object vertexRangeSynchronization = new Object();
     /** Superstep counter */
     private Counter superstepCounter = null;
     /** Vertex counter */
@@ -97,6 +99,8 @@ public class BspServiceMaster<
     private Counter currentWorkersCounter = null;
     /** Current master task partition */
     private Counter currentMasterTaskPartitionCounter = null;
+    /** Last checkpointed superstep */
+    private Counter lastCheckpointedSuperstepCounter = null;
     /** Am I the master? */
     private boolean isMaster = false;
     /** Max number of workers */
@@ -116,6 +120,11 @@ public class BspServiceMaster<
     /** State of the superstep changed */
     private final BspEvent superstepStateChanged =
         new PredicateLock();
+    /** Master graph partitioner */
+    private final MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner;
+    /** All the partition stats from the last superstep */
+    private final List<PartitionStats> allPartitionStatsList =
+        new ArrayList<PartitionStats>();
 
     /** Counter group name for the Giraph statistics */
     public String GIRAPH_STATS_COUNTER_GROUP_NAME = "Giraph Stats";
@@ -144,6 +153,8 @@ public class BspServiceMaster<
         partitionLongTailMinPrint = getConfiguration().getInt(
             GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT,
             GiraphJob.PARTITION_LONG_TAIL_MIN_PRINT_DEFAULT);
+        masterGraphPartitioner =
+            getGraphPartitionerFactory().createMasterGraphPartitioner();
     }
 
     @Override
@@ -238,63 +249,59 @@ public class BspServiceMaster<
     }
 
     /**
-     * Get the healthy and unhealthy workers for a superstep
+     * Parse the {@link WorkerInfo} objects from a ZooKeeper path
+     * (and children).
      *
-     * @param superstep superstep to check
-     * @param healthyWorkerList filled in with current data
-     * @param unhealthyWorkerList filled in with current data
+     * @param workerInfosPath Path where all the workers are children
+     * @param watch Whether to set a watch when listing the children
+     * @return List of workers in that path
      */
-    private void getWorkers(long superstep,
-                            List<String> healthyWorkerList,
-                            List<String> unhealthyWorkerList) {
-        String healthyWorkerPath =
-            getWorkerHealthyPath(getApplicationAttempt(), superstep);
-        String unhealthyWorkerPath =
-            getWorkerUnhealthyPath(getApplicationAttempt(), superstep);
-
+    private List<WorkerInfo> getWorkerInfosFromPath(String workerInfosPath,
+                                                    boolean watch) {
+        List<WorkerInfo> workerInfoList = new ArrayList<WorkerInfo>();
+        List<String> workerInfoPathList;
         try {
-            getZkExt().createExt(healthyWorkerPath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getWorkers: " + healthyWorkerPath +
-                          " already exists, no need to create.");
-            }
+            workerInfoPathList =
+                getZkExt().getChildrenExt(workerInfosPath, watch, false, true);
         } catch (KeeperException e) {
-            throw new IllegalStateException("getWorkers: KeeperException", e);
+            throw new IllegalStateException(
+                "getWorkers: Got KeeperException", e);
         } catch (InterruptedException e) {
-            throw new IllegalStateException("getWorkers: IllegalStateException"
-                                            , e);
+            throw new IllegalStateException(
+                "getWorkers: Got InterruptedStateException", e);
         }
-
-        try {
-            getZkExt().createExt(unhealthyWorkerPath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("getWorkers: " + healthyWorkerPath +
-                          " already exists, no need to create.");
-            }
-        } catch (KeeperException e) {
-            throw new IllegalStateException("getWorkers: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException("getWorkers: IllegalStateException"
-                                            , e);
+        for (String workerInfoPath : workerInfoPathList) {
+            WorkerInfo workerInfo = new WorkerInfo();
+            WritableUtils.readFieldsFromZnode(
+                getZkExt(), workerInfoPath, true, null, workerInfo);
+            workerInfoList.add(workerInfo);
         }
+        return workerInfoList;
+    }
+
+    /**
+     * Get the healthy and unhealthy {@link WorkerInfo} objects for
+     * a superstep
+     *
+     * @param superstep superstep to check
+     * @param healthyWorkerInfoList filled in with current data
+     * @param unhealthyWorkerInfoList filled in with current data
+     */
+    private void getAllWorkerInfos(
+            long superstep,
+            List<WorkerInfo> healthyWorkerInfoList,
+            List<WorkerInfo> unhealthyWorkerInfoList) {
+        String healthyWorkerInfoPath =
+            getWorkerInfoHealthyPath(getApplicationAttempt(), superstep);
+        String unhealthyWorkerInfoPath =
+            getWorkerInfoUnhealthyPath(getApplicationAttempt(), superstep);
 
-        List<String> currentHealthyWorkerList = null;
-        List<String> currentUnhealthyWorkerList = null;
         try {
-            currentHealthyWorkerList =
-                getZkExt().getChildrenExt(healthyWorkerPath, true, false, false);
-        } catch (KeeperException.NoNodeException e) {
-            LOG.info("getWorkers: No node for " + healthyWorkerPath);
+            getZkExt().createOnceExt(healthyWorkerInfoPath,
+                                     null,
+                                     Ids.OPEN_ACL_UNSAFE,
+                                     CreateMode.PERSISTENT,
+                                     true);
         } catch (KeeperException e) {
             throw new IllegalStateException("getWorkers: KeeperException", e);
         } catch (InterruptedException e) {
@@ -303,10 +310,11 @@ public class BspServiceMaster<
         }
 
         try {
-            currentUnhealthyWorkerList =
-                getZkExt().getChildrenExt(unhealthyWorkerPath, true, false, false);
-        } catch (KeeperException.NoNodeException e) {
-            LOG.info("getWorkers: No node for " + unhealthyWorkerPath);
+            getZkExt().createOnceExt(unhealthyWorkerInfoPath,
+                                     null,
+                                     Ids.OPEN_ACL_UNSAFE,
+                                     CreateMode.PERSISTENT,
+                                     true);
         } catch (KeeperException e) {
             throw new IllegalStateException("getWorkers: KeeperException", e);
         } catch (InterruptedException e) {
@@ -314,38 +322,45 @@ public class BspServiceMaster<
                                             , e);
         }
 
-        healthyWorkerList.clear();
-        if (currentHealthyWorkerList != null) {
-            for (String healthyWorker : currentHealthyWorkerList) {
-                healthyWorkerList.add(healthyWorker);
+        List<WorkerInfo> currentHealthyWorkerInfoList =
+            getWorkerInfosFromPath(healthyWorkerInfoPath, true);
+        List<WorkerInfo> currentUnhealthyWorkerInfoList =
+            getWorkerInfosFromPath(unhealthyWorkerInfoPath, false);
+
+        healthyWorkerInfoList.clear();
+        if (currentHealthyWorkerInfoList != null) {
+            for (WorkerInfo healthyWorkerInfo :
+                    currentHealthyWorkerInfoList) {
+                healthyWorkerInfoList.add(healthyWorkerInfo);
             }
         }
 
-        unhealthyWorkerList.clear();
-        if (currentUnhealthyWorkerList != null) {
-            for (String unhealthyWorker : currentUnhealthyWorkerList) {
-                unhealthyWorkerList.add(unhealthyWorker);
+        unhealthyWorkerInfoList.clear();
+        if (currentUnhealthyWorkerInfoList != null) {
+            for (WorkerInfo unhealthyWorkerInfo :
+                    currentUnhealthyWorkerInfoList) {
+                unhealthyWorkerInfoList.add(unhealthyWorkerInfo);
             }
         }
     }
 
     /**
-     * Check the workers to ensure that a minimum number of good workers exists
-     * out of the total that have reported.
+     * Check all the {@link WorkerInfo} objects to ensure that a minimum
+     * number of good workers exists out of the total that have reported.
      *
-     * @return map of healthy worker list to JSONArray(hostname, port)
+     * @return List of healthy workers
      */
-    private Map<String, JSONArray> checkWorkers() {
+    private List<WorkerInfo> checkWorkers() {
         boolean failJob = true;
         int pollAttempt = 0;
-        List<String> healthyWorkerList = new ArrayList<String>();
-        List<String> unhealthyWorkerList = new ArrayList<String>();
+        List<WorkerInfo> healthyWorkerInfoList = new ArrayList<WorkerInfo>();
+        List<WorkerInfo> unhealthyWorkerInfoList = new ArrayList<WorkerInfo>();
         int totalResponses = -1;
         while (pollAttempt < maxPollAttempts) {
-            getWorkers(
-                getSuperstep(), healthyWorkerList, unhealthyWorkerList);
-            totalResponses = healthyWorkerList.size() +
-                unhealthyWorkerList.size();
+            getAllWorkerInfos(
+                getSuperstep(), healthyWorkerInfoList, unhealthyWorkerInfoList);
+            totalResponses = healthyWorkerInfoList.size() +
+                unhealthyWorkerInfoList.size();
             if ((totalResponses * 100.0f / maxWorkers) >=
                     minPercentResponded) {
                 failJob = false;
@@ -374,19 +389,14 @@ public class BspServiceMaster<
                          msecsPollPeriod + " msecs and used " + pollAttempt +
                          " of " + maxPollAttempts + " attempts.");
                 // Find the missing workers if there are only a few
-                if ((maxWorkers - totalResponses) <= partitionLongTailMinPrint) {
+                if ((maxWorkers - totalResponses) <=
+                        partitionLongTailMinPrint) {
                     Set<Integer> partitionSet = new TreeSet<Integer>();
-                    for (String hostnamePartitionId : healthyWorkerList) {
-                        int lastIndex = hostnamePartitionId.lastIndexOf("_");
-                        Integer partition = Integer.parseInt(
-                            hostnamePartitionId.substring(lastIndex + 1));
-                        partitionSet.add(partition);
+                    for (WorkerInfo workerInfo : healthyWorkerInfoList) {
+                        partitionSet.add(workerInfo.getPartitionId());
                     }
-                    for (String hostnamePartitionId : unhealthyWorkerList) {
-                        int lastIndex = hostnamePartitionId.lastIndexOf("_");
-                        Integer partition = Integer.parseInt(
-                            hostnamePartitionId.substring(lastIndex + 1));
-                        partitionSet.add(partition);
+                    for (WorkerInfo workerInfo : unhealthyWorkerInfoList) {
+                        partitionSet.add(workerInfo.getPartitionId());
                     }
                     for (int i = 1; i <= maxWorkers; ++i) {
                         if (partitionSet.contains(new Integer(i))) {
@@ -412,40 +422,13 @@ public class BspServiceMaster<
             return null;
         }
 
-        if (healthyWorkerList.size() < minWorkers) {
-            LOG.error("checkWorkers: Only " + healthyWorkerList.size() +
+        if (healthyWorkerInfoList.size() < minWorkers) {
+            LOG.error("checkWorkers: Only " + healthyWorkerInfoList.size() +
                       " available when " + minWorkers + " are required.");
             return null;
         }
 
-        Map<String, JSONArray> workerHostnamePortMap =
-            new HashMap<String, JSONArray>();
-        for (String healthyWorker: healthyWorkerList) {
-            String healthyWorkerPath = null;
-            try {
-                healthyWorkerPath =
-                    getWorkerHealthyPath(getApplicationAttempt(),
-                                         getSuperstep()) + "/" +  healthyWorker;
-                JSONArray hostnamePortArray =
-                    new JSONArray(
-                        new String(getZkExt().getData(healthyWorkerPath,
-                                                      false,
-                                                      null)));
-                workerHostnamePortMap.put(healthyWorker, hostnamePortArray);
-            } catch (JSONException e) {
-                throw new RuntimeException(
-                    "checkWorkers: Problem fetching hostname and port for " +
-                    healthyWorker + " in " + healthyWorkerPath);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "checkWorkers: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "checkWorkers: IllegalStateException", e);
-            }
-        }
-
-        return workerHostnamePortMap;
+        return healthyWorkerInfoList;
     }
 
     @Override
@@ -477,27 +460,20 @@ public class BspServiceMaster<
 
         // When creating znodes, in case the master has already run, resume
         // where it left off.
-        Map<String, JSONArray> healthyWorkerHostnamePortMap = checkWorkers();
-        if (healthyWorkerHostnamePortMap == null) {
+        List<WorkerInfo> healthyWorkerInfoList = checkWorkers();
+        if (healthyWorkerInfoList.isEmpty()) {
             setJobState(ApplicationState.FAILED, -1, -1);
             return -1;
         }
 
         List<InputSplit> splitList =
-            generateInputSplits(healthyWorkerHostnamePortMap.size());
-        if (healthyWorkerHostnamePortMap.size() > splitList.size()) {
+            generateInputSplits(healthyWorkerInfoList.size());
+        if (healthyWorkerInfoList.size() > splitList.size()) {
             LOG.warn("createInputSplits: Number of inputSplits="
                      + splitList.size() + " < " +
-                     healthyWorkerHostnamePortMap.size() +
+                     healthyWorkerInfoList.size() +
                      "=number of healthy processes");
         }
-        else if (healthyWorkerHostnamePortMap.size() < splitList.size()) {
-            LOG.fatal("masterGenerateInputSplits: Number of inputSplits="
-                      + splitList.size() + " > " +
-                      healthyWorkerHostnamePortMap.size() +
-                      "=number of healthy processes");
-            setJobState(ApplicationState.FAILED, -1, -1);
-        }
         String inputSplitPath = null;
         for (int i = 0; i< splitList.size(); ++i) {
             try {
@@ -559,21 +535,21 @@ public class BspServiceMaster<
 
     /**
      * Read the finalized checkpoint file and associated metadata files for the
-     * checkpoint.  Assign one worker per checkpoint point (and associated
-     * vertex ranges).  Remove any unused chosen workers
-     * (this is safe since it wasn't finalized).  The workers can then
-     * find the vertex ranges within the files.  It is an
-     * optimization to prevent all workers from searching all the files.
-     * Also read in the aggregator data from the finalized checkpoint
-     * file.
+     * checkpoint.  Modifies the {@link PartitionOwner} objects to get the
+     * checkpoint prefixes.  It is an optimization to prevent all workers from
+     * searching all the files.  Also reads in the aggregator data from the
+     * finalized checkpoint file and sets it.
      *
-     * @param superstep checkpoint set to examine.
+     * @param superstep Checkpoint set to examine.
+     * @param partitionOwners Partition owners to modify with checkpoint
+     *        prefixes
      * @throws IOException
      * @throws InterruptedException
      * @throws KeeperException
      */
-    private void mapFilesToWorkers(long superstep,
-                                   List<String> chosenWorkerList)
+    private void prepareCheckpointRestart(
+            long superstep,
+            Collection<PartitionOwner> partitionOwners)
             throws IOException, KeeperException, InterruptedException {
         FileSystem fs = getFs();
         List<Path> validMetadataPathList = new ArrayList<Path>();
@@ -596,15 +572,16 @@ public class BspServiceMaster<
                 finalizedStream.read(aggregatorZkData, 0, aggregatorDataSize);
             if (actualDataRead != aggregatorDataSize) {
                 throw new RuntimeException(
-                    "mapFilesToWorkers: Only read " + actualDataRead + " of " +
-                    aggregatorDataSize + " aggregator bytes from " +
+                    "prepareCheckpointRestart: Only read " + actualDataRead +
+                    " of " + aggregatorDataSize + " aggregator bytes from " +
                     finalizedCheckpointPath);
             }
             String mergedAggregatorPath =
                 getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
             if (LOG.isInfoEnabled()) {
-                LOG.info("mapFilesToWorkers: Reloading merged aggregator " +
-                         "data '" + Arrays.toString(aggregatorZkData) +
+                LOG.info("prepareCheckpointRestart: Reloading merged " +
+                         "aggregator " + "data '" +
+                         Arrays.toString(aggregatorZkData) +
                          "' to previous checkpoint in path " +
                          mergedAggregatorPath);
             }
@@ -621,16 +598,18 @@ public class BspServiceMaster<
         }
         finalizedStream.close();
 
-        Map<String, Set<String>> workerFileSetMap =
-            new HashMap<String, Set<String>>();
-        // Reading the metadata files.  For now, implement simple algorithm:
-        // 1. Every file is set an input split and the partitions are mapped
-        //    accordingly
-        // 2. Every worker gets a hint about which files to look in to find
-        //    the input splits
-        int chosenWorkerListIndex = 0;
-        int inputSplitIndex = 0;
-        I maxVertexIndex = BspUtils.<I>createVertexIndex(getConfiguration());
+        Map<Integer, PartitionOwner> idOwnerMap =
+            new HashMap<Integer, PartitionOwner>();
+        for (PartitionOwner partitionOwner : partitionOwners) {
+            if (idOwnerMap.put(partitionOwner.getPartitionId(),
+                               partitionOwner) != null) {
+                throw new IllegalStateException(
+                    "prepareCheckpointRestart: Duplicate partition " +
+                    partitionOwner);
+            }
+        }
+        // Reading the metadata files.  Simply assign each partition owner
+        // the correct file prefix based on the partition id.
         for (Path metadataPath : validMetadataPathList) {
             String checkpointFilePrefix = metadataPath.toString();
             checkpointFilePrefix =
@@ -639,90 +618,20 @@ public class BspServiceMaster<
                 checkpointFilePrefix.length() -
                 CHECKPOINT_METADATA_POSTFIX.length());
             DataInputStream metadataStream = fs.open(metadataPath);
-            long entries = metadataStream.readLong();
-            JSONArray vertexRangeMetaArray = new JSONArray();
-            JSONArray vertexRangeArray = new JSONArray();
-            String chosenWorker =
-                chosenWorkerList.get(chosenWorkerListIndex);
-            for (long i = 0; i < entries; ++i) {
+            long partitions = metadataStream.readInt();
+            for (long i = 0; i < partitions; ++i) {
                 long dataPos = metadataStream.readLong();
-                Long vertexCount = metadataStream.readLong();
-                Long edgeCount = metadataStream.readLong();
-                maxVertexIndex.readFields(metadataStream);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("mapFileToWorkers: File " + metadataPath +
-                              " with position " + dataPos + ", vertex count " +
-                              vertexCount + " assigned to " + chosenWorker);
-                }
-                ByteArrayOutputStream outputStream =
-                    new ByteArrayOutputStream();
-                DataOutput output = new DataOutputStream(outputStream);
-                maxVertexIndex.write(output);
-
-                JSONObject vertexRangeObj = new JSONObject();
-
-                try {
-                    vertexRangeObj.put(JSONOBJ_NUM_VERTICES_KEY,
-                                       vertexCount);
-                    vertexRangeObj.put(JSONOBJ_NUM_EDGES_KEY,
-                                       edgeCount);
-                    vertexRangeObj.put(JSONOBJ_HOSTNAME_ID_KEY,
-                                       chosenWorker);
-                    vertexRangeObj.put(JSONOBJ_CHECKPOINT_FILE_PREFIX_KEY,
-                                       checkpointFilePrefix);
-                    vertexRangeObj.put(JSONOBJ_MAX_VERTEX_INDEX_KEY,
-                                       Base64.encodeBytes(
-                                           outputStream.toByteArray()));
-                    vertexRangeMetaArray.put(vertexRangeObj);
-                    vertexRangeArray.put(outputStream.toString("UTF-8"));
-                } catch (JSONException e) {
-                    throw new IllegalStateException(
-                        "mapFilesToWorkers: JSONException ", e);
-                }
-
-                if (!workerFileSetMap.containsKey(chosenWorker)) {
-                    workerFileSetMap.put(chosenWorker, new HashSet<String>());
-                }
-                Set<String> fileSet = workerFileSetMap.get(chosenWorker);
-                fileSet.add(metadataPath.toString());
-            }
-            metadataStream.close();
-
-            String inputSplitPathFinishedPath =
-                INPUT_SPLIT_PATH + "/" + inputSplitIndex +
-                INPUT_SPLIT_FINISHED_NODE;
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("mapFilesToWorkers: Assigning (if not empty) " +
-                          chosenWorker + " the following vertexRanges: " +
-                          vertexRangeArray.toString());
-            }
-
-            // Assign the input splits if there is at least one vertex range
-            if (vertexRangeArray.length() == 0) {
-                continue;
-            }
-            try {
+                int partitionId = metadataStream.readInt();
+                PartitionOwner partitionOwner = idOwnerMap.get(partitionId);
                 if (LOG.isInfoEnabled()) {
-                    LOG.info("mapFilesToWorkers: vertexRangeMetaArray size=" +
-                             vertexRangeMetaArray.toString().length());
+                    LOG.info("prepareSuperstepRestart: File " + metadataPath +
+                              " with position " + dataPos +
+                              ", partition id = " + partitionId +
+                              " assigned to " + partitionOwner);
                 }
-                getZkExt().createExt(inputSplitPathFinishedPath,
-                                     vertexRangeMetaArray.toString().getBytes(),
-                                     Ids.OPEN_ACL_UNSAFE,
-                                     CreateMode.PERSISTENT,
-                                     true);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "mapFilesToWorkers: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "mapFilesToWorkers: IllegalStateException", e);
-            }
-            ++inputSplitIndex;
-            ++chosenWorkerListIndex;
-            if (chosenWorkerListIndex == chosenWorkerList.size()) {
-                chosenWorkerListIndex = 0;
+                partitionOwner.setCheckpointFilesPrefix(checkpointFilePrefix);
             }
+            metadataStream.close();
         }
     }
 
@@ -746,6 +655,8 @@ public class BspServiceMaster<
             GIRAPH_STATS_COUNTER_GROUP_NAME, "Current workers");
         currentMasterTaskPartitionCounter = getContext().getCounter(
             GIRAPH_STATS_COUNTER_GROUP_NAME, "Current master task partition");
+        lastCheckpointedSuperstepCounter = getContext().getCounter(
+            GIRAPH_STATS_COUNTER_GROUP_NAME, "Last checkpointed superstep");
         if (getRestartedSuperstep() != UNSET_SUPERSTEP) {
             superstepCounter.increment(getRestartedSuperstep());
         }
@@ -821,135 +732,68 @@ public class BspServiceMaster<
      * Collect and aggregate the worker statistics for a particular superstep.
      *
      * @param superstep Superstep to aggregate on
-     * @return JSONObject with all the aggregated fields
+     * @return Global statistics aggregated on all worker statistics
      */
-    private JSONObject aggregateWorkerStats(long superstep) {
-        long aggregateFinishedVertices = 0;
-        long aggregateVertices = 0;
-        long aggregateEdges = 0;
-        long aggregateSentMessages = 0;
-        // INPUT_SUPERSTEP is special since there is no computation, just get
-        // the stats from the input splits finished nodes.  Otherwise, get the
-        // stats from the all the worker selected nodes
-        if (superstep == INPUT_SUPERSTEP) {
-            List<String> inputSplitList = null;
-            try {
-                inputSplitList = getZkExt().getChildrenExt(
-                    INPUT_SPLIT_PATH, false, false, true);
-            } catch (KeeperException e) {
-                throw new IllegalStateException(
-                    "aggregateWorkerStats: KeeperException", e);
-            } catch (InterruptedException e) {
-                throw new IllegalStateException(
-                    "aggregateWorkerStats: IllegalStateException", e);
-            }
-            for (String inputSplitPath : inputSplitList) {
-                JSONArray statArray = null;
-                try {
-                    String inputSplitFinishedPath = inputSplitPath +
-                        INPUT_SPLIT_FINISHED_NODE;
-                    byte [] zkData =
-                        getZkExt().getData(inputSplitFinishedPath, false, null);
-                    if (zkData == null || zkData.length == 0) {
-                        continue;
-                    }
-                    statArray = new JSONArray(new String(zkData));
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("aggregateWorkerStats: input split path " +
-                                  inputSplitPath + " got " + statArray);
-                    }
-                } catch (JSONException e) {
-                    throw new IllegalStateException(
-                        "aggregateWorkerStats: JSONException", e);
-                } catch (KeeperException e) {
-                    throw new IllegalStateException(
-                        "aggregateWorkerStats: KeeperException", e);
-                } catch (InterruptedException e) {
-                    throw new IllegalStateException(
-                        "aggregateWorkerStats: InterruptedException", e);
-                }
-                for (int i = 0; i < statArray.length(); ++i) {
-                    try {
-                        aggregateVertices +=
-                            statArray.getJSONObject(i).getLong(
-                                JSONOBJ_NUM_VERTICES_KEY);
-                        aggregateEdges +=
-                            statArray.getJSONObject(i).getLong(
-                                JSONOBJ_NUM_EDGES_KEY);
-                    } catch (JSONException e) {
-                        throw new IllegalStateException(
-                            "aggregateWorkerStats: JSONException", e);
-                    }
-                }
-            }
+    private GlobalStats aggregateWorkerStats(long superstep) {
+        Class<? extends Writable> partitionStatsClass =
+            masterGraphPartitioner.createPartitionStats().getClass();
+        GlobalStats globalStats = new GlobalStats();
+        // Get the stats from all the worker selected nodes
+        String workerFinishedPath =
+            getWorkerFinishedPath(getApplicationAttempt(), superstep);
+        List<String> workerFinishedPathList = null;
+        try {
+            workerFinishedPathList =
+                getZkExt().getChildrenExt(
+                    workerFinishedPath, false, false, true);
+        } catch (KeeperException e) {
+            throw new IllegalStateException(
+                "aggregateWorkerStats: KeeperException", e);
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(
+                "aggregateWorkerStats: InterruptedException", e);
         }
-        else {
-            String workerFinishedPath =
-                getWorkerFinishedPath(getApplicationAttempt(), superstep);
-            List<String> workerFinishedPathList = null;
+
+        allPartitionStatsList.clear();
+        for (String finishedPath : workerFinishedPathList) {
+            JSONObject workerFinishedInfoObj = null;
             try {
-                workerFinishedPathList =
-                    getZkExt().getChildrenExt(
-                        workerFinishedPath, false, false, true);
+                byte [] zkData =
+                    getZkExt().getData(finishedPath, false, null);
+                workerFinishedInfoObj = new JSONObject(new String(zkData));
+                List<? extends Writable> writableList =
+                    WritableUtils.readListFieldsFromByteArray(
+                        Base64.decode(workerFinishedInfoObj.getString(
+                            JSONOBJ_PARTITION_STATS_KEY)),
+                        partitionStatsClass,
+                        getConfiguration());
+                for (Writable writable : writableList) {
+                    globalStats.addPartitionStats((PartitionStats) writable);
+                    globalStats.addMessageCount(
+                        workerFinishedInfoObj.getLong(
+                            JSONOBJ_NUM_MESSAGES_KEY));
+                    allPartitionStatsList.add((PartitionStats) writable);
+                }
+            } catch (JSONException e) {
+                throw new IllegalStateException(
+                    "aggregateWorkerStats: JSONException", e);
             } catch (KeeperException e) {
                 throw new IllegalStateException(
                     "aggregateWorkerStats: KeeperException", e);
             } catch (InterruptedException e) {
                 throw new IllegalStateException(
                     "aggregateWorkerStats: InterruptedException", e);
-            }
-
-            for (String finishedPath : workerFinishedPathList) {
-                JSONObject aggregatorStatObj= null;
-                try {
-                    byte [] zkData =
-                        getZkExt().getData(finishedPath, false, null);
-                    aggregatorStatObj = new JSONObject(new String(zkData));
-                    aggregateFinishedVertices +=
-                        aggregatorStatObj.getLong(JSONOBJ_FINISHED_VERTICES_KEY);
-                    aggregateVertices +=
-                        aggregatorStatObj.getLong(JSONOBJ_NUM_VERTICES_KEY);
-                    aggregateEdges +=
-                        aggregatorStatObj.getLong(JSONOBJ_NUM_EDGES_KEY);
-                    aggregateSentMessages +=
-                        aggregatorStatObj.getLong(JSONOBJ_NUM_MESSAGES_KEY);
-                } catch (JSONException e) {
-                    throw new IllegalStateException(
-                        "aggregateWorkerStats: JSONException", e);
-                } catch (KeeperException e) {
-                    throw new IllegalStateException(
-                        "aggregateWorkerStats: KeeperException", e);
-                } catch (InterruptedException e) {
-                    throw new IllegalStateException(
-                        "aggregateWorkerStats: InterruptedException", e);
-                }
+            } catch (IOException e) {
+                throw new IllegalStateException(
+                    "aggregateWorkerStats: IOException", e);
             }
          }
 
-        JSONObject aggregateStats = new JSONObject();
-        try {
-            aggregateStats.put(JSONOBJ_FINISHED_VERTICES_KEY,
-                               aggregateFinishedVertices);
-            aggregateStats.put(JSONOBJ_NUM_VERTICES_KEY,
-                               aggregateVertices);
-            aggregateStats.put(JSONOBJ_NUM_EDGES_KEY,
-                               aggregateEdges);
-            aggregateStats.put(JSONOBJ_NUM_MESSAGES_KEY,
-                               aggregateSentMessages);
-        } catch (JSONException e) {
-            throw new IllegalStateException(
-                "aggregateWorkerStats: Failed to put the aggregator " +
-                "stats together", e);
-        }
         if (LOG.isInfoEnabled()) {
-            LOG.info("aggregateWorkerStats: Aggregation found " +
-                     aggregateFinishedVertices + " of " +
-                     aggregateVertices +
-                     " vertices finished, " + aggregateEdges +
-                     " edges, " + aggregateSentMessages +
-                     " messages sent on superstep = " + getSuperstep());
+            LOG.info("aggregateWorkerStats: Aggregation found " + globalStats +
+                     " on superstep = " + getSuperstep());
         }
-        return aggregateStats;
+        return globalStats;
     }
 
     /**
@@ -981,12 +825,12 @@ public class BspServiceMaster<
         }
 
         for (String hostnameIdPath : hostnameIdPathList) {
-            JSONObject aggregatorsStatObj = null;
+            JSONObject workerFinishedInfoObj = null;
             JSONArray aggregatorArray = null;
             try {
                 byte [] zkData =
                     getZkExt().getData(hostnameIdPath, false, null);
-                aggregatorsStatObj = new JSONObject(new String(zkData));
+                workerFinishedInfoObj = new JSONObject(new String(zkData));
             } catch (KeeperException e) {
                 throw new IllegalStateException(
                     "collectAndProcessAggregatorValues: KeeperException", e);
@@ -999,7 +843,7 @@ public class BspServiceMaster<
                     "collectAndProcessAggregatorValues: JSONException", e);
             }
             try {
-                aggregatorArray = aggregatorsStatObj.getJSONArray(
+                aggregatorArray = workerFinishedInfoObj.getJSONArray(
                     JSONOBJ_AGGREGATOR_VALUE_ARRAY_KEY);
             } catch (JSONException e) {
                 if (LOG.isDebugEnabled()) {
@@ -1167,7 +1011,7 @@ public class BspServiceMaster<
      */
     private void finalizeCheckpoint(
             long superstep,
-            List<String> chosenWorkerList)
+            List<WorkerInfo> chosenWorkerInfoList)
             throws IOException, KeeperException, InterruptedException {
         Path finalizedCheckpointPath =
             new Path(getCheckpointBasePath(superstep) +
@@ -1185,11 +1029,12 @@ public class BspServiceMaster<
         // <aggregator data length><aggregators as a serialized JSON byte array>
         FSDataOutputStream finalizedOutputStream =
             getFs().create(finalizedCheckpointPath);
-        finalizedOutputStream.writeInt(chosenWorkerList.size());
-        for (String chosenWorker : chosenWorkerList) {
-            String chosenWorkerPrefix =
-                getCheckpointBasePath(superstep) + "." + chosenWorker;
-            finalizedOutputStream.writeUTF(chosenWorkerPrefix);
+        finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
+        for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
+            String chosenWorkerInfoPrefix =
+                getCheckpointBasePath(superstep) + "." +
+                chosenWorkerInfo.getHostnameId();
+            finalizedOutputStream.writeUTF(chosenWorkerInfoPrefix);
         }
         String mergedAggregatorPath =
             getMergedAggregatorPath(getApplicationAttempt(), superstep - 1);
@@ -1204,313 +1049,120 @@ public class BspServiceMaster<
         }
         finalizedOutputStream.close();
         lastCheckpointedSuperstep = superstep;
+        lastCheckpointedSuperstepCounter.increment(superstep -
+            lastCheckpointedSuperstepCounter.getValue());
     }
 
     /**
-     * Convert the processed input split data into vertex range assignments
-     * that can be used on the next superstep.
+     * Assign the partitions for this superstep.  If there are changes,
+     * the workers will know how to do the exchange.  If this was a restarted
+     * superstep, then make sure to provide information on where to find the
+     * checkpoint file.
      *
-     * @param chosenWorkerHostnamePortMap map of chosen workers to
-     *        a hostname and port array
+     * @param allPartitionStatsList All partition stats
+     * @param chosenWorkerInfoList All the chosen worker infos
+     * @param masterGraphPartitioner Master graph partitioner
      */
-    private void inputSplitsToVertexRanges(
-            Map<String, JSONArray> chosenWorkerHostnamePortMap) {
-        List<String> inputSplitPathList = null;
-        List<VertexRange<I, V, E, M>> vertexRangeList =
-            new ArrayList<VertexRange<I, V, E, M>>();
-        try {
-            inputSplitPathList = getZkExt().getChildrenExt(INPUT_SPLIT_PATH,
-                                                           false,
-                                                           false,
-                                                           true);
-            int numRanges = 0;
-            for (String inputSplitPath : inputSplitPathList) {
-                String inputSplitFinishedPath = inputSplitPath +
-                    INPUT_SPLIT_FINISHED_NODE;
-                byte [] zkData =
-                    getZkExt().getData(inputSplitFinishedPath, false, null);
-                if (zkData == null || zkData.length == 0) {
-                    if (LOG.isInfoEnabled()) {
-                        LOG.info("inputSplitsToVertexRanges: No vertex ranges " +
-                                 "in " + inputSplitFinishedPath);
-                    }
-                    continue;
-                }
-                JSONArray statArray = new JSONArray(new String(zkData));
-                JSONObject vertexRangeObj = null;
-                for (int i = 0; i < statArray.length(); ++i) {
-                    vertexRangeObj = statArray.getJSONObject(i);
-                    String hostnamePartitionId =
-                        vertexRangeObj.getString(JSONOBJ_HOSTNAME_ID_KEY);
-                    if (!chosenWorkerHostnamePortMap.containsKey(
-                        hostnamePartitionId)) {
-                        LOG.fatal("inputSplitsToVertexRanges: Worker " +
-                              hostnamePartitionId + " died, failing.");
-                        setJobState(ApplicationState.FAILED, -1, -1);
-                    }
-                   JSONArray hostnamePortArray =
-                       chosenWorkerHostnamePortMap.get(hostnamePartitionId);
-                   vertexRangeObj.put(JSONOBJ_HOSTNAME_KEY,
-                                      hostnamePortArray.getString(0));
-                   vertexRangeObj.put(JSONOBJ_PORT_KEY,
-                                      hostnamePortArray.getString(1));
-                   Class<I> indexClass =
-                       BspUtils.getVertexIndexClass(getConfiguration());
-                    VertexRange<I, V, E, M> vertexRange =
-                        new VertexRange<I, V, E, M>(indexClass, vertexRangeObj);
-                    vertexRangeList.add(vertexRange);
-                    ++numRanges;
-                }
+    private void assignPartitionOwners(
+            List<PartitionStats> allPartitionStatsList,
+            List<WorkerInfo> chosenWorkerInfoList,
+            MasterGraphPartitioner<I, V, E, M> masterGraphPartitioner) {
+        Collection<PartitionOwner> partitionOwners;
+        if (getSuperstep() == INPUT_SUPERSTEP ||
+                getSuperstep() == getRestartedSuperstep()) {
+            partitionOwners =
+                masterGraphPartitioner.createInitialPartitionOwners(
+                    chosenWorkerInfoList, maxWorkers);
+            if (partitionOwners.isEmpty()) {
+                throw new IllegalStateException(
+                    "assignAndExchangePartitions: No partition owners set");
             }
+        } else {
+            partitionOwners =
+                masterGraphPartitioner.generateChangedPartitionOwners(
+                    allPartitionStatsList,
+                    chosenWorkerInfoList,
+                    maxWorkers,
+                    getSuperstep());
 
-            JSONArray vertexRangeAssignmentArray =
-                new JSONArray();
-            for (VertexRange<I, V, E, M> vertexRange : vertexRangeList) {
-                vertexRangeAssignmentArray.put(vertexRange.toJSONObject());
-            }
-            String vertexRangeAssignmentsPath =
-                getVertexRangeAssignmentsPath(getApplicationAttempt(),
-                                              getSuperstep());
-            if (LOG.isInfoEnabled()) {
-                LOG.info("inputSplitsToVertexRanges: Assigning " + numRanges +
-                         " vertex ranges of total length " +
-                         vertexRangeAssignmentArray.toString().length() +
-                         " to path " + vertexRangeAssignmentsPath);
-            }
-            getZkExt().createExt(
-                vertexRangeAssignmentsPath,
-                vertexRangeAssignmentArray.toString().getBytes(),
-                Ids.OPEN_ACL_UNSAFE,
-                CreateMode.PERSISTENT,
-                true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "inputSplitsToVertexRanges: KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "inputSplitsToVertexRanges: IllegalStateException", e);
-        } catch (JSONException e) {
-            throw new IllegalStateException(
-                "inputSplitsToVertexRanges: JSONException", e);
-        } catch (IOException e) {
-            throw new IllegalStateException(
-                "inputSplitsToVertexRanges: IOException", e);
-        } catch (InstantiationException e) {
-            throw new IllegalStateException(
-                "inputSplitsToVertexRanges: InstantiationException", e);
-        } catch (IllegalAccessException e) {
-            throw new IllegalStateException(
-                "inputSplitsToVertexRanges: IllegalAccessException", e);
+            PartitionUtils.analyzePartitionStats(partitionOwners,
+                                                 allPartitionStatsList);
         }
-    }
 
-    /**
-     * Balance the vertex ranges before the next superstep computation begins.
-     * Wait until the vertex ranges data has been moved to the correct
-     * worker prior to continuing.
-     *
-     * @param vertexRangeBalancer balancer to use
-     * @param chosenWorkerHostnamePortMap workers available
-     * @throws JSONException
-     * @throws IOException
-     * @throws UnsupportedEncodingException
-     */
-    private void balanceVertexRanges(
-            VertexRangeBalancer<I, V, E, M> vertexRangeBalancer,
-            Map<String, JSONArray> chosenWorkerHostnamePortMap) {
-        vertexRangeBalancer.setSuperstep(getSuperstep());
-        vertexRangeBalancer.setWorkerHostnamePortMap(
-            chosenWorkerHostnamePortMap);
-        NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-            getVertexRangeMap(getSuperstep() - 1);
-        if (vertexRangeMap.isEmpty()) {
-            throw new IllegalStateException(
-                "balanceVertexRanges: Couldn't get the " +
-                "previous vertex range map (superstep " +
-                (getSuperstep() - 1) + ")");
-        }
-        vertexRangeBalancer.setPrevVertexRangeMap(vertexRangeMap);
-        NavigableMap<I, VertexRange<I, V, E, M>> nextVertexRangeMap =
-            vertexRangeBalancer.rebalance();
-        vertexRangeBalancer.setNextVertexRangeMap(nextVertexRangeMap);
-        vertexRangeBalancer.setPreviousHostnamePort();
-        nextVertexRangeMap = vertexRangeBalancer.getNextVertexRangeMap();
-        if (nextVertexRangeMap.size() != vertexRangeMap.size()) {
-            throw new IllegalArgumentException(
-                "balanceVertexRanges: Next vertex range count " +
-                nextVertexRangeMap.size() + " != " + vertexRangeMap.size());
-        }
-        JSONArray vertexRangeAssignmentArray = new JSONArray();
-        VertexRange<I, V, E, M> maxVertexRange = null; // TODO: delete after Bug 4340282 fixed.
-        for (VertexRange<I, V, E, M> vertexRange :
-                nextVertexRangeMap.values()) {
+        // If restarted, prepare the checkpoint restart
+        if (getRestartedSuperstep() == getSuperstep()) {
             try {
-                vertexRangeAssignmentArray.put(
-                    vertexRange.toJSONObject());
-                if (maxVertexRange == null || // TODO: delete after Bug 4340282 fixed.
-                        vertexRange.toJSONObject().toString().length() <
-                        maxVertexRange.toJSONObject().toString().length()) {
-                    maxVertexRange = vertexRange;
-                }
+                prepareCheckpointRestart(getSuperstep(), partitionOwners);
             } catch (IOException e) {
                 throw new IllegalStateException(
-                    "balanceVertexRanges: IOException", e);
-            } catch (JSONException e) {
+                    "assignPartitionOwners: IOException on preparing", e);
+            } catch (KeeperException e) {
                 throw new IllegalStateException(
-                    "balanceVertexRanges: JSONException", e);
+                    "assignPartitionOwners: KeeperException on preparing", e);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException(
+                    "assignPartitionOwners: InteruptedException on preparing",
+                    e);
             }
         }
-        if (vertexRangeAssignmentArray.length() == 0) {
-            throw new RuntimeException(
-                "balanceVertexRanges: Impossible there are no " +
-                "vertex ranges on superstep " + getSuperstep());
-        }
 
-        byte[] vertexAssignmentBytes = null;
-        try {
-            vertexAssignmentBytes =
-                vertexRangeAssignmentArray.toString().getBytes("UTF-8");
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(
-                "balanceVertexRanges: Cannot get bytes from " +
-                vertexRangeAssignmentArray.toString(), e);
-        }
-
-        // TODO: delete after Bug 4340282 fixed.
-        if (LOG.isInfoEnabled()) {
+        // There will be some exchange of partitions
+        if (!partitionOwners.isEmpty()) {
+            String vertexExchangePath =
+                getPartitionExchangePath(getApplicationAttempt(),
+                                         getSuperstep());
             try {
-                LOG.info("balanceVertexRanges: numVertexRanges=" +
-                         nextVertexRangeMap.values().size() +
-                         " lengthVertexRanges=" + vertexAssignmentBytes.length +
-                         " maxVertexRangeLength=" +
-                         maxVertexRange.toJSONObject().toString().length());
-            } catch (IOException e) {
-                throw new RuntimeException(
-                    "balanceVertexRanges: IOException fetching " +
-                    maxVertexRange.toString(), e);
-            } catch (JSONException e) {
-                throw new RuntimeException(
-                    "balanceVertexRanges: JSONException fetching " +
-                    maxVertexRange.toString(), e);
-            }
-        }
-
-        String vertexRangeAssignmentsPath =
-            getVertexRangeAssignmentsPath(getApplicationAttempt(),
-                                          getSuperstep());
-        try {
-            getZkExt().createExt(vertexRangeAssignmentsPath,
-                                 vertexAssignmentBytes,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("balanceVertexRanges: Node " +
-                     vertexRangeAssignmentsPath + " already exists", e);
-            }
-        } catch (KeeperException e) {
-            LOG.fatal("balanceVertexRanges: Got KeeperException", e);
-            throw new IllegalStateException(
-                "balanceVertexRanges: Got KeeperException", e);
-        } catch (InterruptedException e) {
-            LOG.fatal("balanceVertexRanges: Got InterruptedException", e);
-            throw new IllegalStateException(
-                "balanceVertexRanges: Got InterruptedException", e);
-        }
-
-        long changes = vertexRangeBalancer.getVertexRangeChanges();
-        if (LOG.isInfoEnabled()) {
-            LOG.info("balanceVertexRanges: Waiting on " + changes +
-                     " vertex range changes");
-        }
-        if (changes == 0) {
-            return;
-        }
-
-        String vertexRangeExchangePath =
-            getVertexRangeExchangePath(getApplicationAttempt(), getSuperstep());
-        List<String> workerExchangeList = null;
-        try {
-            getZkExt().createExt(vertexRangeExchangePath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isInfoEnabled()) {
-                LOG.info("balanceVertexRanges: " + vertexRangeExchangePath +
-                         "exists");
-            }
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "balanceVertexRanges: Got KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "balanceVertexRanges: Got InterruptedException", e);
-        }
-        while (true) {
-            try {
-                workerExchangeList =
-                    getZkExt().getChildrenExt(
-                        vertexRangeExchangePath, true, false, false);
+                getZkExt().createOnceExt(vertexExchangePath,
+                                         null,
+                                         Ids.OPEN_ACL_UNSAFE,
+                                         CreateMode.PERSISTENT,
+                                         true);
             } catch (KeeperException e) {
                 throw new IllegalStateException(
-                    "balanceVertexRanges: Got KeeperException", e);
+                    "assignPartitionOwners: KeeperException creating " +
+                    vertexExchangePath);
             } catch (InterruptedException e) {
                 throw new IllegalStateException(
-                    "balanceVertexRanges: Got InterruptedException", e);
-            }
-            if (workerExchangeList.size() == changes) {
-                break;
+                    "assignPartitionOwners: InterruptedException creating " +
+                    vertexExchangePath);
             }
-            getVertexRangeExchangeChildrenChangedEvent().waitForever();
-            getVertexRangeExchangeChildrenChangedEvent().reset();
-        }
-        String vertexRangeExchangeFinishedPath =
-            getVertexRangeExchangeFinishedPath(getApplicationAttempt(),
-                                               getSuperstep());
-        try {
-            getZkExt().createExt(vertexRangeExchangeFinishedPath,
-                                 null,
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-        } catch (KeeperException e) {
-            throw new IllegalStateException(
-                "balanceVertexRanges: Got KeeperException", e);
-        } catch (InterruptedException e) {
-            throw new IllegalStateException(
-                "balanceVertexRanges: Got InterruptedException", e);
         }
+
+        // Workers are waiting for these assignments
+        String partitionAssignmentsPath =
+            getPartitionAssignmentsPath(getApplicationAttempt(),
+                                        getSuperstep());
+        WritableUtils.writeListToZnode(
+            getZkExt(),
+            partitionAssignmentsPath,
+            -1,
+            new ArrayList<Writable>(partitionOwners));
     }
 
     /**
      * Check whether the workers chosen for this superstep are still alive
      *
      * @param chosenWorkerHealthPath Path to the healthy workers in ZooKeeper
-     * @param chosenWorkerSet Set of the healthy workers
+     * @param chosenWorkerInfoList List of the chosen workers to check
      * @return true if they are all alive, false otherwise.
      * @throws InterruptedException
      * @throws KeeperException
      */
     private boolean superstepChosenWorkerAlive(
-            String chosenWorkerHealthPath,
-            Set<String> chosenWorkerSet)
+            String chosenWorkerInfoHealthPath,
+            List<WorkerInfo> chosenWorkerInfoList)
             throws KeeperException, InterruptedException {
-        List<String> chosenWorkerHealthyList =
-            getZkExt().getChildrenExt(chosenWorkerHealthPath,
-                                      true,
-                                      false,
-                                      false);
-        Set<String> chosenWorkerHealthySet =
-            new HashSet<String>(chosenWorkerHealthyList);
+        List<WorkerInfo> chosenWorkerInfoHealthyList =
+            getWorkerInfosFromPath(chosenWorkerInfoHealthPath, false);
+        Set<WorkerInfo> chosenWorkerInfoHealthySet =
+            new HashSet<WorkerInfo>(chosenWorkerInfoHealthyList);
         boolean allChosenWorkersHealthy = true;
-        for (String chosenWorker : chosenWorkerSet) {
-            if (!chosenWorkerHealthySet.contains(chosenWorker)) {
+        for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
+            if (!chosenWorkerInfoHealthySet.contains(chosenWorkerInfo)) {
                 allChosenWorkersHealthy = false;
-                LOG.warn("superstepChosenWorkerAlive: Missing chosen worker " +
-                         chosenWorker + " on superstep " + getSuperstep());
+                LOG.error("superstepChosenWorkerAlive: Missing chosen " +
+                          "worker " + chosenWorkerInfo +
+                          " on superstep " + getSuperstep());
             }
         }
         return allChosenWorkersHealthy;
@@ -1568,10 +1220,12 @@ public class BspServiceMaster<
             Arrays.sort(fileStatusArray);
             lastCheckpointedSuperstep = getCheckpoint(
                 fileStatusArray[fileStatusArray.length - 1].getPath());
-            LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
-                     lastCheckpointedSuperstep + " from " +
-                     fileStatusArray[fileStatusArray.length - 1].
-                     getPath().toString());
+            if (LOG.isInfoEnabled()) {
+                LOG.info("getLastGoodCheckpoint: Found last good checkpoint " +
+                         lastCheckpointedSuperstep + " from " +
+                         fileStatusArray[fileStatusArray.length - 1].
+                         getPath().toString());
+            }
         }
         return lastCheckpointedSuperstep;
     }
@@ -1581,81 +1235,55 @@ public class BspServiceMaster<
             KeeperException, InterruptedException {
         // 1. Get chosen workers and set up watches on them.
         // 2. Assign partitions to the workers
-        //    or possibly reload from a superstep
+        //    (possibly reloading from a superstep)
         // 3. Wait for all workers to complete
         // 4. Collect and process aggregators
         // 5. Create superstep finished node
         // 6. If the checkpoint frequency is met, finalize the checkpoint
-        Map<String, JSONArray> chosenWorkerHostnamePortMap = checkWorkers();
-        if (chosenWorkerHostnamePortMap == null) {
+        List<WorkerInfo> chosenWorkerInfoList = checkWorkers();
+        if (chosenWorkerInfoList.isEmpty()) {
             LOG.fatal("coordinateSuperstep: Not enough healthy workers for " +
                       "superstep " + getSuperstep());
             setJobState(ApplicationState.FAILED, -1, -1);
         } else {
-            for (Entry<String, JSONArray> entry :
-                    chosenWorkerHostnamePortMap.entrySet()) {
-                String workerHealthyPath =
-                    getWorkerHealthyPath(getApplicationAttempt(),
-                                         getSuperstep()) + "/" + entry.getKey();
-                if (getZkExt().exists(workerHealthyPath, true) == null) {
+            for (WorkerInfo workerInfo : chosenWorkerInfoList) {
+                String workerInfoHealthyPath =
+                    getWorkerInfoHealthyPath(getApplicationAttempt(),
+                                             getSuperstep()) + "/" +
+                                             workerInfo.getHostnameId();
+                if (getZkExt().exists(workerInfoHealthyPath, true) == null) {
                     LOG.warn("coordinateSuperstep: Chosen worker " +
-                             workerHealthyPath +
+                             workerInfoHealthyPath +
                              " is no longer valid, failing superstep");
                 }
             }
         }
 
-        currentWorkersCounter.increment(chosenWorkerHostnamePortMap.size() -
+        currentWorkersCounter.increment(chosenWorkerInfoList.size() -
                                         currentWorkersCounter.getValue());
-        if (getRestartedSuperstep() == getSuperstep()) {
-            try {
-                if (LOG.isInfoEnabled()) {
-                    LOG.info("coordinateSuperstep: Reloading from superstep " +
-                             getSuperstep());
-                }
-                mapFilesToWorkers(
-                    getRestartedSuperstep(),
-                    new ArrayList<String>(
-                        chosenWorkerHostnamePortMap.keySet()));
-                inputSplitsToVertexRanges(chosenWorkerHostnamePortMap);
-            } catch (IOException e) {
-                throw new IllegalStateException(
-                    "coordinateSuperstep: Failed to reload", e);
-            }
-        } else {
-            if (getSuperstep() > INPUT_SUPERSTEP) {
-                VertexRangeBalancer<I, V, E, M> vertexRangeBalancer =
-                    BspUtils.<I, V, E, M>createVertexRangeBalancer(
-                        getConfiguration());
-                synchronized (vertexRangeSynchronization) {
-                    balanceVertexRanges(vertexRangeBalancer,
-                                        chosenWorkerHostnamePortMap);
-                }
-            }
-        }
+        assignPartitionOwners(allPartitionStatsList,
+                              chosenWorkerInfoList,
+                              masterGraphPartitioner);
 
         String finishedWorkerPath =
             getWorkerFinishedPath(getApplicationAttempt(), getSuperstep());
-        try {
-            getZkExt().createExt(finishedWorkerPath,
+        getZkExt().createOnceExt(finishedWorkerPath,
                                  null,
                                  Ids.OPEN_ACL_UNSAFE,
                                  CreateMode.PERSISTENT,
                                  true);
-        } catch (KeeperException.NodeExistsException e) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("coordinateSuperstep: finishedWorkers " +
-                          finishedWorkerPath +
-                          " already exists, no need to create");
-            }
+        String WorkerInfoHealthyPath =
+            getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
+        List<String> chosenHostnameIdList =
+            new ArrayList<String>(chosenWorkerInfoList.size());
+        for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
+            chosenHostnameIdList.add(chosenWorkerInfo.getHostnameId());
         }
-        String workerHealthyPath =
-            getWorkerHealthyPath(getApplicationAttempt(), getSuperstep());
-        List<String> finishedWorkerList = null;
+        List<String> finishedHostnameIdList = null;
         long nextInfoMillis = System.currentTimeMillis();
         while (true) {
             try {
-                finishedWorkerList =
+                finishedHostnameIdList =
                     getZkExt().getChildrenExt(finishedWorkerPath,
                                               true,
                                               false,
@@ -1667,86 +1295,73 @@ public class BspServiceMaster<
             }
             if (LOG.isDebugEnabled()) {
                 LOG.debug("coordinateSuperstep: Got finished worker list = " +
-                          finishedWorkerList + ", size = " +
-                          finishedWorkerList.size() +
+                          finishedHostnameIdList + ", size = " +
+                          finishedHostnameIdList.size() +
                           ", chosen worker list = " +
-                          chosenWorkerHostnamePortMap.keySet() + ", size = " +
-                          chosenWorkerHostnamePortMap.size() +
+                          chosenWorkerInfoList + ", size = " +
+                          chosenWorkerInfoList.size() +
                           " from " + finishedWorkerPath);
             }
 
             if (LOG.isInfoEnabled() &&
                     (System.currentTimeMillis() > nextInfoMillis)) {
                 nextInfoMillis = System.currentTimeMillis() + 30000;
-                LOG.info("coordinateSuperstep: " + finishedWorkerList.size() +
-                         " out of " +
-                         chosenWorkerHostnamePortMap.size() +
+                LOG.info("coordinateSuperstep: " +
+                         finishedHostnameIdList.size() +
+                         " out of " + chosenWorkerInfoList.size() +
                          " chosen workers finished on superstep " +
                          getSuperstep());
             }
             getContext().setStatus(getGraphMapper().getMapFunctions() + " - " +
-                                   finishedWorkerList.size() +
+                                   finishedHostnameIdList.size() +
                                    " finished out of " +
-                                   chosenWorkerHostnamePortMap.size() +
+                                   chosenWorkerInfoList.size() +
                                    " on superstep " + getSuperstep());
-            if (finishedWorkerList.containsAll(
-                chosenWorkerHostnamePortMap.keySet())) {
+            if (finishedHostnameIdList.containsAll(chosenHostnameIdList)) {
                 break;
             }
-            getSuperstepStateChangedEvent().waitForever();
+
+            // Wait for a signal, but no longer than 60 seconds, so that
+            // progress is reported; otherwise this task will die.
+            getSuperstepStateChangedEvent().waitMsecs(60*1000);
             getSuperstepStateChangedEvent().reset();
+            getContext().progress();
 
             // Did a worker die?
             if ((getSuperstep() > 0) &&
                     !superstepChosenWorkerAlive(
-                        workerHealthyPath,
-                        chosenWorkerHostnamePortMap.keySet())) {
+                        WorkerInfoHealthyPath,
+                        chosenWorkerInfoList)) {
                 return SuperstepState.WORKER_FAILURE;
             }
         }
         collectAndProcessAggregatorValues(getSuperstep());
-        JSONObject globalInfoObject = aggregateWorkerStats(getSuperstep());
-
-        // Convert the input split stats to vertex ranges in INPUT_SUPERSTEP
-        if (getSuperstep() == INPUT_SUPERSTEP) {
-            inputSplitsToVertexRanges(chosenWorkerHostnamePortMap);
-        }
+        GlobalStats globalStats = aggregateWorkerStats(getSuperstep());
 
         // Let everyone know the aggregated application state through the
-        // superstep
+        // superstep finishing znode.
         String superstepFinishedNode =
             getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
-        try {
-            getZkExt().createExt(superstepFinishedNode,
-                                 globalInfoObject.toString().getBytes(),
-                                 Ids.OPEN_ACL_UNSAFE,
-                                 CreateMode.PERSISTENT,
-                                 true);
-            vertexCounter.increment(
-                globalInfoObject.getLong(JSONOBJ_NUM_VERTICES_KEY) -
-                vertexCounter.getValue());
-            finishedVertexCounter.increment(
-                globalInfoObject.getLong(JSONOBJ_FINISHED_VERTICES_KEY) -
-                finishedVertexCounter.getValue());
-            edgeCounter.increment(
-                globalInfoObject.getLong(JSONOBJ_NUM_EDGES_KEY) -
-                edgeCounter.getValue());
-            sentMessagesCounter.increment(
-                globalInfoObject.getLong(JSONOBJ_NUM_MESSAGES_KEY) -
-                sentMessagesCounter.getValue());
-        } catch (JSONException e) {
-            throw new IllegalStateException("coordinateSuperstep: " +
-                                            "JSONException", e);
-        }
-
+        WritableUtils.writeToZnode(
+            getZkExt(), superstepFinishedNode, -1, globalStats);
+        vertexCounter.increment(
+            globalStats.getVertexCount() -
+            vertexCounter.getValue());
+        finishedVertexCounter.increment(
+            globalStats.getFinishedVertexCount() -
+            finishedVertexCounter.getValue());
+        edgeCounter.increment(
+            globalStats.getEdgeCount() -
+            edgeCounter.getValue());
+        sentMessagesCounter.increment(
+            globalStats.getMessageCount() -
+            sentMessagesCounter.getValue());
 
         // Finalize the valid checkpoint file prefixes and possibly
         // the aggregators.
         if (checkpointFrequencyMet(getSuperstep())) {
             try {
-                finalizeCheckpoint(
-                    getSuperstep(),
-                    new ArrayList<String>(chosenWorkerHostnamePortMap.keySet()));
+                finalizeCheckpoint(getSuperstep(), chosenWorkerInfoList);
             } catch (IOException e) {
                 throw new IllegalStateException(
                     "coordinateSuperstep: IOException on finalizing checkpoint",
@@ -1781,21 +1396,16 @@ public class BspServiceMaster<
             }
         }
         incrCachedSuperstep();
-        if (getSuperstep() > 0) {  // counter starts at zero, so no need to incr
+        // Counter starts at zero, so no need to increment
+        if (getSuperstep() > 0) {
             superstepCounter.increment(1);
         }
-        try {
-            if ((globalInfoObject.getLong(JSONOBJ_FINISHED_VERTICES_KEY) ==
-                    globalInfoObject.getLong(JSONOBJ_NUM_VERTICES_KEY)) &&
-                    (globalInfoObject.getLong(JSONOBJ_NUM_MESSAGES_KEY)) == 0) {
-                return SuperstepState.ALL_SUPERSTEPS_DONE;
-            } else {
-                return SuperstepState.THIS_SUPERSTEP_DONE;
-            }
-        } catch (JSONException e) {
-            throw new IllegalStateException(
-                "coordinateSuperstep: JSONException on checking if " +
-                "the application is done", e);
+        if ((globalStats.getFinishedVertexCount() ==
+                globalStats.getVertexCount()) &&
+                globalStats.getMessageCount() == 0) {
+            return SuperstepState.ALL_SUPERSTEPS_DONE;
+        } else {
+            return SuperstepState.THIS_SUPERSTEP_DONE;
         }
     }
 
@@ -1950,30 +1560,28 @@ public class BspServiceMaster<
      * @param failedWorkerPath Full path to the failed worker
      */
     final private void checkHealthyWorkerFailure(String failedWorkerPath) {
-        synchronized(vertexRangeSynchronization) {
-            if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
-                return;
-            }
+        if (getSuperstepFromPath(failedWorkerPath) < getSuperstep()) {
+            return;
+        }
 
-            // Check all the workers used in this superstep
-            NavigableMap<I, VertexRange<I, V, E, M>> vertexRangeMap =
-                getVertexRangeMap(getSuperstep());
-            String hostnameId =
-                getHealthyHostnameIdFromPath(failedWorkerPath);
-            for (VertexRange<I, V, E, M> vertexRange :
-                    vertexRangeMap.values()) {
-                if (vertexRange.getHostnameId().equals(hostnameId) ||
-                        ((vertexRange.getPreviousHostnameId() != null) &&
-                                vertexRange.getPreviousHostnameId().equals(
-                                    hostnameId))) {
-                    LOG.warn("checkHealthyWorkerFailure: " +
-                             "at least one healthy worker went down " +
-                             "for superstep " + getSuperstep() + " - " +
-                             hostnameId + ", will try to restart from " +
-                             "checkpointed superstep " +
-                             lastCheckpointedSuperstep);
-                    superstepStateChanged.signal();
-                }
+        Collection<PartitionOwner> partitionOwners =
+            masterGraphPartitioner.getCurrentPartitionOwners();
+        String hostnameId =
+            getHealthyHostnameIdFromPath(failedWorkerPath);
+        for (PartitionOwner partitionOwner : partitionOwners) {
+            WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
+            WorkerInfo previousWorkerInfo =
+                partitionOwner.getPreviousWorkerInfo();
+            if (workerInfo.getHostnameId().equals(hostnameId) ||
+                ((previousWorkerInfo != null) &&
+                    previousWorkerInfo.getHostnameId().equals(hostnameId))) {
+                LOG.warn("checkHealthyWorkerFailure: " +
+                        "at least one healthy worker went down " +
+                        "for superstep " + getSuperstep() + " - " +
+                        hostnameId + ", will try to restart from " +
+                        "checkpointed superstep " +
+                        lastCheckpointedSuperstep);
+                superstepStateChanged.signal();
             }
         }
     }



Mime
View raw message