hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject [2/2] git commit: YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev
Date Tue, 21 Oct 2014 17:32:19 GMT
YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f2028bd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f2028bd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f2028bd

Branch: refs/heads/trunk
Commit: 6f2028bd1514d90b831f889fd0ee7f2ba5c15000
Parents: b6f9d55
Author: Jason Lowe <jlowe@apache.org>
Authored: Tue Oct 21 17:29:22 2014 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Tue Oct 21 17:31:13 2014 +0000

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../server/nodemanager/DirectoryCollection.java | 219 ++++++++---
 .../nodemanager/LocalDirsHandlerService.java    | 176 +++++++--
 .../nodemanager/NodeHealthCheckerService.java   |   4 +-
 .../launcher/ContainerLaunch.java               |   2 +-
 .../localizer/ResourceLocalizationService.java  | 270 ++++++++++---
 .../logaggregation/AppLogAggregatorImpl.java    |  35 +-
 .../logaggregation/LogAggregationService.java   |  13 +-
 .../loghandler/NonAggregatingLogHandler.java    |  47 ++-
 .../nodemanager/TestDirectoryCollection.java    |  75 +++-
 .../TestLocalDirsHandlerService.java            |  43 ++-
 .../nodemanager/TestNodeHealthService.java      |   2 +-
 .../TestResourceLocalizationService.java        | 312 ++++++++++++++-
 .../TestLogAggregationService.java              | 147 ++++---
 .../TestNonAggregatingLogHandler.java           | 382 +++++++++++++++----
 15 files changed, 1441 insertions(+), 289 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index b130ecf..af056b3 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -379,6 +379,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan
     Gong via zjshen)
 
