hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From oz...@apache.org
Subject hadoop git commit: YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail due to IOException. Contributed by Zhihai Xu.
Date Fri, 27 Feb 2015 15:57:20 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 657b027bb -> 79f73f461


YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail due to IOException. Contributed
by Zhihai Xu.

(cherry picked from commit 01a1621930df17a745dd37892996c68fca3447d1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79f73f46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79f73f46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79f73f46

Branch: refs/heads/branch-2
Commit: 79f73f461362d6d574e248f65d1e0dc6e895524a
Parents: 657b027
Author: Tsuyoshi Ozawa <ozawa@apache.org>
Authored: Sat Feb 28 00:56:44 2015 +0900
Committer: Tsuyoshi Ozawa <ozawa@apache.org>
Committed: Sat Feb 28 00:57:01 2015 +0900

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   9 +
 .../src/main/resources/yarn-default.xml         |  15 +
 .../recovery/FileSystemRMStateStore.java        | 303 ++++++++++++++-----
 .../recovery/TestFSRMStateStore.java            |   5 +
 5 files changed, 265 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f73f46/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 801192a..b016cbb 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -294,6 +294,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3255. RM, NM, JobHistoryServer, and WebAppProxyServer's main()
     should support generic options. (shv)
 
+    YARN-2820. Retry in FileSystemRMStateStore when FS's operations fail 
+    due to IOException. (Zhihai Xu via ozawa)
+
   OPTIMIZATIONS
 
     YARN-2990. FairScheduler's delay-scheduling always waits for node-local and 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f73f46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 544ae1b..8cc7ad7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -508,6 +508,15 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_FS_RM_STATE_STORE_RETRY_POLICY_SPEC =
       "2000, 500";
 
+  public static final String FS_RM_STATE_STORE_NUM_RETRIES =
+      RM_PREFIX + "fs.state-store.num-retries";
+  public static final int DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES = 0;
+
+  public static final String FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
+      RM_PREFIX + "fs.state-store.retry-interval-ms";
+  public static final long DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS =
+      1000L;
+
   public static final String RM_LEVELDB_STORE_PATH = RM_PREFIX
       + "leveldb-state-store.path";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f73f46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 0a1d3db..f311f16 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -420,6 +420,21 @@
   </property>
 
   <property>
+    <description>the number of retries to recover from IOException in
+    FileSystemRMStateStore.
+    </description>
+    <name>yarn.resourcemanager.fs.state-store.num-retries</name>
+    <value>0</value>
+  </property>
+
+  <property>
+    <description>Retry interval in milliseconds in FileSystemRMStateStore.
+    </description>
+    <name>yarn.resourcemanager.fs.state-store.retry-interval-ms</name>
+    <value>1000</value>
+  </property>
+
+  <property>
     <description>Local path where the RM state will be stored when using
     org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore
     as the value for yarn.resourcemanager.store.class</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f73f46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 6e830a0..8147597 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -92,6 +92,8 @@ public class FileSystemRMStateStore extends RMStateStore {
   Path rmDTSecretManagerRoot;
   private Path rmAppRoot;
   private Path dtSequenceNumberPath = null;
+  private int fsNumRetries;
+  private long fsRetryInterval;
 
   @VisibleForTesting
   Path fsWorkingPath;
@@ -106,6 +108,12 @@ public class FileSystemRMStateStore extends RMStateStore {
     rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
     amrmTokenSecretManagerRoot =
         new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
+    fsNumRetries =
+        conf.getInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES,
+            YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_NUM_RETRIES);
+    fsRetryInterval =
+        conf.getLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
+                YarnConfiguration.DEFAULT_FS_RM_STATE_STORE_RETRY_INTERVAL_MS);
   }
 
   @Override
