hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject [2/2] hadoop git commit: Fixing HDFS state-store. Contributed by Arun Suresh.
Date Fri, 15 May 2015 01:38:57 GMT
Fixing HDFS state-store. Contributed by Arun Suresh.

(cherry picked from commit 9a2a9553eee454ecd18120535d3e845f86fc3584)
(cherry picked from commit ad3196e01667bd6798a1988fddb0c0ae32f6687c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3c51654d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3c51654d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3c51654d

Branch: refs/heads/branch-2.7
Commit: 3c51654d574e80e245ff4a5c184a8dca17782516
Parents: d9d7bbd
Author: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Authored: Thu May 14 16:13:51 2015 -0700
Committer: Vinod Kumar Vavilapalli <vinodkv@apache.org>
Committed: Thu May 14 18:33:51 2015 -0700

----------------------------------------------------------------------
 .../recovery/FileSystemRMStateStore.java        |  82 +++++++---
 .../recovery/RMStateStoreTestBase.java          |  16 +-
 .../recovery/TestFSRMStateStore.java            | 151 ++++++++++++++++---
 .../src/site/markdown/ResourceManagerHA.md      |   2 +-
 4 files changed, 207 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c51654d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
index 8147597..0f68365 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
@@ -25,6 +25,7 @@ import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -84,6 +86,8 @@ public class FileSystemRMStateStore extends RMStateStore {
   protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
       "AMRMTokenSecretManagerNode";
 
+  private static final String UNREADABLE_BY_SUPERUSER_XATTRIB =
+          "security.hdfs.unreadable.by.superuser";
   protected FileSystem fs;
 
   private Path rootDirPath;
@@ -94,6 +98,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   private Path dtSequenceNumberPath = null;
   private int fsNumRetries;
   private long fsRetryInterval;
+  private boolean isHDFS;
 
   @VisibleForTesting
   Path fsWorkingPath;
@@ -129,11 +134,17 @@ public class FileSystemRMStateStore extends RMStateStore {
     conf.set("dfs.client.retry.policy.spec", retryPolicy);
 
     fs = fsWorkingPath.getFileSystem(conf);
+    isHDFS = fs.getScheme().toLowerCase().contains("hdfs");
     mkdirsWithRetries(rmDTSecretManagerRoot);
     mkdirsWithRetries(rmAppRoot);
     mkdirsWithRetries(amrmTokenSecretManagerRoot);
   }
 
+  @VisibleForTesting
+  void setIsHDFS(boolean isHDFS) {
+    this.isHDFS = isHDFS;
+  }
+
   @Override
   protected synchronized void closeInternal() throws Exception {
     closeWithRetries();
@@ -163,9 +174,9 @@ public class FileSystemRMStateStore extends RMStateStore {
     byte[] data =
         ((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
     if (existsWithRetries(versionNodePath)) {
-      updateFile(versionNodePath, data);
+      updateFile(versionNodePath, data, false);
     } else {
-      writeFileWithRetries(versionNodePath, data);
+      writeFileWithRetries(versionNodePath, data, false);
     }
   }
   
@@ -182,12 +193,12 @@ public class FileSystemRMStateStore extends RMStateStore {
       // increment epoch and store it
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      updateFile(epochNodePath, storeData);
+      updateFile(epochNodePath, storeData, false);
     } else {
       // initialize epoch file with 1 for the next time.
       byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
           .toByteArray();
-      writeFileWithRetries(epochNodePath, storeData);
+      writeFileWithRetries(epochNodePath, storeData, false);
     }
     return currentEpoch;
   }
@@ -241,7 +252,9 @@ public class FileSystemRMStateStore extends RMStateStore {
             continue;
           }
           byte[] childData = readFileWithRetries(childNodeStatus.getPath(),
-              childNodeStatus.getLen());
+                  childNodeStatus.getLen());
+          // Set attribute if not already set
+          setUnreadableBySuperuserXattrib(childNodeStatus.getPath());
           if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
             // application
             if (LOG.isDebugEnabled()) {
@@ -314,7 +327,7 @@ public class FileSystemRMStateStore extends RMStateStore {
       assert newChildNodeStatus.isFile();
       String newChildNodeName = newChildNodeStatus.getPath().getName();
       String childNodeName = newChildNodeName.substring(
-          0, newChildNodeName.length() - ".new".length());
+              0, newChildNodeName.length() - ".new".length());
       Path childNodePath =
           new Path(newChildNodeStatus.getPath().getParent(), childNodeName);
       replaceFile(newChildNodeStatus.getPath(), childNodePath);
@@ -382,7 +395,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      writeFileWithRetries(nodeCreatePath, appStateData);
+      writeFileWithRetries(nodeCreatePath, appStateData, true);
     } catch (Exception e) {
       LOG.info("Error storing info for app: " + appId, e);
       throw e;
@@ -400,7 +413,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      updateFile(nodeCreatePath, appStateData);
+      updateFile(nodeCreatePath, appStateData, true);
     } catch (Exception e) {
       LOG.info("Error updating info for app: " + appId, e);
       throw e;
@@ -421,7 +434,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      writeFileWithRetries(nodeCreatePath, attemptStateData);
+      writeFileWithRetries(nodeCreatePath, attemptStateData, true);
     } catch (Exception e) {
       LOG.info("Error storing info for attempt: " + appAttemptId, e);
       throw e;
@@ -442,7 +455,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try {
       // currently throw all exceptions. May need to respond differently for HA
       // based on whether we have lost the right to write to FS
-      updateFile(nodeCreatePath, attemptStateData);
+      updateFile(nodeCreatePath, attemptStateData, true);
     } catch (Exception e) {
       LOG.info("Error updating info for attempt: " + appAttemptId, e);
       throw e;
@@ -471,7 +484,7 @@ public class FileSystemRMStateStore extends RMStateStore {
   public synchronized void removeRMDelegationTokenState(
       RMDelegationTokenIdentifier identifier) throws Exception {
     Path nodeCreatePath = getNodePath(rmDTSecretManagerRoot,
-      DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
+            DELEGATION_TOKEN_PREFIX + identifier.getSequenceNumber());
     LOG.info("Removing RMDelegationToken_" + identifier.getSequenceNumber());
     deleteFileWithRetries(nodeCreatePath);
   }
@@ -493,10 +506,10 @@ public class FileSystemRMStateStore extends RMStateStore {
         new RMDelegationTokenIdentifierData(identifier, renewDate);
     if (isUpdate) {
       LOG.info("Updating RMDelegationToken_" + identifier.getSequenceNumber());
-      updateFile(nodeCreatePath, identifierData.toByteArray());
+      updateFile(nodeCreatePath, identifierData.toByteArray(), true);
     } else {
       LOG.info("Storing RMDelegationToken_" + identifier.getSequenceNumber());
-      writeFileWithRetries(nodeCreatePath, identifierData.toByteArray());
+      writeFileWithRetries(nodeCreatePath, identifierData.toByteArray(), true);
 
       // store sequence number
       Path latestSequenceNumberPath = getNodePath(rmDTSecretManagerRoot,
@@ -527,7 +540,7 @@ public class FileSystemRMStateStore extends RMStateStore {
     try (DataOutputStream fsOut = new DataOutputStream(os)) {
       LOG.info("Storing RMDelegationKey_" + masterKey.getKeyId());
       masterKey.write(fsOut);
-      writeFileWithRetries(nodeCreatePath, os.toByteArray());
+      writeFileWithRetries(nodeCreatePath, os.toByteArray(), true);
     }
   }
 
@@ -551,6 +564,16 @@ public class FileSystemRMStateStore extends RMStateStore {
     return getNodePath(root, appId.toString());
   }
 
+  @VisibleForTesting
+  protected Path getAppDir(ApplicationId appId) {
+    return getAppDir(rmAppRoot, appId);
+  }
+
+  @VisibleForTesting
+  protected Path getAppAttemptDir(ApplicationAttemptId appAttId) {
+    return getNodePath(getAppDir(appAttId.getApplicationId()), appAttId
+            .toString());
+  }
   // FileSystem related code
 
   private boolean checkAndRemovePartialRecordWithRetries(final Path record)
@@ -573,12 +596,13 @@ public class FileSystemRMStateStore extends RMStateStore {
     }.runWithRetries();
   }
 
-  private void writeFileWithRetries(final Path outputPath,final byte[] data)
-      throws Exception {
+  private void writeFileWithRetries(final Path outputPath, final byte[] data,
+                                    final boolean makeUnreadableByAdmin)
+          throws Exception {
     new FSAction<Void>() {
       @Override
       public Void run() throws Exception {
-        writeFile(outputPath, data);
+        writeFile(outputPath, data, makeUnreadableByAdmin);
         return null;
       }
     }.runWithRetries();
@@ -725,7 +749,8 @@ public class FileSystemRMStateStore extends RMStateStore {
    * data to .tmp file and then rename it. Here we are assuming that rename is
    * atomic for underlying file system.
    */
-  private void writeFile(Path outputPath, byte[] data) throws Exception {
+  protected void writeFile(Path outputPath, byte[] data, boolean
+          makeUnradableByAdmin) throws Exception {
     Path tempPath =
         new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
     FSDataOutputStream fsOut = null;
@@ -733,6 +758,9 @@ public class FileSystemRMStateStore extends RMStateStore {
     // final status.
     try {
       fsOut = fs.create(tempPath, true);
+      if (makeUnradableByAdmin) {
+        setUnreadableBySuperuserXattrib(tempPath);
+      }
       fsOut.write(data);
       fsOut.close();
       fsOut = null;
@@ -747,10 +775,11 @@ public class FileSystemRMStateStore extends RMStateStore {
    * data to .new file and then rename it. Here we are assuming that rename is
    * atomic for underlying file system.
    */
-  protected void updateFile(Path outputPath, byte[] data) throws Exception {
+  protected void updateFile(Path outputPath, byte[] data, boolean
+          makeUnradableByAdmin) throws Exception {
     Path newPath = new Path(outputPath.getParent(), outputPath.getName() + ".new");
     // use writeFileWithRetries to make sure .new file is created atomically
-    writeFileWithRetries(newPath, data);
+    writeFileWithRetries(newPath, data, makeUnradableByAdmin);
     replaceFile(newPath, outputPath);
   }
 
@@ -789,9 +818,9 @@ public class FileSystemRMStateStore extends RMStateStore {
         AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
     byte[] stateData = data.getProto().toByteArray();
     if (isUpdate) {
-      updateFile(nodeCreatePath, stateData);
+      updateFile(nodeCreatePath, stateData, true);
     } else {
-      writeFileWithRetries(nodeCreatePath, stateData);
+      writeFileWithRetries(nodeCreatePath, stateData, true);
     }
   }
 
@@ -804,4 +833,13 @@ public class FileSystemRMStateStore extends RMStateStore {
   public long getRetryInterval() {
     return fsRetryInterval;
   }
+
+  private void setUnreadableBySuperuserXattrib(Path p)
+          throws IOException {
+    if (isHDFS &&
+            !fs.getXAttrs(p).containsKey(UNREADABLE_BY_SUPERUSER_XATTRIB)) {
+      fs.setXAttr(p, UNREADABLE_BY_SUPERUSER_XATTRIB, null,
+              EnumSet.of(XAttrSetFlag.CREATE));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c51654d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
index 5b53a02..bb53f8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java
@@ -112,6 +112,12 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
 
   }
 
+  public static class StoreStateVerifier {
+    void afterStoreApp(RMStateStore store, ApplicationId appId) {}
+    void afterStoreAppAttempt(RMStateStore store, ApplicationAttemptId
+            appAttId) {}
+  }
+
   interface RMStateStoreHelper {
     RMStateStore getRMStateStore() throws Exception;
     boolean isFinalStateValid() throws Exception;
@@ -173,7 +179,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     when(mockAttempt.getRMAppAttemptMetrics())
         .thenReturn(mockRmAppAttemptMetrics);
     when(mockRmAppAttemptMetrics.getAggregateAppResourceUsage())
-        .thenReturn(new AggregateAppResourceUsage(0,0));
+        .thenReturn(new AggregateAppResourceUsage(0, 0));
     dispatcher.attemptId = attemptId;
     store.storeNewApplicationAttempt(mockAttempt);
     waitNotify(dispatcher);
@@ -181,6 +187,12 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
   }
 
   void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
+          throws Exception {
+    testRMAppStateStore(stateStoreHelper, new StoreStateVerifier());
+  }
+
+  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper,
+                           StoreStateVerifier verifier)
       throws Exception {
     long submitTime = System.currentTimeMillis();
     long startTime = System.currentTimeMillis() + 1234;
@@ -205,6 +217,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
         .toApplicationAttemptId("appattempt_1352994193343_0001_000001");
     ApplicationId appId1 = attemptId1.getApplicationId();
     storeApp(store, appId1, submitTime, startTime);
+    verifier.afterStoreApp(store, appId1);
 
     // create application token and client token key for attempt1
     Token<AMRMTokenIdentifier> appAttemptToken1 =
@@ -236,6 +249,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
     storeApp(store, appIdRemoved, submitTime, startTime);
     storeAttempt(store, attemptIdRemoved,
         "container_1352994193343_0002_01_000001", null, null, dispatcher);
+    verifier.afterStoreAppAttempt(store, attemptIdRemoved);
 
     RMApp mockRemovedApp = mock(RMApp.class);
     RMAppAttemptMetrics mockRmAppAttemptMetrics = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c51654d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
index d2eddd6..aeac822 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestFSRMStateStore.java
@@ -21,8 +21,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
+import java.util.Queue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStore.TestZKRMStateStoreTester;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -56,6 +62,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     Path workingDirPathURI;
     TestFileSystemRMStore store;
     MiniDFSCluster cluster;
+    boolean adminCheckEnable;
 
     class TestFileSystemRMStore extends FileSystemRMStateStore {
 
@@ -83,8 +90,9 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       }
     }
 
-    public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
-      Path workingDirPath = new Path("/Test");
+    public TestFSRMStateStoreTester(MiniDFSCluster cluster, boolean adminCheckEnable) throws Exception {
+      Path workingDirPath = new Path("/yarn/Test");
+      this.adminCheckEnable = adminCheckEnable;
       this.cluster = cluster;
       FileSystem fs = cluster.getFileSystem();
       fs.mkdirs(workingDirPath);
@@ -99,13 +107,18 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
       conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
           workingDirPathURI.toString());
       conf.set(YarnConfiguration.FS_RM_STATE_STORE_RETRY_POLICY_SPEC,
-        "100,6000");
+              "100,6000");
       conf.setInt(YarnConfiguration.FS_RM_STATE_STORE_NUM_RETRIES, 8);
       conf.setLong(YarnConfiguration.FS_RM_STATE_STORE_RETRY_INTERVAL_MS,
-          900L);
+              900L);
       this.store = new TestFileSystemRMStore(conf);
       Assert.assertEquals(store.getNumRetries(), 8);
       Assert.assertEquals(store.getRetryInterval(), 900L);
+      if (adminCheckEnable) {
+        store.setIsHDFS(true);
+      } else {
+        store.setIsHDFS(false);
+      }
       return store;
     }
 
@@ -118,8 +131,9 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
 
     @Override
     public void writeVersion(Version version) throws Exception {
-      store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
-        .getProto().toByteArray());
+      store.updateFile(store.getVersionNode(), ((VersionPBImpl)
+              version)
+              .getProto().toByteArray(), false);
     }
 
     @Override
@@ -130,7 +144,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     public boolean appExists(RMApp app) throws IOException {
       FileSystem fs = cluster.getFileSystem();
       Path nodePath =
-          store.getAppDir(app.getApplicationId().toString());
+              store.getAppDir(app.getApplicationId().toString());
       return fs.exists(nodePath);
     }
   }
@@ -139,28 +153,28 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
   public void testFSRMStateStore() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster =
-        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     try {
-      fsTester = new TestFSRMStateStoreTester(cluster);
+      fsTester = new TestFSRMStateStoreTester(cluster, false);
       // If the state store is FileSystemRMStateStore then add corrupted entry.
       // It should discard the entry and remove it from file system.
       FSDataOutputStream fsOut = null;
       FileSystemRMStateStore fileSystemRMStateStore =
-          (FileSystemRMStateStore) fsTester.getRMStateStore();
+              (FileSystemRMStateStore) fsTester.getRMStateStore();
       String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
       ApplicationAttemptId attemptId3 =
-          ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
+              ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
       Path appDir =
-          fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
+              fsTester.store.getAppDir(attemptId3.getApplicationId().toString());
       Path tempAppAttemptFile =
-          new Path(appDir, attemptId3.toString() + ".tmp");
+              new Path(appDir, attemptId3.toString() + ".tmp");
       fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
       fsOut.write("Some random data ".getBytes());
       fsOut.close();
 
       testRMAppStateStore(fsTester);
       Assert.assertFalse(fsTester.workingDirPathURI
-          .getFileSystem(conf).exists(tempAppAttemptFile));
+              .getFileSystem(conf).exists(tempAppAttemptFile));
       testRMDTSecretManagerStateStore(fsTester);
       testCheckVersion(fsTester);
       testEpoch(fsTester);
@@ -173,12 +187,109 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
   }
 
   @Test(timeout = 60000)
+  public void testHDFSRMStateStore() throws Exception {
+    final HdfsConfiguration conf = new HdfsConfiguration();
+    UserGroupInformation yarnAdmin =
+            UserGroupInformation.createUserForTesting("yarn",
+                    new String[]{"admin"});
+    final MiniDFSCluster cluster =
+            new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.getFileSystem().mkdir(new Path("/yarn"),
+            FsPermission.valueOf("-rwxrwxrwx"));
+    cluster.getFileSystem().setOwner(new Path("/yarn"), "yarn", "admin");
+    final UserGroupInformation hdfsAdmin = UserGroupInformation.getCurrentUser();
+    final StoreStateVerifier verifier = new StoreStateVerifier() {
+      @Override
+      void afterStoreApp(final RMStateStore store, final ApplicationId appId) {
+        try {
+          // Wait for things to settle
+          Thread.sleep(5000);
+          hdfsAdmin.doAs(
+                  new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception {
+                      verifyFilesUnreadablebyHDFS(cluster,
+                              ((FileSystemRMStateStore) store).getAppDir
+                                      (appId));
+                      return null;
+                    }
+                  });
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      void afterStoreAppAttempt(final RMStateStore store,
+                                final ApplicationAttemptId appAttId) {
+        try {
+          // Wait for things to settle
+          Thread.sleep(5000);
+          hdfsAdmin.doAs(
+                  new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception {
+                      verifyFilesUnreadablebyHDFS(cluster,
+                              ((FileSystemRMStateStore) store)
+                                      .getAppAttemptDir(appAttId));
+                      return null;
+                    }
+                  });
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+    try {
+      yarnAdmin.doAs(new PrivilegedExceptionAction<Void>() {
+        @Override
+        public Void run() throws Exception {
+          fsTester = new TestFSRMStateStoreTester(cluster, true);
+          testRMAppStateStore(fsTester, verifier);
+          return null;
+        }
+      });
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void verifyFilesUnreadablebyHDFS(MiniDFSCluster cluster,
+                                                     Path root) throws Exception{
+    DistributedFileSystem fs = cluster.getFileSystem();
+    Queue<Path> paths = new LinkedList<>();
+    paths.add(root);
+    while (!paths.isEmpty()) {
+      Path p = paths.poll();
+      FileStatus stat = fs.getFileStatus(p);
+      if (!stat.isDirectory()) {
+        try {
+          LOG.warn("\n\n ##Testing path [" + p + "]\n\n");
+          fs.open(p);
+          Assert.fail("Super user should not be able to read ["+ UserGroupInformation.getCurrentUser() + "] [" + p.getName() + "]");
+        } catch (AccessControlException e) {
+          Assert.assertTrue(e.getMessage().contains("superuser is not allowed to perform this operation"));
+        } catch (Exception e) {
+          Assert.fail("Should get an AccessControlException here");
+        }
+      }
+      if (stat.isDirectory()) {
+        FileStatus[] ls = fs.listStatus(p);
+        for (FileStatus f : ls) {
+          paths.add(f.getPath());
+        }
+      }
+    }
+
+  }
+
+  @Test(timeout = 60000)
   public void testCheckMajorVersionChange() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
     MiniDFSCluster cluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     try {
-      fsTester = new TestFSRMStateStoreTester(cluster) {
+      fsTester = new TestFSRMStateStoreTester(cluster, false) {
         Version VERSION_INFO = Version.newInstance(Integer.MAX_VALUE, 0);
 
         @Override
@@ -232,14 +343,14 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
     ApplicationAttemptId attemptId1 =
         ConverterUtils.toApplicationAttemptId(appAttemptIdStr1);
     Path appDir =
-        fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
+            fsTester.store.getAppDir(attemptId1.getApplicationId().toString());
     Path appAttemptFile1 =
         new Path(appDir, attemptId1.toString() + ".new");
     FileSystemRMStateStore fileSystemRMStateStore =
         (FileSystemRMStateStore) fsTester.getRMStateStore();
     fileSystemRMStateStore.renameFile(appAttemptFile1,
-        new Path(appAttemptFile1.getParent(),
-            appAttemptFile1.getName() + ".new"));
+            new Path(appAttemptFile1.getParent(),
+                    appAttemptFile1.getName() + ".new"));
   }
 
   @Override
@@ -262,7 +373,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
         new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     cluster.waitActive();
     try {
-      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
+      TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster, false);
       final RMStateStore store = fsTester.getRMStateStore();
       store.setRMDispatcher(new TestDispatcher());
       final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3c51654d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
index 491b885..596cba7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerHA.md
@@ -53,7 +53,7 @@ When there are multiple RMs, the configuration (yarn-site.xml) used by clients a
 
 ### Recovering prevous active-RM's state
 
-With the [ResourceManger Restart](./ResourceManagerRestart.html) enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from the both of Active/Standby RMs. Currently, there are two RMStateStore implementations for persistence - FileSystemRMStateStore and ZKRMStateStore. The `ZKRMStateStore` implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role.
+With the [ResourceManger Restart](./ResourceManagerRestart.html) enabled, the RM being promoted to an active state loads the RM internal state and continues to operate from where the previous active left off as much as possible depending on the RM restart feature. A new attempt is spawned for each managed application previously submitted to the RM. Applications can checkpoint periodically to avoid losing any work. The state-store must be visible from the both of Active/Standby RMs. Currently, there are two RMStateStore implementations for persistence - FileSystemRMStateStore and ZKRMStateStore. The `ZKRMStateStore` implicitly allows write access to a single RM at any point in time, and hence is the recommended store to use in an HA cluster. When using the ZKRMStateStore, there is no need for a separate fencing mechanism to address a potential split-brain situation where multiple RMs can potentially assume the Active role. When using the ZKRMStateStore, it is advisable to NOT set the   "`zookeeper.DigestAuthenticationProvider.superDigest`" property on the Zookeeper cluster to ensure that the zookeeper admin does not have access to YARN application/user credential information.
 
 Deployment
 ----------


Mime
View raw message