+    YARN-90. NodeManager should identify failed disks becoming good again
+    (Varun Vasudev via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
index f6ee128..279787b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java
@@ -21,18 +21,23 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 /**
  * Manages a list of local storage directories.
@@ -40,9 +45,38 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 class DirectoryCollection {
   private static final Log LOG = LogFactory.getLog(DirectoryCollection.class);
 
+  public enum DiskErrorCause {
+    DISK_FULL, OTHER
+  }
+
+  static class DiskErrorInformation {
+    DiskErrorCause cause;
+    String message;
+
+    DiskErrorInformation(DiskErrorCause cause, String message) {
+      this.cause = cause;
+      this.message = message;
+    }
+  }
+
+  /**
+   * Returns a merged list which contains all the elements of l1 and l2
+   * @param l1 the first list to be included
+   * @param l2 the second list to be included
+   * @return a new list containing all the elements of the first and second list
+   */
+  static List<String> concat(List<String> l1, List<String> l2) {
+    List<String> ret = new ArrayList<String>(l1.size() + l2.size());
+    ret.addAll(l1);
+    ret.addAll(l2);
+    return ret;
+  }
+
   // Good local storage directories
   private List<String> localDirs;
-  private List<String> failedDirs;
+  private List<String> errorDirs;
+  private List<String> fullDirs;
+
   private int numFailures;
   
   private float diskUtilizationPercentageCutoff;
@@ -109,7 +143,9 @@ class DirectoryCollection {
       float utilizationPercentageCutOff,
       long utilizationSpaceCutOff) {
     localDirs = new CopyOnWriteArrayList<String>(dirs);
-    failedDirs = new CopyOnWriteArrayList<String>();
+    errorDirs = new CopyOnWriteArrayList<String>();
+    fullDirs = new CopyOnWriteArrayList<String>();
+
     diskUtilizationPercentageCutoff = utilizationPercentageCutOff;
     diskUtilizationSpaceCutoff = utilizationSpaceCutOff;
     diskUtilizationPercentageCutoff =
@@ -131,7 +167,16 @@ class DirectoryCollection {
    * @return the failed directories
    */
   synchronized List<String> getFailedDirs() {
-    return Collections.unmodifiableList(failedDirs);
+    return Collections.unmodifiableList(
+        DirectoryCollection.concat(errorDirs, fullDirs));
+  }
+
+  /**
+   * @return the directories that have used all disk space
+   */
+
+  synchronized List<String> getFullDirs() {
+    return fullDirs;
   }
 
   /**
@@ -158,7 +203,7 @@ class DirectoryCollection {
         LOG.warn("Unable to create directory " + dir + " error " +
             e.getMessage() + ", removing from the list of valid directories.");
         localDirs.remove(dir);
-        failedDirs.add(dir);
+        errorDirs.add(dir);
         numFailures++;
         failed = true;
       }
@@ -167,61 +212,147 @@ class DirectoryCollection {
   }
 
   /**
-   * Check the health of current set of local directories, updating the list
-   * of valid directories if necessary.
-   * @return <em>true</em> if there is a new disk-failure identified in
-   *         this checking. <em>false</em> otherwise.
+   * Check the health of current set of local directories(good and failed),
+   * updating the list of valid directories if necessary.
+   *
+   * @return <em>true</em> if there is a new disk-failure identified in this
+   *         checking or a failed directory passes the disk check <em>false</em>
+   *         otherwise.
    */
   synchronized boolean checkDirs() {
-    int oldNumFailures = numFailures;
-    HashSet<String> checkFailedDirs = new HashSet<String>();
-    for (final String dir : localDirs) {
+    boolean setChanged = false;
+    Set<String> preCheckGoodDirs = new HashSet<String>(localDirs);
+    Set<String> preCheckFullDirs = new HashSet<String>(fullDirs);
+    Set<String> preCheckOtherErrorDirs = new HashSet<String>(errorDirs);
+    List<String> failedDirs = DirectoryCollection.concat(errorDirs, fullDirs);
+    List<String> allLocalDirs =
+        DirectoryCollection.concat(localDirs, failedDirs);
+
+    Map<String, DiskErrorInformation> dirsFailedCheck = testDirs(allLocalDirs);
+
+    localDirs.clear();
+    errorDirs.clear();
+    fullDirs.clear();
+
+    for (Map.Entry<String, DiskErrorInformation> entry : dirsFailedCheck
+      .entrySet()) {
+      String dir = entry.getKey();
+      DiskErrorInformation errorInformation = entry.getValue();
+      switch (entry.getValue().cause) {
+      case DISK_FULL:
+        fullDirs.add(entry.getKey());
+        break;
+      case OTHER:
+        errorDirs.add(entry.getKey());
+        break;
+      }
+      if (preCheckGoodDirs.contains(dir)) {
+        LOG.warn("Directory " + dir + " error, " + errorInformation.message
+            + ", removing from list of valid directories");
+        setChanged = true;
+        numFailures++;
+      }
+    }
+    for (String dir : allLocalDirs) {
+      if (!dirsFailedCheck.containsKey(dir)) {
+        localDirs.add(dir);
+        if (preCheckFullDirs.contains(dir)
+            || preCheckOtherErrorDirs.contains(dir)) {
+          setChanged = true;
+          LOG.info("Directory " + dir
+              + " passed disk check, adding to list of valid directories.");
+        }
+      }
+    }
+    Set<String> postCheckFullDirs = new HashSet<String>(fullDirs);
+    Set<String> postCheckOtherDirs = new HashSet<String>(errorDirs);
+    for (String dir : preCheckFullDirs) {
+      if (postCheckOtherDirs.contains(dir)) {
+        LOG.warn("Directory " + dir + " error "
+            + dirsFailedCheck.get(dir).message);
+      }
+    }
+
+    for (String dir : preCheckOtherErrorDirs) {
+      if (postCheckFullDirs.contains(dir)) {
+        LOG.warn("Directory " + dir + " error "
+            + dirsFailedCheck.get(dir).message);
+      }
+    }
+    return setChanged;
+  }
+
+  Map<String, DiskErrorInformation> testDirs(List<String> dirs) {
+    HashMap<String, DiskErrorInformation> ret =
+        new HashMap<String, DiskErrorInformation>();
+    for (final String dir : dirs) {
+      String msg;
       try {
         File testDir = new File(dir);
         DiskChecker.checkDir(testDir);
-        if (isDiskUsageUnderPercentageLimit(testDir)) {
-          LOG.warn("Directory " + dir
-              + " error, used space above threshold of "
-              + diskUtilizationPercentageCutoff
-              + "%, removing from the list of valid directories.");
-          checkFailedDirs.add(dir);
-        } else if (isDiskFreeSpaceWithinLimit(testDir)) {
-          LOG.warn("Directory " + dir + " error, free space below limit of "
-              + diskUtilizationSpaceCutoff
-              + "MB, removing from the list of valid directories.");
-          checkFailedDirs.add(dir);
+        if (isDiskUsageOverPercentageLimit(testDir)) {
+          msg =
+              "used space above threshold of "
+                  + diskUtilizationPercentageCutoff
+                  + "%";
+          ret.put(dir,
+            new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
+          continue;
+        } else if (isDiskFreeSpaceUnderLimit(testDir)) {
+          msg =
+              "free space below limit of " + diskUtilizationSpaceCutoff
+                  + "MB";
+          ret.put(dir,
+            new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg));
+          continue;
         }
-      } catch (DiskErrorException de) {
-        LOG.warn("Directory " + dir + " error " + de.getMessage()
-            + ", removing from the list of valid directories.");
-        checkFailedDirs.add(dir);
+
+        // create a random dir to make sure fs isn't in read-only mode
+        verifyDirUsingMkdir(testDir);
+      } catch (IOException ie) {
+        ret.put(dir,
+          new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage()));
       }
     }
-    for (String dir : checkFailedDirs) {
-      localDirs.remove(dir);
-      failedDirs.add(dir);
-      numFailures++;
+    return ret;
+  }
+
+  /**
+   * Function to test whether a dir is working correctly by actually creating a
+   * random directory.
+   *
+   * @param dir
+   *          the dir to test
+   */
+  private void verifyDirUsingMkdir(File dir) throws IOException {
+
+    String randomDirName = RandomStringUtils.randomAlphanumeric(5);
+    File target = new File(dir, randomDirName);
+    int i = 0;
+    while (target.exists()) {
+
+      randomDirName = RandomStringUtils.randomAlphanumeric(5) + i;
+      target = new File(dir, randomDirName);
+      i++;
+    }
+    try {
+      DiskChecker.checkDir(target);
+    } finally {
+      FileUtils.deleteQuietly(target);
     }
-    return numFailures > oldNumFailures;
   }
-  
-  private boolean isDiskUsageUnderPercentageLimit(File dir) {
+
+  private boolean isDiskUsageOverPercentageLimit(File dir) {
     float freePercentage =
         100 * (dir.getUsableSpace() / (float) dir.getTotalSpace());
     float usedPercentage = 100.0F - freePercentage;
-    if (usedPercentage > diskUtilizationPercentageCutoff
-        || usedPercentage >= 100.0F) {
-      return true;
-    }
-    return false;
+    return (usedPercentage > diskUtilizationPercentageCutoff
+        || usedPercentage >= 100.0F);
   }
 
-  private boolean isDiskFreeSpaceWithinLimit(File dir) {
+  private boolean isDiskFreeSpaceUnderLimit(File dir) {
     long freeSpace = dir.getUsableSpace() / (1024 * 1024);
-    if (freeSpace < this.diskUtilizationSpaceCutoff) {
-      return true;
-    }
-    return false;
+    return freeSpace < this.diskUtilizationSpaceCutoff;
   }
 
   private void createDir(FileContext localFs, Path dir, FsPermission perm)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
index b053941..7d1aa53 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
@@ -150,7 +152,7 @@ public class LocalDirsHandlerService extends AbstractService {
     boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm);
     createSucceeded &= logDirs.createNonExistentDirs(localFs, perm);
     if (!createSucceeded) {
-      updateDirsAfterFailure();
+      updateDirsAfterTest();
     }
 
     // Check the disk health immediately to weed out bad directories
@@ -197,9 +199,52 @@ public class LocalDirsHandlerService extends AbstractService {
   }
 
   /**
+   * @return the local directories which have no disk space
+   */
+  public List<String> getDiskFullLocalDirs() {
+    return localDirs.getFullDirs();
+  }
+
+  /**
+   * @return the log directories that have no disk space
+   */
+  public List<String> getDiskFullLogDirs() {
+    return logDirs.getFullDirs();
+  }
+
+  /**
+   * Function to get the local dirs which should be considered when cleaning up
+   * resources. Contains the good local dirs and the local dirs that have reached
+   * the disk space limit
+   *
+   * @return the local dirs which should be considered for cleaning up
+   */
+  public List<String> getLocalDirsForCleanup() {
+    return DirectoryCollection.concat(localDirs.getGoodDirs(),
+        localDirs.getFullDirs());
+  }
+
+  /**
+   * Function to get the log dirs which should be considered when cleaning up
+   * resources. Contains the good log dirs and the log dirs that have reached
+   * the disk space limit
+   *
+   * @return the log dirs which should be considered for cleaning up
+   */
+  public List<String> getLogDirsForCleanup() {
+    return DirectoryCollection.concat(logDirs.getGoodDirs(),
+        logDirs.getFullDirs());
+  }
+
+  /**
+   * Function to generate a report on the state of the disks.
+   *
+   * @param listGoodDirs
+   *          flag to determine whether the report should report the state of
+   *          good dirs or failed dirs
    * @return the health report of nm-local-dirs and nm-log-dirs
    */
-  public String getDisksHealthReport() {
+  public String getDisksHealthReport(boolean listGoodDirs) {
     if (!isDiskHealthCheckerEnabled) {
       return "";
     }
@@ -207,20 +252,31 @@ public class LocalDirsHandlerService extends AbstractService {
     StringBuilder report = new StringBuilder();
     List<String> failedLocalDirsList = localDirs.getFailedDirs();
     List<String> failedLogDirsList = logDirs.getFailedDirs();
-    int numLocalDirs = localDirs.getGoodDirs().size()
-        + failedLocalDirsList.size();
-    int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size();
-    if (!failedLocalDirsList.isEmpty()) {
-      report.append(failedLocalDirsList.size() + "/" + numLocalDirs
-          + " local-dirs turned bad: "
-          + StringUtils.join(",", failedLocalDirsList) + ";");
-    }
-    if (!failedLogDirsList.isEmpty()) {
-      report.append(failedLogDirsList.size() + "/" + numLogDirs
-          + " log-dirs turned bad: "
-          + StringUtils.join(",", failedLogDirsList));
+    List<String> goodLocalDirsList = localDirs.getGoodDirs();
+    List<String> goodLogDirsList = logDirs.getGoodDirs();
+    int numLocalDirs = goodLocalDirsList.size() + failedLocalDirsList.size();
+    int numLogDirs = goodLogDirsList.size() + failedLogDirsList.size();
+    if (!listGoodDirs) {
+      if (!failedLocalDirsList.isEmpty()) {
+        report.append(failedLocalDirsList.size() + "/" + numLocalDirs
+            + " local-dirs are bad: "
+            + StringUtils.join(",", failedLocalDirsList) + "; ");
+      }
+      if (!failedLogDirsList.isEmpty()) {
+        report.append(failedLogDirsList.size() + "/" + numLogDirs
+            + " log-dirs are bad: " + StringUtils.join(",", failedLogDirsList));
+      }
+    } else {
+      report.append(goodLocalDirsList.size() + "/" + numLocalDirs
+          + " local-dirs are good: " + StringUtils.join(",", goodLocalDirsList)
+          + "; ");
+      report.append(goodLogDirsList.size() + "/" + numLogDirs
+          + " log-dirs are good: " + StringUtils.join(",", goodLogDirsList));
+
     }
+
     return report.toString();
+
   }
 
   /**
@@ -262,8 +318,8 @@ public class LocalDirsHandlerService extends AbstractService {
    * Set good local dirs and good log dirs in the configuration so that the
    * LocalDirAllocator objects will use this updated configuration only.
    */
-  private void updateDirsAfterFailure() {
-    LOG.info("Disk(s) failed. " + getDisksHealthReport());
+  private void updateDirsAfterTest() {
+
     Configuration conf = getConfig();
     List<String> localDirs = getLocalDirs();
     conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS,
@@ -273,23 +329,91 @@ public class LocalDirsHandlerService extends AbstractService {
                       logDirs.toArray(new String[logDirs.size()]));
     if (!areDisksHealthy()) {
       // Just log.
-      LOG.error("Most of the disks failed. " + getDisksHealthReport());
+      LOG.error("Most of the disks failed. " + getDisksHealthReport(false));
     }
   }
 
+  private void logDiskStatus(boolean newDiskFailure, boolean diskTurnedGood) {
+    if (newDiskFailure) {
+      String report = getDisksHealthReport(false);
+      LOG.info("Disk(s) failed: " + report);
+    }
+    if (diskTurnedGood) {
+      String report = getDisksHealthReport(true);
+      LOG.info("Disk(s) turned good: " + report);
+    }
+
+  }
+
   private void checkDirs() {
-      boolean newFailure = false;
-      if (localDirs.checkDirs()) {
-        newFailure = true;
-      }
-      if (logDirs.checkDirs()) {
-        newFailure = true;
+    boolean disksStatusChange = false;
+    Set<String> failedLocalDirsPreCheck =
+        new HashSet<String>(localDirs.getFailedDirs());
+    Set<String> failedLogDirsPreCheck =
+        new HashSet<String>(logDirs.getFailedDirs());
+
+    if (localDirs.checkDirs()) {
+      disksStatusChange = true;
+    }
+    if (logDirs.checkDirs()) {
+      disksStatusChange = true;
+    }
+
+    Set<String> failedLocalDirsPostCheck =
+        new HashSet<String>(localDirs.getFailedDirs());
+    Set<String> failedLogDirsPostCheck =
+        new HashSet<String>(logDirs.getFailedDirs());
+
+    boolean disksFailed = false;
+    boolean disksTurnedGood = false;
+
+    disksFailed =
+        disksTurnedBad(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
+    disksTurnedGood =
+        disksTurnedGood(failedLocalDirsPreCheck, failedLocalDirsPostCheck);
+
+    // skip check if we have new failed or good local dirs since we're going to
+    // log anyway
+    if (!disksFailed) {
+      disksFailed =
+          disksTurnedBad(failedLogDirsPreCheck, failedLogDirsPostCheck);
+    }
+    if (!disksTurnedGood) {
+      disksTurnedGood =
+          disksTurnedGood(failedLogDirsPreCheck, failedLogDirsPostCheck);
+    }
+
+    logDiskStatus(disksFailed, disksTurnedGood);
+
+    if (disksStatusChange) {
+      updateDirsAfterTest();
+    }
+
+    lastDisksCheckTime = System.currentTimeMillis();
+  }
+
+  private boolean disksTurnedBad(Set<String> preCheckFailedDirs,
+      Set<String> postCheckDirs) {
+    boolean disksFailed = false;
+    for (String dir : postCheckDirs) {
+      if (!preCheckFailedDirs.contains(dir)) {
+        disksFailed = true;
+        break;
       }
+    }
+    return disksFailed;
+  }
 
-      if (newFailure) {
-        updateDirsAfterFailure();
+  private boolean disksTurnedGood(Set<String> preCheckDirs,
+      Set<String> postCheckDirs) {
+    boolean disksTurnedGood = false;
+    for (String dir : preCheckDirs) {
+      if (!postCheckDirs.contains(dir)) {
+        disksTurnedGood = true;
+        break;
       }
-      lastDisksCheckTime = System.currentTimeMillis();
+    }
+    return disksTurnedGood;
   }
 
   public Path getLocalPathForWrite(String pathStr) throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
index 446d05c..6d6001a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java
@@ -55,9 +55,9 @@ public class NodeHealthCheckerService extends CompositeService {
     String scriptReport = (nodeHealthScriptRunner == null) ? ""
         : nodeHealthScriptRunner.getHealthReport();
     if (scriptReport.equals("")) {
-      return dirsHandler.getDisksHealthReport();
+      return dirsHandler.getDisksHealthReport(false);
     } else {
-      return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport());
+      return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport(false));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
index 87a36c4..f87ed6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
@@ -240,7 +240,7 @@ public class ContainerLaunch implements Callable<Integer> {
       if (!dirsHandler.areDisksHealthy()) {
         ret = ContainerExitStatus.DISKS_FAILED;
         throw new IOException("Most of the disks failed. "
-            + dirsHandler.getDisksHealthReport());
+            + dirsHandler.getDisksHealthReport(false));
       }
 
       try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index d3b33e8..371684b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -55,11 +55,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
@@ -170,6 +172,8 @@ public class ResourceLocalizationService extends CompositeService
    */
   private final ConcurrentMap<String,LocalResourcesTracker> appRsrc =
     new ConcurrentHashMap<String,LocalResourcesTracker>();
+  
+  FileContext lfs;
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService,
@@ -219,32 +223,17 @@ public class ResourceLocalizationService extends CompositeService
     this.recordFactory = RecordFactoryProvider.getRecordFactory(conf);
 
     try {
-      FileContext lfs = getLocalFileContext(conf);
-      lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
-
-      if (!stateStore.canRecover() || stateStore.isNewlyCreated()) {
-        cleanUpLocalDir(lfs,delService);
-      }
-
-      List<String> localDirs = dirsHandler.getLocalDirs();
-      for (String localDir : localDirs) {
-        // $local/usercache
-        Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
-        lfs.mkdir(userDir, null, true);
-        // $local/filecache
-        Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
-        lfs.mkdir(fileDir, null, true);
-        // $local/nmPrivate
-        Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
-        lfs.mkdir(sysDir, NM_PRIVATE_PERM, true);
-      }
+      lfs = getLocalFileContext(conf);
+      lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK));
 
-      List<String> logDirs = dirsHandler.getLogDirs();
-      for (String logDir : logDirs) {
-        lfs.mkdir(new Path(logDir), null, true);
+      if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) {
+        cleanUpLocalDirs(lfs, delService);
+        initializeLocalDirs(lfs);
+        initializeLogDirs(lfs);
       }
-    } catch (IOException e) {
-      throw new YarnRuntimeException("Failed to initialize LocalizationService", e);
+    } catch (Exception e) {
+      throw new YarnRuntimeException(
+        "Failed to initialize LocalizationService", e);
     }
 
     cacheTargetSize =
@@ -497,28 +486,55 @@ public class ResourceLocalizationService extends CompositeService
     String containerIDStr = c.toString();
     String appIDStr = ConverterUtils.toString(
         c.getContainerId().getApplicationAttemptId().getApplicationId());
-    for (String localDir : dirsHandler.getLocalDirs()) {
+
+    // Try deleting from good local dirs and full local dirs because a dir might
+    // have gone bad while the app was running (disk full). In addition, a dir
+    // might have become good while the app was running. Only dirs that exist
+    // are submitted for deletion (see submitDirForDeletion).
 
+    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
       // Delete the user-owned container-dir
       Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
       Path userdir = new Path(usersdir, userName);
       Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
       Path appDir = new Path(allAppsdir, appIDStr);
       Path containerDir = new Path(appDir, containerIDStr);
-      delService.delete(userName, containerDir, new Path[] {});
+      submitDirForDeletion(userName, containerDir);
 
       // Delete the nmPrivate container-dir
-      
+
       Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
       Path appSysDir = new Path(sysDir, appIDStr);
       Path containerSysDir = new Path(appSysDir, containerIDStr);
-      delService.delete(null, containerSysDir,  new Path[] {});
+      submitDirForDeletion(null, containerSysDir);
     }
 
     dispatcher.getEventHandler().handle(
         new ContainerEvent(c.getContainerId(),
             ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP));
   }
+
+  /**
+   * Submits {@code dir} to the deletion service if it exists on the local
+   * filesystem. A dir that does not exist is skipped quietly, since a
+   * container or application dir need not be present on every local dir.
+   *
+   * @param userName user passed to the deletion service, or null for the
+   *          nmPrivate dirs
+   * @param dir directory to delete
+   */
+  private void submitDirForDeletion(String userName, Path dir) {
+    try {
+      lfs.getFileStatus(dir);
+      delService.delete(userName, dir, new Path[] {});
+    } catch (UnsupportedFileSystemException ue) {
+      LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue);
+    } catch (FileNotFoundException fnfe) {
+      // nothing to delete
+    } catch (IOException ie) {
+      LOG.warn("Could not check status of local dir " + dir, ie);
+    }
+  }
 
 
   @SuppressWarnings({"unchecked"})
@@ -545,19 +561,22 @@ public class ResourceLocalizationService extends CompositeService
     }
 
     // Delete the application directories
-    for (String localDir : dirsHandler.getLocalDirs()) {
+    userName = application.getUser();
+    appIDStr = application.toString();
+
+    for (String localDir : dirsHandler.getLocalDirsForCleanup()) {
 
       // Delete the user-owned app-dir
       Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE);
       Path userdir = new Path(usersdir, userName);
       Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
       Path appDir = new Path(allAppsdir, appIDStr);
-      delService.delete(userName, appDir, new Path[] {});
+      submitDirForDeletion(userName, appDir);
 
       // Delete the nmPrivate app-dir
       Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
       Path appSysDir = new Path(sysDir, appIDStr);
-      delService.delete(null, appSysDir, new Path[] {});
+      submitDirForDeletion(null, appSysDir);
     }
 
     // TODO: decrement reference counts of all resources associated with this
@@ -590,8 +609,8 @@ public class ResourceLocalizationService extends CompositeService
 
   private String getAppFileCachePath(String user, String appId) {
     return StringUtils.join(Path.SEPARATOR, Arrays.asList(".",
-      ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
-      ContainerLocalizer.FILECACHE));
+        ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId,
+        ContainerLocalizer.FILECACHE));
   }
   
   @VisibleForTesting
@@ -868,7 +887,7 @@ public class ResourceLocalizationService extends CompositeService
     /**
      * Find next resource to be given to a spawned localizer.
      * 
-     * @return
+     * @return the next resource to be localized
      */
     private LocalResource findNextResource() {
       synchronized (pending) {
@@ -1071,8 +1090,8 @@ public class ResourceLocalizationService extends CompositeService
         // 1) write credentials to private dir
         writeCredentials(nmPrivateCTokensPath);
         // 2) exec initApplication and wait
-        List<String> localDirs = dirsHandler.getLocalDirs();
-        List<String> logDirs = dirsHandler.getLogDirs();
+        List<String> localDirs = getInitializedLocalDirs();
+        List<String> logDirs = getInitializedLogDirs();
         if (dirsHandler.areDisksHealthy()) {
           exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress,
               context.getUser(),
@@ -1082,7 +1101,7 @@ public class ResourceLocalizationService extends CompositeService
               localizerId, localDirs, logDirs);
         } else {
           throw new IOException("All disks failed. "
-              + dirsHandler.getDisksHealthReport());
+              + dirsHandler.getDisksHealthReport(false));
         }
       // TODO handle ExitCodeException separately?
       } catch (Exception e) {
@@ -1151,24 +1170,118 @@ public class ResourceLocalizationService extends CompositeService
 
   }
 
-  private void cleanUpLocalDir(FileContext lfs, DeletionService del) {
-    long currentTimeStamp = System.currentTimeMillis();
-    for (String localDir : dirsHandler.getLocalDirs()) {
-      renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
-          currentTimeStamp);
-      renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
-          currentTimeStamp);
-      renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
-          currentTimeStamp);
+  /**
+   * Initializes each dir returned by dirsHandler.getLocalDirs().
+   */
+  private void initializeLocalDirs(FileContext lfs) {
+    List<String> localDirs = dirsHandler.getLocalDirs();
+    for (String localDir : localDirs) {
+      initializeLocalDir(lfs, localDir);
+    }
+  }
+
+  /**
+   * Makes sure every subdir from getLocalDirsPathPermissionsMap exists under
+   * {@code localDir} with its expected permission, creating the subdir
+   * and/or correcting its permission as needed.
+   *
+   * @throws YarnRuntimeException if a subdir cannot be stat'ed, created or
+   *           have its permission set
+   */
+  private void initializeLocalDir(FileContext lfs, String localDir) {
+    Map<Path, FsPermission> pathPermissionMap =
+        getLocalDirsPathPermissionsMap(localDir);
+    for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
+      FileStatus status;
       try {
-        deleteLocalDir(lfs, del, localDir);
-      } catch (IOException e) {
-        // Do nothing, just give the warning
-        LOG.warn("Failed to delete localDir: " + localDir);
+        status = lfs.getFileStatus(entry.getKey());
+      } catch (FileNotFoundException fnfe) {
+        // not created yet; it is created below
+        status = null;
+      } catch (IOException ie) {
+        String msg = "Could not get file status for local dir " + entry.getKey();
+        LOG.warn(msg, ie);
+        throw new YarnRuntimeException(msg, ie);
+      }
+      if (status == null) {
+        try {
+          lfs.mkdir(entry.getKey(), entry.getValue(), true);
+          status = lfs.getFileStatus(entry.getKey());
+        } catch (IOException e) {
+          String msg = "Could not initialize local dir " + entry.getKey();
+          LOG.warn(msg, e);
+          throw new YarnRuntimeException(msg, e);
+        }
+      }
+      FsPermission perms = status.getPermission();
+      if (!perms.equals(entry.getValue())) {
+        try {
+          lfs.setPermission(entry.getKey(), entry.getValue());
+        } catch (IOException ie) {
+          String msg =
+              "Could not set permissions for local dir " + entry.getKey();
+          LOG.warn(msg, ie);
+          throw new YarnRuntimeException(msg, ie);
+        }
       }
     }
   }
 
+  /**
+   * Creates each dir returned by dirsHandler.getLogDirs() if it is missing.
+   */
+  private void initializeLogDirs(FileContext lfs) {
+    List<String> logDirs = dirsHandler.getLogDirs();
+    for (String logDir : logDirs) {
+      initializeLogDir(lfs, logDir);
+    }
+  }
+
+  /**
+   * Creates {@code logDir}, including missing parents, if it does not exist.
+   *
+   * @throws YarnRuntimeException if the dir cannot be created
+   */
+  private void initializeLogDir(FileContext lfs, String logDir) {
+    try {
+      lfs.mkdir(new Path(logDir), null, true);
+    } catch (FileAlreadyExistsException fe) {
+      // already initialized; nothing to do
+    } catch (IOException e) {
+      String msg = "Could not initialize log dir " + logDir;
+      LOG.warn(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+
+  private void cleanUpLocalDirs(FileContext lfs, DeletionService del) {
+    for (String localDir : dirsHandler.getLocalDirs()) {
+      cleanUpLocalDir(lfs, del, localDir);
+    }
+  }
+
+  /**
+   * Renames the usercache, filecache and nmPrivate subdirs of
+   * {@code localDir} via renameLocalDir, then passes {@code localDir} to
+   * deleteLocalDir. A deletion failure is logged rather than propagated.
+   */
+  private void cleanUpLocalDir(FileContext lfs, DeletionService del,
+      String localDir) {
+    long currentTimeStamp = System.currentTimeMillis();
+    renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE,
+        currentTimeStamp);
+    renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE,
+        currentTimeStamp);
+    renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR,
+        currentTimeStamp);
+    try {
+      deleteLocalDir(lfs, del, localDir);
+    } catch (IOException e) {
+      // Do nothing, just give the warning
+      LOG.warn("Failed to delete localDir: " + localDir, e);
+    }
+  }
+
   private void renameLocalDir(FileContext lfs, String localDir,
       String localSubDir, long currentTimeStamp) {
     try {
@@ -1234,5 +1347,116 @@ public class ResourceLocalizationService extends CompositeService
       del.scheduleFileDeletionTask(dependentDeletionTask);
     }
   }
+
+  /**
+   * Returns the dirs reported by dirsHandler.getLocalDirs(), first checking
+   * each one with checkLocalDir and re-initializing any that fail the check.
+   * Synchronized so that callers do not initialize the dirs concurrently.
+   *
+   * @return list of initialized local dirs
+   * @throws YarnRuntimeException if a dir cannot be re-initialized or still
+   *           fails its checks afterwards
+   */
+  private synchronized List<String> getInitializedLocalDirs() {
+    List<String> dirs = dirsHandler.getLocalDirs();
+    List<String> checkFailedDirs = new ArrayList<String>();
+    for (String dir : dirs) {
+      try {
+        checkLocalDir(dir);
+      } catch (YarnRuntimeException e) {
+        checkFailedDirs.add(dir);
+      }
+    }
+    for (String dir : checkFailedDirs) {
+      LOG.info("Attempting to initialize " + dir);
+      initializeLocalDir(lfs, dir);
+      try {
+        checkLocalDir(dir);
+      } catch (YarnRuntimeException e) {
+        String msg =
+            "Failed to setup local dir " + dir + ", which was marked as good.";
+        LOG.warn(msg, e);
+        throw new YarnRuntimeException(msg, e);
+      }
+    }
+    return dirs;
+  }
+
+  /**
+   * Verifies that every subdir from getLocalDirsPathPermissionsMap exists
+   * with its expected permission.
+   *
+   * @return true; failures are reported by throwing
+   * @throws YarnRuntimeException if a subdir cannot be stat'ed or has an
+   *           unexpected permission
+   */
+  private boolean checkLocalDir(String localDir) {
+    Map<Path, FsPermission> pathPermissionMap =
+        getLocalDirsPathPermissionsMap(localDir);
+
+    for (Map.Entry<Path, FsPermission> entry : pathPermissionMap.entrySet()) {
+      FileStatus status;
+      try {
+        status = lfs.getFileStatus(entry.getKey());
+      } catch (Exception e) {
+        String msg =
+            "Could not carry out resource dir checks for " + localDir
+                + ", which was marked as good";
+        LOG.warn(msg, e);
+        throw new YarnRuntimeException(msg, e);
+      }
+
+      if (!status.getPermission().equals(entry.getValue())) {
+        String msg =
+            "Permissions incorrectly set for dir " + entry.getKey()
+                + ", should be " + entry.getValue() + ", actual value = "
+                + status.getPermission();
+        LOG.warn(msg);
+        throw new YarnRuntimeException(msg);
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Maps the usercache, filecache and nmPrivate subdirs of {@code localDir}
+   * to the permissions they are expected to have, with the umask of the
+   * local FileContext applied.
+   */
+  private Map<Path, FsPermission> getLocalDirsPathPermissionsMap(
+      String localDir) {
+    Map<Path, FsPermission> localDirPathFsPermissionsMap =
+        new HashMap<Path, FsPermission>();
+
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPrivatePermission =
+        NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
 
+    Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE);
+    Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE);
+    Path sysDir = new Path(localDir, NM_PRIVATE_DIR);
+
+    localDirPathFsPermissionsMap.put(userDir, defaultPermission);
+    localDirPathFsPermissionsMap.put(fileDir, defaultPermission);
+    localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission);
+    return localDirPathFsPermissionsMap;
+  }
+
+  /**
+   * Returns the dirs reported by dirsHandler.getLogDirs() after making sure
+   * each of them exists, creating any that are missing.
+   *
+   * @return list of initialized log dirs
+   * @throws YarnRuntimeException if a log dir cannot be created
+   */
+  private synchronized List<String> getInitializedLogDirs() {
+    List<String> dirs = dirsHandler.getLogDirs();
+    // initialize exactly the dirs being returned instead of re-reading the
+    // list of good dirs via initializeLogDirs, which could return another list
+    for (String dir : dirs) {
+      initializeLogDir(lfs, dir);
+    }
+    return dirs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 6e196bb..43cd7b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -37,9 +38,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -107,6 +110,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
   private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
   private final AtomicBoolean aborted = new AtomicBoolean();
   private final Map<ApplicationAccessType, String> appAcls;
+  private final FileContext lfs;
   private final LogAggregationContext logAggregationContext;
   private final Context context;
   private final int retentionSize;
@@ -122,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
       LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
       ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
-      LogAggregationContext logAggregationContext,
-      Context context) {
+      LogAggregationContext logAggregationContext, Context context,
+      FileContext lfs) {
     this.dispatcher = dispatcher;
     this.conf = conf;
     this.delService = deletionService;
@@ -136,6 +140,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     this.retentionPolicy = retentionPolicy;
     this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
     this.appAcls = appAcls;
+    this.lfs = lfs;
     this.logAggregationContext = logAggregationContext;
     this.context = context;
     this.nodeId = nodeId;
@@ -395,15 +400,27 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
     uploadLogsForContainers();
 
     // Remove the local app-log-dirs
-    List<String> rootLogDirs = dirsHandler.getLogDirs();
-    Path[] localAppLogDirs = new Path[rootLogDirs.size()];
-    int index = 0;
-    for (String rootLogDir : rootLogDirs) {
-      localAppLogDirs[index] = new Path(rootLogDir, this.applicationId);
-      index++;
+    List<Path> localAppLogDirs = new ArrayList<Path>();
+    for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+      Path logPath = new Path(rootLogDir, applicationId);
+      try {
+        // only schedule deletion of app log dirs that actually exist
+        lfs.getFileStatus(logPath);
+        localAppLogDirs.add(logPath);
+      } catch (UnsupportedFileSystemException ue) {
+        LOG.warn("Log dir " + rootLogDir + " is an unsupported file system",
+            ue);
+      } catch (FileNotFoundException fnfe) {
+        // app log dir does not exist under this root; nothing to delete
+      } catch (IOException ie) {
+        LOG.warn("Could not check status of log dir " + logPath, ie);
+      }
+    }
+
+    if (!localAppLogDirs.isEmpty()) {
+      this.delService.delete(this.userUgi.getShortUserName(), null,
+          localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
     }
-    this.delService.delete(this.userUgi.getShortUserName(), null,
-        localAppLogDirs);
     
     this.dispatcher.getEventHandler().handle(
         new ApplicationEvent(this.appId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 1d6a9e1..77176b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -326,6 +327,19 @@ public class LogAggregationService extends AbstractService implements
     }
     this.dispatcher.getEventHandler().handle(eventResponse);
   }
+
+  /**
+   * Returns a FileContext for the local filesystem.
+   *
+   * @throws YarnRuntimeException if the local filesystem cannot be accessed
+   */
+  FileContext getLocalFileContext(Configuration conf) {
+    try {
+      return FileContext.getLocalFSFileContext(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to access local fs", e);
+    }
+  }
 
   protected void initAppAggregator(final ApplicationId appId, String user,
       Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
@@ -344,7 +358,8 @@ public class LogAggregationService extends AbstractService implements
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
             getConfig(), appId, userUgi, this.nodeId, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
-            appAcls, logAggregationContext, this.context);
+            appAcls, logAggregationContext, this.context,
+            getLocalFileContext(getConfig()));
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
       throw new YarnRuntimeException("Duplicate initApp for " + appId);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 40173e1..0422ef9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,11 +31,14 @@ import java.util.concurrent.RejectedExecutionException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -96,6 +102,19 @@ public class NonAggregatingLogHandler extends AbstractService implements
     }
     super.serviceStop();
   }
+
+  /**
+   * Returns a FileContext for the local filesystem.
+   *
+   * @throws YarnRuntimeException if the local filesystem cannot be accessed
+   */
+  FileContext getLocalFileContext(Configuration conf) {
+    try {
+      return FileContext.getLocalFSFileContext(conf);
+    } catch (IOException e) {
+      throw new YarnRuntimeException("Failed to access local fs", e);
+    }
+  }
 
   @SuppressWarnings("unchecked")
   @Override
@@ -160,21 +179,32 @@ public class NonAggregatingLogHandler extends AbstractService implements
     @Override
     @SuppressWarnings("unchecked")
     public void run() {
-      List<String> rootLogDirs =
-          NonAggregatingLogHandler.this.dirsHandler.getLogDirs();
-      Path[] localAppLogDirs = new Path[rootLogDirs.size()];
-      int index = 0;
-      for (String rootLogDir : rootLogDirs) {
-        localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString());
-        index++;
+      List<Path> localAppLogDirs = new ArrayList<Path>();
+      FileContext lfs = getLocalFileContext(getConfig());
+      for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
+        Path logDir = new Path(rootLogDir, applicationId.toString());
+        try {
+          // only schedule deletion of app log dirs that actually exist
+          lfs.getFileStatus(logDir);
+          localAppLogDirs.add(logDir);
+        } catch (UnsupportedFileSystemException ue) {
+          LOG.warn("Unsupported file system used for log dir " + logDir, ue);
+        } catch (FileNotFoundException fnfe) {
+          // app log dir does not exist under this root; nothing to delete
+        } catch (IOException ie) {
+          LOG.warn("Could not check status of log dir " + logDir, ie);
+        }
       }
+
       // Inform the application before the actual delete itself, so that links
-      // to logs will no longer be there on NM web-UI. 
+      // to logs will no longer be there on NM web-UI.
       NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
           new ApplicationEvent(this.applicationId,
               ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
-      NonAggregatingLogHandler.this.delService.delete(user, null,
-          localAppLogDirs);
+      if (!localAppLogDirs.isEmpty()) {
+        NonAggregatingLogHandler.this.delService.delete(user, null,
+            localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
index f19731f..e435375 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java
@@ -42,8 +42,13 @@ public class TestDirectoryCollection {
       TestDirectoryCollection.class.getName()).getAbsoluteFile();
   private static final File testFile = new File(testDir, "testfile");
 
+  private Configuration conf;
+  private FileContext localFs;
+
   @Before
-  public void setup() throws IOException {
+  public void setupForTests() throws IOException {
+    conf = new Configuration();
+    localFs = FileContext.getLocalFSFileContext(conf);
     testDir.mkdirs();
     testFile.createNewFile();
   }
@@ -56,11 +61,12 @@ public class TestDirectoryCollection {
   @Test
   public void testConcurrentAccess() throws IOException {
     // Initialize DirectoryCollection with a file instead of a directory
-    Configuration conf = new Configuration();
+
     String[] dirs = {testFile.getPath()};
-    DirectoryCollection dc = new DirectoryCollection(dirs,
-      conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 
-        YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
+    DirectoryCollection dc =
+        new DirectoryCollection(dirs, conf.getFloat(
+          YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+          YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
 
     // Create an iterator before checkDirs is called to reliable test case
     List<String> list = dc.getGoodDirs();
@@ -78,9 +84,9 @@ public class TestDirectoryCollection {
 
   @Test
   public void testCreateDirectories() throws IOException {
-    Configuration conf = new Configuration();
     conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
-    FileContext localFs = FileContext.getLocalFSFileContext(conf);
+    // FileContext reads the umask when created; recreate it to pick up 077
+    localFs = FileContext.getLocalFSFileContext(conf);
 
     String dirA = new File(testDir, "dirA").getPath();
     String dirB = new File(dirA, "dirB").getPath();
@@ -92,9 +98,10 @@ public class TestDirectoryCollection {
     localFs.setPermission(pathC, permDirC);
 
     String[] dirs = { dirA, dirB, dirC };
-    DirectoryCollection dc = new DirectoryCollection(dirs,
-      conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 
-        YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
+    DirectoryCollection dc =
+        new DirectoryCollection(dirs, conf.getFloat(
+          YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+          YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE));
     FsPermission defaultPerm = FsPermission.getDefault()
         .applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK));
     boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm);
@@ -120,25 +127,29 @@ public class TestDirectoryCollection {
     dc.checkDirs();
     Assert.assertEquals(0, dc.getGoodDirs().size());
     Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(1, dc.getFullDirs().size());
 
     dc = new DirectoryCollection(dirs, 100.0F);
     dc.checkDirs();
     Assert.assertEquals(1, dc.getGoodDirs().size());
     Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
 
     dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024));
     dc.checkDirs();
     Assert.assertEquals(0, dc.getGoodDirs().size());
     Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(1, dc.getFullDirs().size());
 
     dc = new DirectoryCollection(dirs, 100.0F, 0);
     dc.checkDirs();
     Assert.assertEquals(1, dc.getGoodDirs().size());
     Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
   }
 
   @Test
-  public void testDiskLimitsCutoffSetters() {
+  public void testDiskLimitsCutoffSetters() throws IOException {
 
     String[] dirs = { "dir" };
     DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100);
@@ -163,6 +174,49 @@ public class TestDirectoryCollection {
   }
 
   @Test
+  public void testFailedDisksBecomingGoodAgain() throws Exception {
+
+    String dirA = new File(testDir, "dirA").getPath();
+    String[] dirs = { dirA };
+    DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F);
+    dc.checkDirs();
+    Assert.assertEquals(0, dc.getGoodDirs().size());
+    Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(1, dc.getFullDirs().size());
+
+    dc.setDiskUtilizationPercentageCutoff(100.0F);
+    dc.checkDirs();
+    Assert.assertEquals(1, dc.getGoodDirs().size());
+    Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
+
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    // FileContext reads the umask when created; recreate it to pick up 077
+    localFs = FileContext.getLocalFSFileContext(conf);
+
+    String dirB = new File(testDir, "dirB").getPath();
+    Path pathB = new Path(dirB);
+    FsPermission permDirB = new FsPermission((short) 0400);
+
+    localFs.mkdir(pathB, null, true);
+    localFs.setPermission(pathB, permDirB);
+
+    String[] dirs2 = { dirB };
+
+    dc = new DirectoryCollection(dirs2, 100.0F);
+    dc.checkDirs();
+    Assert.assertEquals(0, dc.getGoodDirs().size());
+    Assert.assertEquals(1, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
+    permDirB = new FsPermission((short) 0700);
+    localFs.setPermission(pathB, permDirB);
+    dc.checkDirs();
+    Assert.assertEquals(1, dc.getGoodDirs().size());
+    Assert.assertEquals(0, dc.getFailedDirs().size());
+    Assert.assertEquals(0, dc.getFullDirs().size());
+  }
+
+  @Test
   public void testConstructors() {
 
     String[] dirs = { "dir" };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
index 057ea91..e22b7f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java
@@ -21,8 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager;
 import java.io.File;
 import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -57,10 +62,11 @@ public class TestLocalDirsHandlerService {
     LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
     dirSvc.init(conf);
     Assert.assertEquals(1, dirSvc.getLocalDirs().size());
+    dirSvc.close();
   }
 
   @Test
-  public void testValidPathsDirHandlerService() {
+  public void testValidPathsDirHandlerService() throws Exception {
     Configuration conf = new YarnConfiguration();
     String localDir1 = new File("file:///" + testDir, "localDir1").getPath();
     String localDir2 = new File("hdfs:///" + testDir, "localDir2").getPath();
@@ -76,5 +82,40 @@ public class TestLocalDirsHandlerService {
     Assert.assertEquals("Service should not be inited",
                         STATE.STOPPED,
                         dirSvc.getServiceState());
+    dirSvc.close();
+  }
+  
+  @Test
+  public void testGetFullDirs() throws Exception {
+    Configuration conf = new YarnConfiguration();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077");
+    FileContext localFs = FileContext.getLocalFSFileContext(conf);
+    String localDir1 = new File(testDir, "localDir1").getPath();
+    String localDir2 = new File(testDir, "localDir2").getPath();
+    String logDir1 = new File(testDir, "logDir1").getPath();
+    String logDir2 = new File(testDir, "logDir2").getPath();
+    FsPermission dirPermissions = new FsPermission((short) 0410);
+    localFs.mkdir(new Path(localDir1), dirPermissions, true);
+    localFs.mkdir(new Path(logDir1), dirPermissions, true);
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2);
+    conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2);
+    // a 0% per-disk utilization cutoff leaves no disk usable
+    conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE,
+      0.0f);
+    LocalDirsHandlerService dirSvc = new LocalDirsHandlerService();
+    try {
+      dirSvc.init(conf);
+      Assert.assertEquals(0, dirSvc.getLocalDirs().size());
+      Assert.assertEquals(0, dirSvc.getLogDirs().size());
+      Assert.assertEquals(1, dirSvc.getDiskFullLocalDirs().size());
+      Assert.assertEquals(1, dirSvc.getDiskFullLogDirs().size());
+    } finally {
+      // stop the service, then remove every configured dir (logDir2 too)
+      dirSvc.close();
+      FileUtils.deleteDirectory(new File(localDir1));
+      FileUtils.deleteDirectory(new File(localDir2));
+      FileUtils.deleteDirectory(new File(logDir1));
+      FileUtils.deleteDirectory(new File(logDir2));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
index 6a28605..3542196 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java
@@ -196,7 +196,7 @@ public class TestNodeHealthService {
         healthStatus.getHealthReport().equals(
             NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG
             + NodeHealthCheckerService.SEPARATOR
-            + nodeHealthChecker.getDiskHandler().getDisksHealthReport()));
+            + nodeHealthChecker.getDiskHandler().getDisksHealthReport(false)));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index fa5a4fc..d569fa7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -42,6 +42,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -68,13 +69,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.security.AccessControlException;
 import org.junit.Assert;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
@@ -167,15 +171,15 @@ public class TestResourceLocalizationService {
     conf = new Configuration();
     spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem());
     lfs = FileContext.getFileContext(spylfs, conf);
-    doNothing().when(spylfs).mkdir(
-        isA(Path.class), isA(FsPermission.class), anyBoolean());
+
     String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
   }
 
   @After
-  public void cleanup() {
+  public void cleanup() throws IOException { // recursively deletes basedir
     conf = null;
+    FileUtils.deleteDirectory(new File(basedir.toString()));
   }
   
   @Test
@@ -752,6 +756,39 @@ public class TestResourceLocalizationService {
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final Path userDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.USERCACHE);
+    final Path fileDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ContainerLocalizer.FILECACHE);
+    final Path sysDir =
+        new Path(sDirs[0].substring("file:".length()),
+          ResourceLocalizationService.NM_PRIVATE_DIR);
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "", new Path(sDirs[0]));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          nmPermission, "", "", sysDir);
+
+    doAnswer(new Answer<FileStatus>() {
+      @Override
+      public FileStatus answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args.length > 0) {
+          if (args[0].equals(userDir) || args[0].equals(fileDir)) {
+            return fs;
+          }
+        }
+        return nmFs;
+      }
+    }).when(spylfs).getFileStatus(isA(Path.class));
+
     try {
       spyService.init(conf);
       spyService.start();
@@ -1775,5 +1812,274 @@ public class TestResourceLocalizationService {
     return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(),
         new Text("kind" + id), new Text("service" + id));
   }
+  
+  /*
+   * Ensures ResourceLocalizationService copes with local dirs going bad. The
+   * test requests a private, app and public resource, then cleans up the
+   * container and the app while getFileStatus() fails for one local dir each
+   * time, and checks that only the healthy dirs are submitted for deletion.
+   */
+  @Test
+  @SuppressWarnings("unchecked")
+  // mocked generics
+  public void testFailedDirsResourceRelease() throws Exception {
+    // setup components
+    File f = new File(basedir.toString());
+    String[] sDirs = new String[4];
+    List<Path> localDirs = new ArrayList<Path>(sDirs.length);
+    for (int i = 0; i < sDirs.length; ++i) {
+      sDirs[i] = f.getAbsolutePath() + i;
+      localDirs.add(new Path(sDirs[i]));
+    }
+    List<Path> containerLocalDirs = new ArrayList<Path>(localDirs.size());
+    List<Path> appLocalDirs = new ArrayList<Path>(localDirs.size());
+    List<Path> nmLocalContainerDirs = new ArrayList<Path>(localDirs.size());
+    List<Path> nmLocalAppDirs = new ArrayList<Path>(localDirs.size());
+    conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs);
+    conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500);
+
+    LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
+    DrainDispatcher dispatcher = new DrainDispatcher();
+    dispatcher.init(conf);
+    dispatcher.start();
+    EventHandler<ApplicationEvent> applicationBus = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, applicationBus);
+    EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
+    dispatcher.register(ContainerEventType.class, containerBus);
+    // Ignore actual localization
+    EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
+    dispatcher.register(LocalizerEventType.class, localizerBus);
+
+    ContainerExecutor exec = mock(ContainerExecutor.class);
+    LocalDirsHandlerService mockDirsHandler =
+        mock(LocalDirsHandlerService.class);
+    doReturn(new ArrayList<String>(Arrays.asList(sDirs))).when(
+        mockDirsHandler).getLocalDirsForCleanup();
+
+    DeletionService delService = mock(DeletionService.class);
+
+    // setup mocks
+    ResourceLocalizationService rawService =
+        new ResourceLocalizationService(dispatcher, exec, delService,
+          mockDirsHandler, new NMNullStateStoreService());
+    ResourceLocalizationService spyService = spy(rawService);
+    doReturn(mockServer).when(spyService).createServer();
+    doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
+      isA(Configuration.class));
+    doReturn(lfs).when(spyService)
+      .getLocalFileContext(isA(Configuration.class));
+    FsPermission defaultPermission =
+        FsPermission.getDirDefault().applyUMask(lfs.getUMask());
+    FsPermission nmPermission =
+        ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask());
+    final FileStatus fs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          defaultPermission, "", "", localDirs.get(0));
+    final FileStatus nmFs =
+        new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+          nmPermission, "", "", localDirs.get(0));
+
+    final String user = "user0";
+    // init application
+    final Application app = mock(Application.class);
+    final ApplicationId appId =
+        BuilderUtils.newApplicationId(314159265358979L, 3);
+    when(app.getUser()).thenReturn(user);
+    when(app.getAppId()).thenReturn(appId);
+    when(app.toString()).thenReturn(ConverterUtils.toString(appId));
+
+    // init container.
+    final Container c = getMockContainer(appId, 42, user);
+
+    // setup local app dirs from the list the service walks during cleanup;
+    // getLocalDirs() is not stubbed on the mock and would yield no dirs
+    List<String> tmpDirs = mockDirsHandler.getLocalDirsForCleanup();
+    for (int i = 0; i < tmpDirs.size(); ++i) {
+      Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE);
+      Path userdir = new Path(usersdir, user);
+      Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE);
+      Path appDir = new Path(allAppsdir, ConverterUtils.toString(appId));
+      Path containerDir =
+          new Path(appDir, ConverterUtils.toString(c.getContainerId()));
+      containerLocalDirs.add(containerDir);
+      appLocalDirs.add(appDir);
+
+      Path sysDir =
+          new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR);
+      Path appSysDir = new Path(sysDir, ConverterUtils.toString(appId));
+      Path containerSysDir =
+          new Path(appSysDir, ConverterUtils.toString(c.getContainerId()));
+
+      nmLocalContainerDirs.add(containerSysDir);
+      nmLocalAppDirs.add(appSysDir);
+    }
+
+    try {
+      spyService.init(conf);
+      spyService.start();
+
+      spyService.handle(new ApplicationLocalizationEvent(
+        LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
+      dispatcher.await();
+
+      // Get a handle on the trackers after they're setup with
+      // INIT_APP_RESOURCES
+      LocalResourcesTracker appTracker =
+          spyService.getLocalResourcesTracker(
+            LocalResourceVisibility.APPLICATION, user, appId);
+      LocalResourcesTracker privTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE,
+            user, appId);
+      LocalResourcesTracker pubTracker =
+          spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC,
+            user, appId);
+
+      // init resources
+      Random r = new Random();
+      long seed = r.nextLong();
+      r.setSeed(seed);
+
+      // Send localization requests, one for each type of resource
+      final LocalResource privResource = getPrivateMockedResource(r);
+      final LocalResourceRequest privReq =
+          new LocalResourceRequest(privResource);
+
+      final LocalResource appResource = getAppMockedResource(r);
+      final LocalResourceRequest appReq = new LocalResourceRequest(appResource);
+
+      final LocalResource pubResource = getPublicMockedResource(r);
+      final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource);
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req =
+          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+      req.put(LocalResourceVisibility.PRIVATE,
+        Collections.singletonList(privReq));
+      req.put(LocalResourceVisibility.APPLICATION,
+        Collections.singletonList(appReq));
+      req.put(LocalResourceVisibility.PUBLIC,
+        Collections.singletonList(pubReq));
+
+      Map<LocalResourceVisibility, Collection<LocalResourceRequest>> req2 =
+          new HashMap<LocalResourceVisibility, Collection<LocalResourceRequest>>();
+      req2.put(LocalResourceVisibility.PRIVATE,
+        Collections.singletonList(privReq));
+
+      // Send Request event
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req));
+      spyService.handle(new ContainerLocalizationRequestEvent(c, req2));
+      dispatcher.await();
+
+      int privRsrcCount = 0;
+      for (LocalizedResource lr : privTracker) {
+        privRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount());
+        Assert.assertEquals(privReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, privRsrcCount);
+
+      int appRsrcCount = 0;
+      for (LocalizedResource lr : appTracker) {
+        appRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        Assert.assertEquals(appReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, appRsrcCount);
+
+      int pubRsrcCount = 0;
+      for (LocalizedResource lr : pubTracker) {
+        pubRsrcCount++;
+        Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount());
+        Assert.assertEquals(pubReq, lr.getRequest());
+      }
+      Assert.assertEquals(1, pubRsrcCount);
+
+      // setup mocks for test, a set of dirs with IOExceptions and let the rest
+      // go through
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 2) {
+          Mockito.doThrow(new IOException()).when(spylfs)
+            .getFileStatus(eq(containerLocalDirs.get(i)));
+          Mockito.doThrow(new IOException()).when(spylfs)
+            .getFileStatus(eq(nmLocalContainerDirs.get(i)));
+        } else {
+          doReturn(fs).when(spylfs)
+            .getFileStatus(eq(containerLocalDirs.get(i)));
+          doReturn(nmFs).when(spylfs).getFileStatus(
+            eq(nmLocalContainerDirs.get(i)));
+        }
+      }
+
+      // Send Cleanup Event
+      spyService.handle(new ContainerLocalizationCleanupEvent(c, req));
+      verify(mockLocalizerTracker).cleanupPrivLocalizers(
+        "container_314159265358979_0003_01_000042");
+
+      // match cleanup events with the mocks we setup earlier
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 2) {
+          // getFileStatus() fails for these dirs, so they must never reach the
+          // DeletionService. (An Assert.fail() inside catch (Throwable) can
+          // never fail the test, so Mockito.never() is used instead.)
+          verify(delService, Mockito.never()).delete(user,
+            containerLocalDirs.get(i));
+          verify(delService, Mockito.never()).delete(null,
+            nmLocalContainerDirs.get(i));
+        } else {
+          verify(delService).delete(user, containerLocalDirs.get(i));
+          verify(delService).delete(null, nmLocalContainerDirs.get(i));
+        }
+      }
+
+      ArgumentMatcher<ApplicationEvent> matchesAppDestroy =
+          new ArgumentMatcher<ApplicationEvent>() {
+            @Override
+            public boolean matches(Object o) {
+              ApplicationEvent evt = (ApplicationEvent) o;
+              return (evt.getType() == ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP)
+                  && appId.equals(evt.getApplicationID());
+            }
+          };
+
+      dispatcher.await();
+
+      // setup mocks again, this time throw UnsupportedFileSystemException and
+      // IOExceptions
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 3) {
+          Mockito.doThrow(new IOException()).when(spylfs)
+            .getFileStatus(eq(appLocalDirs.get(i)));
+          Mockito.doThrow(new UnsupportedFileSystemException("test"))
+            .when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
+        } else {
+          doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i)));
+          doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i)));
+        }
+      }
+      LocalizationEvent destroyApp =
+          new ApplicationLocalizationEvent(
+            LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app);
+      spyService.handle(destroyApp);
+      // APPLICATION_RESOURCES_CLEANEDUP goes through the async dispatcher
+      dispatcher.await();
+      verify(applicationBus).handle(argThat(matchesAppDestroy));
+
+      // verify we got the right delete calls
+      for (int i = 0; i < containerLocalDirs.size(); ++i) {
+        if (i == 3) {
+          // getFileStatus() fails for this disk's app dirs; skip deletion.
+          verify(delService, Mockito.never()).delete(user,
+            appLocalDirs.get(i));
+          verify(delService, Mockito.never()).delete(null,
+            nmLocalAppDirs.get(i));
+        } else {
+          verify(delService).delete(user, appLocalDirs.get(i));
+          verify(delService).delete(null, nmLocalAppDirs.get(i));
+        }
+      }
+    } finally {
+      dispatcher.stop();
+      delService.stop();
+    }
+  }
 
 }


Mime
View raw message