@@ -121,14 +129,14 @@ public class FileSystemRMStateStore extends RMStateStore {
     conf.set("dfs.client.retry.policy.spec", retryPolicy);
 
     fs = fsWorkingPath.getFileSystem(conf);
-    fs.mkdirs(rmDTSecretManagerRoot);
-    fs.mkdirs(rmAppRoot);
-    fs.mkdirs(amrmTokenSecretManagerRoot);
+    mkdirsWithRetries(rmDTSecretManagerRoot);
+    mkdirsWithRetries(rmAppRoot);
+    mkdirsWithRetries(amrmTokenSecretManagerRoot);
   }
 
   @Override
   protected synchronized void closeInternal() throws Exception {
-    fs.close();
+    closeWithRetries();
   }
 
   @Override
@@ -139,9 +147,9 @@ public class FileSystemRMStateStore extends RMStateStore {
   @Override
   protected synchronized Version loadVersion() throws Exception {
     Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
-    FileStatus status = getFileStatus(versionNodePath);
+    FileStatus status = getFileStatusWithRetries(versionNodePath);
     if (status != null) {
-      byte[] data = readFile(versionNodePath, status.getLen());
+      byte[] data = readFileWithRetries(versionNodePath, status.getLen());
       Version version =
           new VersionPBImpl(VersionProto.parseFrom(data));
       return version;
@@ -154,10 +162,10 @@ public class FileSystemRMStateStore extends RMStateStore {
     Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
     byte[] data =
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
-    if (fs.exists(versionNodePath)) {
+    if (existsWithRetries(versionNodePath)) {
       updateFile(versionNodePath, data);
     } else {
-      writeFile(versionNodePath, data);
+      writeFileWithRetries(versionNodePath, data);
     }
   }
   
@@ -165,10 +173,10 @@ public class FileSystemRMStateStore extends RMStateStore {
   public synchronized long getAndIncrementEpoch() throws Exception {
     Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
     long currentEpoch = 0;
-    FileStatus status = getFileStatus(epochNodePath);
+    FileStatus status = getFileStatusWithRetries(epochNodePath);
     if (status != null) {
       // load current epoch
-      byte[] data = readFile(epochNodePath, status.getLen());
+      byte[] data = readFileWithRetries(epochNodePath, status.getLen());
       Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
       currentEpoch = epoch.getEpoch();
       // increment epoch and store it
@@ -179,7 +187,7 @@ public class FileSystemRMStateStore extends RMStateStore {
       // initialize epoch file with 1 for the next time.
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      writeFile(epochNodePath, storeData);
+      writeFileWithRetries(epochNodePath, storeData);
     }
     return currentEpoch;
   }
@@ -201,12 +209,14 @@ public class FileSystemRMStateStore extends RMStateStore {
     checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
     Path amrmTokenSecretManagerStateDataDir =
         new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
-    FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir);
+    FileStatus status = getFileStatusWithRetries(
+        amrmTokenSecretManagerStateDataDir);
     if (status == null) {
       return;
     }
     assert status.isFile();
-    byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
+    byte[] data = readFileWithRetries(amrmTokenSecretManagerStateDataDir,
+            status.getLen());
     AMRMTokenSecretManagerStatePBImpl stateData =
         new AMRMTokenSecretManagerStatePBImpl(
           AMRMTokenSecretManagerStateProto.parseFrom(data));
@@ -220,16 +230,18 @@ public class FileSystemRMStateStore extends RMStateStore {
       List<ApplicationAttemptStateData> attempts =
           new ArrayList<ApplicationAttemptStateData>();
 
-      for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
+      for (FileStatus appDir : listStatusWithRetries(rmAppRoot)) {
         checkAndResumeUpdateOperation(appDir.getPath());
-        for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
+        for (FileStatus childNodeStatus :
+            listStatusWithRetries(appDir.getPath())) {
           assert childNodeStatus.isFile();
           String childNodeName = childNodeStatus.getPath().getName();
-          if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+          if (checkAndRemovePartialRecordWithRetries(
+              childNodeStatus.getPath())) {
             continue;
           }
-          byte[] childData =
-              readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
+          byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
+              childNodeStatus.getLen());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
             // application
             if (LOG.isDebugEnabled()) {
@@ -292,7 +304,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     // If it does, the prior updateFile is failed on half way. We need to
     // complete replacing the old file first.
     FileStatus[] newChildNodes =
-        fs.listStatus(path, new PathFilter() {
+        listStatusWithRetries(path, new PathFilter() {
       @Override
       public boolean accept(Path path) {
         return path.getName().endsWith(".new");
@@ -310,12 +322,12 @@ public class FileSystemRMStateStore extends RMStateStore {
   }
   private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
     checkAndResumeUpdateOperation(rmDTSecretManagerRoot);
-    FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
+    FileStatus[] childNodes = listStatusWithRetries(rmDTSecretManagerRoot);
 
     for(FileStatus childNodeStatus : childNodes) {
       assert childNodeStatus.isFile();
       String childNodeName = childNodeStatus.getPath().getName();
-      if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
+      if (checkAndRemovePartialRecordWithRetries(childNodeStatus.getPath())) {
         continue;
       }
       if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
@@ -325,35 +337,36 @@ public class FileSystemRMStateStore extends RMStateStore {
       }
 
       Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
-      byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
+      byte[] childData = readFileWithRetries(childNodePath,
+          childNodeStatus.getLen());
       ByteArrayInputStream is = new ByteArrayInputStream(childData);
-      DataInputStream fsIn = new DataInputStream(is);
-      if(childNodeName.startsWith(DELEGATION_KEY_PREFIX)){
-        DelegationKey key = new DelegationKey();
-        key.readFields(fsIn);
-        rmState.rmSecretManagerState.masterKeyState.add(key);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
-              + ", expirationDate=" + key.getExpiryDate());
-        }
-      } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
-        RMDelegationTokenIdentifierData identifierData =
-            new RMDelegationTokenIdentifierData();
-        identifierData.readFields(fsIn);
-        RMDelegationTokenIdentifier identifier =
-            identifierData.getTokenIdentifier();
-        long renewDate = identifierData.getRenewDate();
-
-        rmState.rmSecretManagerState.delegationTokenState.put(identifier,
-          renewDate);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
-              + " renewDate=" + renewDate);
+      try (DataInputStream fsIn = new DataInputStream(is)) {
+        if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
+          DelegationKey key = new DelegationKey();
+          key.readFields(fsIn);
+          rmState.rmSecretManagerState.masterKeyState.add(key);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded delegation key: keyId=" + key.getKeyId()
+                + ", expirationDate=" + key.getExpiryDate());
+          }
+        } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+          RMDelegationTokenIdentifierData identifierData =
+              new RMDelegationTokenIdentifierData();
+          identifierData.readFields(fsIn);
+          RMDelegationTokenIdentifier identifier =
+              identifierData.getTokenIdentifier();
+          long renewDate = identifierData.getRenewDate();
+
+          rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+            renewDate);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded RMDelegationTokenIdentifier: " + identifier
+                + " renewDate=" + renewDate);
+          }
+        } else {
+          LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
         }
-      } else {
-        LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
       }
-      fsIn.close();
     }
   }
 
@@ -361,7 +374,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   public synchronized void storeApplicationStateInternal(ApplicationId appId,
       ApplicationStateData appStateDataPB) throws Exception {
     Path appDirPath = getAppDir(rmAppRoot, appId);
-    fs.mkdirs(appDirPath);
+    mkdirsWithRetries(appDirPath);
     Path nodeCreatePath = getNodePath(appDirPath, appId.toString());
 
     LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
@@ -369,7 +382,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      writeFile(nodeCreatePath, appStateData);
+      writeFileWithRetries(nodeCreatePath, appStateData);
     } catch (Exception e) {
       LOG.info("Error storing info for app: " + appId, e);
       throw e;
@@ -408,7 +421,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      writeFile(nodeCreatePath, attemptStateData);
+      writeFileWithRetries(nodeCreatePath, attemptStateData);
     } catch (Exception e) {
       LOG.info("Error storing info for attempt: " + appAttemptId, e);
       throw e;
@@ -444,7 +457,7 @@ public class FileSystemRMStateStore extends RMStateStore {
         appState.getApplicationSubmissionContext().getApplicationId();
     Path nodeRemovePath = getAppDir(rmAppRoot, appId);
     LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
-    deleteFile(nodeRemovePath);
+    deleteFileWithRetries(nodeRemovePath);
   }
 
   @Override
@@ -460,7 +473,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
       DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
     LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
-    deleteFile(nodeCreatePath);
+    deleteFileWithRetries(nodeCreatePath);
   }
 
   @Override
@@ -483,7 +496,7 @@ public class FileSystemRMStateStore extends RMStateStore {
       updateFile(nodeCreatePath, identifierData.toByteArray());
     } else {
       LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
-      writeFile(nodeCreatePath, identifierData.toByteArray());
+      writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
 
       // store sequence number
       Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
@@ -492,11 +505,12 @@ public class FileSystemRMStateStore extends RMStateStore {
       LOG.info("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX
           + identifier.getSequenceNumber());
       if (dtSequenceNumberPath == null) {
-        if (!createFile(latestSequenceNumberPath)) {
+        if (!createFileWithRetries(latestSequenceNumberPath)) {
           throw new Exception("Failed to create " + latestSequenceNumberPath);
         }
       } else {
-        if (!renameFile(dtSequenceNumberPath, latestSequenceNumberPath)) {
+        if (!renameFileWithRetries(dtSequenceNumberPath,
+            latestSequenceNumberPath)) {
           throw new Exception("Failed to rename " + dtSequenceNumberPath);
         }
       }
@@ -510,11 +524,11 @@ public class FileSystemRMStateStore extends RMStateStore {
     Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
           DELEGATION_KEY_PREFIX + masterKey.getKeyId());
     ByteArrayOutputStream os = new ByteArrayOutputStream();
-    DataOutputStream fsOut = new DataOutputStream(os);
-    LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
-    masterKey.write(fsOut);
-    writeFile(nodeCreatePath, os.toByteArray());
-    fsOut.close();
+    try (DataOutputStream fsOut = new DataOutputStream(os)) {
+      LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
+      masterKey.write(fsOut);
+      writeFileWithRetries(nodeCreatePath, os.toByteArray());
+    }
   }
 
   @Override
@@ -523,13 +537,13 @@ public class FileSystemRMStateStore extends RMStateStore {
     Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
           DELEGATION_KEY_PREFIX + masterKey.getKeyId());
     LOG.info("Removing RMDelegationKey_"+ masterKey.getKeyId());
-    deleteFile(nodeCreatePath);
+    deleteFileWithRetries(nodeCreatePath);
   }
 
   @Override
-  public synchronized void deleteStore() throws IOException {
-    if (fs.exists(rootDirPath)) {
-      fs.delete(rootDirPath, true);
+  public synchronized void deleteStore() throws Exception {
+    if (existsWithRetries(rootDirPath)) {
+      deleteFileWithRetries(rootDirPath);
     }
   }
 
@@ -539,6 +553,146 @@ public class FileSystemRMStateStore extends RMStateStore {
 
   // FileSystem related code
 
+  private boolean checkAndRemovePartialRecordWithRetries(final Path record)
+      throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return checkAndRemovePartialRecord(record);
+      }
+    }.runWithRetries();
+  }
+
+  private void mkdirsWithRetries(final Path appDirPath) throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        fs.mkdirs(appDirPath);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void writeFileWithRetries(final Path outputPath,final byte[] data)
+      throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        writeFile(outputPath, data);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void deleteFileWithRetries(final Path deletePath) throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        deleteFile(deletePath);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private boolean renameFileWithRetries(final Path src, final Path dst)
+      throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return renameFile(src, dst);
+      }
+    }.runWithRetries();
+  }
+
+  private boolean createFileWithRetries(final Path newFile) throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return createFile(newFile);
+      }
+    }.runWithRetries();
+  }
+
+  private FileStatus getFileStatusWithRetries(final Path path)
+      throws Exception {
+    return new FSAction<FileStatus>() {
+      @Override
+      public FileStatus run() throws Exception {
+        return getFileStatus(path);
+      }
+    }.runWithRetries();
+  }
+
+  private boolean existsWithRetries(final Path path) throws Exception {
+    return new FSAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        return fs.exists(path);
+      }
+    }.runWithRetries();
+  }
+
+  private byte[] readFileWithRetries(final Path inputPath, final long len)
+      throws Exception {
+    return new FSAction<byte[]>() {
+      @Override
+      public byte[] run() throws Exception {
+        return readFile(inputPath, len);
+      }
+    }.runWithRetries();
+  }
+
+  private FileStatus[] listStatusWithRetries(final Path path)
+      throws Exception {
+    return new FSAction<FileStatus[]>() {
+      @Override
+      public FileStatus[] run() throws Exception {
+        return fs.listStatus(path);
+      }
+    }.runWithRetries();
+  }
+
+  private FileStatus[] listStatusWithRetries(final Path path,
+      final PathFilter filter) throws Exception {
+    return new FSAction<FileStatus[]>() {
+      @Override
+      public FileStatus[] run() throws Exception {
+        return fs.listStatus(path, filter);
+      }
+    }.runWithRetries();
+  }
+
+  private void closeWithRetries() throws Exception {
+    new FSAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        fs.close();
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private abstract class FSAction<T> {
+    abstract T run() throws Exception;
+
+    T runWithRetries() throws Exception {
+      int retry = 0;
+      while (true) {
+        try {
+          return run();
+        } catch (IOException e) {
+          LOG.info("Exception while executing a FS operation.", e);
+          if (++retry > fsNumRetries) {
+            LOG.info("Maxed out FS retries. Giving up!");
+            throw e;
+          }
+          LOG.info("Retrying operation on FS. Retry no. " + retry);
+          Thread.sleep(fsRetryInterval);
+        }
+      }
+    }
+  }
+
   private void deleteFile(Path deletePath) throws Exception {
     if(!fs.delete(deletePath, true)) {
       throw new Exception("Failed to delete " + deletePath);
@@ -595,18 +749,18 @@ public class FileSystemRMStateStore extends RMStateStore {
    */
   protected void updateFile(Path outputPath, byte[] data) throws Exception {
     Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
-    // use writeFile to make sure .new file is created atomically
-    writeFile(newPath, data);
+    // use writeFileWithRetries to make sure .new file is created atomically
+    writeFileWithRetries(newPath, data);
     replaceFile(newPath, outputPath);
   }
 
   protected void replaceFile(Path srcPath, Path dstPath) throws Exception {
-    if (fs.exists(dstPath)) {
-      deleteFile(dstPath);
+    if (existsWithRetries(dstPath)) {
+      deleteFileWithRetries(dstPath);
     } else {
       LOG.info("File doesn't exist. Skip deleting the file " + dstPath);
     }
-    fs.rename(srcPath, dstPath);
+    renameFileWithRetries(srcPath, dstPath);
   }
 
   @Private
@@ -637,8 +791,17 @@ public class FileSystemRMStateStore extends RMStateStore {
     if (isUpdate) {
       updateFile(nodeCreatePath, stateData);
     } else {
-      writeFile(nodeCreatePath, stateData);
+      writeFileWithRetries(nodeCreatePath, stateData);
     }
   }
 
+  @VisibleForTesting
+  public int getNumRetries() {
+    return fsNumRetries;
+  }
+
+  @VisibleForTesting
+  public long getRetryInterval() {
+    return fsRetryInterval;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79f73f46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index d0d19e3..675d73c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -100,7 +100,12 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
           workingDirPathURI.toString());
       conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
         "100,6000");
+      conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 5);
+      conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
+          900L);
       this.store = new TestFileSystemRMStore(conf);
+      Assert.assertEquals(store.getNumRetries(), 5);
+      Assert.assertEquals(store.getRetryInterval(), 900L);
       return store;
     }
 


Mime
View raw message