giraph-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From edu...@apache.org
Subject git commit: updated refs/heads/trunk to 7f2d584
Date Thu, 18 Dec 2014 23:15:32 GMT
Repository: giraph
Updated Branches:
  refs/heads/trunk 8675c84a8 -> 7f2d58445


GIRAPH-972 Race condition in checkpointing


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/7f2d5844
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/7f2d5844
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/7f2d5844

Branch: refs/heads/trunk
Commit: 7f2d58445e2353a1a42fbb4282ed5cad724186b5
Parents: 8675c84
Author: Sergey Edunov <edunov@fb.com>
Authored: Thu Dec 18 10:05:36 2014 -0800
Committer: Sergey Edunov <edunov@fb.com>
Committed: Thu Dec 18 15:13:19 2014 -0800

----------------------------------------------------------------------
 CHANGELOG                                       |  3 +++
 .../java/org/apache/giraph/bsp/BspService.java  | 21 +++++++++++++++++++
 .../apache/giraph/master/BspServiceMaster.java  | 22 +++++++++-----------
 .../apache/giraph/worker/BspServiceWorker.java  |  8 +++----
 4 files changed, 38 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 7b54584..efa2878 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,9 @@
 Giraph Change Log
 
 Release 1.2.0 - unreleased
+  
+  GIRAPH-972: Race condition in checkpointing (edunov)
+
   GIRAPH-905: Giraph Debugger (netj via edunov)
 
   GIRAPH-966: Add a way to ignore some thread exceptions (majakabiljo)

http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
index 579c772..0a5a7ba 100644
--- a/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
+++ b/giraph-core/src/main/java/org/apache/giraph/bsp/BspService.java
@@ -1203,4 +1203,25 @@ public abstract class BspService<I extends WritableComparable,
   }
 
 
+  /**
+   * For every worker this method returns a unique number
+   * between 0 and N - 1, where N is the total number of workers.
+   * This number stays the same throughout the computation.
+   * The task ID may differ from this number, and task IDs
+   * are not necessarily contiguous.
+   * @param workerInfo worker info object
+   * @return worker number
+   */
+  protected int getWorkerId(WorkerInfo workerInfo) {
+    return getWorkerInfoList().indexOf(workerInfo);
+  }
+
+  /**
+   * Returns the worker info corresponding to the specified worker id.
+   * @param id unique worker id
+   * @return WorkerInfo
+   */
+  protected WorkerInfo getWorkerInfoById(int id) {
+    return getWorkerInfoList().get(id);
+  }
 }

http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
index 39b4a1c..798f544 100644
--- a/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
+++ b/giraph-core/src/main/java/org/apache/giraph/master/BspServiceMaster.java
@@ -22,8 +22,6 @@ import static org.apache.giraph.conf.GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT;
 import static org.apache.giraph.conf.GiraphConstants.KEEP_ZOOKEEPER_DATA;
 import static org.apache.giraph.conf.GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT;
 import static org.apache.giraph.conf.GiraphConstants.USE_INPUT_SPLIT_LOCALITY;
-import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
-import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
 
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -413,9 +411,14 @@ public class BspServiceMaster<I extends WritableComparable,
     }
     for (String workerInfoPath : workerInfoPathList) {
       WorkerInfo workerInfo = new WorkerInfo();
-      WritableUtils.readFieldsFromZnode(
-          getZkExt(), workerInfoPath, true, null, workerInfo);
-      workerInfoList.add(workerInfo);
+      try {
+        WritableUtils.readFieldsFromZnode(
+            getZkExt(), workerInfoPath, true, null, workerInfo);
+        workerInfoList.add(workerInfo);
+      } catch (IllegalStateException e) {
+        LOG.warn("Can't get info from worker, did it die in between? " +
+            "workerInfoPath=" + workerInfoPath, e);
+      }
     }
     return workerInfoList;
   }
@@ -785,11 +788,6 @@ public class BspServiceMaster<I extends WritableComparable,
     getConfiguration().updateSuperstepClasses(superstepClasses);
     int prefixFileCount = finalizedStream.readInt();
 
-
-    Int2ObjectMap<WorkerInfo> workersMap = new Int2ObjectOpenHashMap<>();
-    for (WorkerInfo worker : chosenWorkerInfoList) {
-      workersMap.put(worker.getTaskId(), worker);
-    }
     String checkpointFile =
         finalizedStream.readUTF();
     for (int i = 0; i < prefixFileCount; ++i) {
@@ -798,7 +796,7 @@ public class BspServiceMaster<I extends WritableComparable,
       DataInputStream metadataStream = fs.open(new Path(checkpointFile +
           "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX));
       long partitions = metadataStream.readInt();
-      WorkerInfo worker = workersMap.get(mrTaskId);
+      WorkerInfo worker = getWorkerInfoById(mrTaskId);
       for (long p = 0; p < partitions; ++p) {
         int partitionId = metadataStream.readInt();
         PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId,
@@ -1107,7 +1105,7 @@ public class BspServiceMaster<I extends WritableComparable,
     finalizedOutputStream.writeInt(chosenWorkerInfoList.size());
     finalizedOutputStream.writeUTF(getCheckpointBasePath(superstep));
     for (WorkerInfo chosenWorkerInfo : chosenWorkerInfoList) {
-      finalizedOutputStream.writeInt(chosenWorkerInfo.getTaskId());
+      finalizedOutputStream.writeInt(getWorkerId(chosenWorkerInfo));
     }
     globalCommHandler.write(finalizedOutputStream);
     aggregatorTranslation.write(finalizedOutputStream);

http://git-wip-us.apache.org/repos/asf/giraph/blob/7f2d5844/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
----------------------------------------------------------------------
diff --git a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
index 4ad8400..381e51a 100644
--- a/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
+++ b/giraph-core/src/main/java/org/apache/giraph/worker/BspServiceWorker.java
@@ -1462,8 +1462,8 @@ public class BspServiceWorker<I extends WritableComparable,
    * @throws IOException
    */
   private Path createCheckpointFilePathSafe(String name) throws IOException {
-    Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + "." +
-        getTaskPartition() + name);
+    Path validFilePath = new Path(getCheckpointBasePath(getSuperstep()) + '.' +
+        getWorkerId(workerInfo) + name);
     // Remove these files if they already exist (shouldn't though, unless
     // of previous failure of this worker)
     if (getFs().delete(validFilePath, false)) {
@@ -1481,8 +1481,8 @@ public class BspServiceWorker<I extends WritableComparable,
    * @return fill file path to checkpoint file
    */
   private Path getSavedCheckpoint(long superstep, String name) {
-    return new Path(getSavedCheckpointBasePath(superstep) + "." +
-        getTaskPartition() + name);
+    return new Path(getSavedCheckpointBasePath(superstep) + '.' +
+        getWorkerId(workerInfo) + name);
   }
 
   /**


Mime
View raw message