Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5570F17FC7 for ; Tue, 21 Oct 2014 17:32:19 +0000 (UTC) Received: (qmail 5436 invoked by uid 500); 21 Oct 2014 17:32:18 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 5201 invoked by uid 500); 21 Oct 2014 17:32:18 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 5186 invoked by uid 99); 21 Oct 2014 17:32:18 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Oct 2014 17:32:18 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 837748B7D7C; Tue, 21 Oct 2014 17:32:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jlowe@apache.org To: common-commits@hadoop.apache.org Date: Tue, 21 Oct 2014 17:32:19 -0000 Message-Id: <3e584f23ec354ed5a5a9096b15ef24d6@git.apache.org> In-Reply-To: <8ba5713da2724bd0aeff5b56130ffcff@git.apache.org> References: <8ba5713da2724bd0aeff5b56130ffcff@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev YARN-90. NodeManager should identify failed disks becoming good again. Contributed by Varun Vasudev Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6f2028bd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6f2028bd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6f2028bd Branch: refs/heads/trunk Commit: 6f2028bd1514d90b831f889fd0ee7f2ba5c15000 Parents: b6f9d55 Author: Jason Lowe Authored: Tue Oct 21 17:29:22 2014 +0000 Committer: Jason Lowe Committed: Tue Oct 21 17:31:13 2014 +0000 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../server/nodemanager/DirectoryCollection.java | 219 ++++++++--- .../nodemanager/LocalDirsHandlerService.java | 176 +++++++-- .../nodemanager/NodeHealthCheckerService.java | 4 +- .../launcher/ContainerLaunch.java | 2 +- .../localizer/ResourceLocalizationService.java | 270 ++++++++++--- .../logaggregation/AppLogAggregatorImpl.java | 35 +- .../logaggregation/LogAggregationService.java | 13 +- .../loghandler/NonAggregatingLogHandler.java | 47 ++- .../nodemanager/TestDirectoryCollection.java | 75 +++- .../TestLocalDirsHandlerService.java | 43 ++- .../nodemanager/TestNodeHealthService.java | 2 +- .../TestResourceLocalizationService.java | 312 ++++++++++++++- .../TestLogAggregationService.java | 147 ++++--- .../TestNonAggregatingLogHandler.java | 382 +++++++++++++++---- 15 files changed, 1441 insertions(+), 289 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index b130ecf..af056b3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -379,6 +379,9 @@ Release 2.6.0 - UNRELEASED YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan Gong via zjshen) + YARN-90. NodeManager should identify failed disks becoming good again + (Varun Vasudev via jlowe) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java index f6ee128..279787b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DirectoryCollection.java @@ -21,18 +21,23 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.DiskChecker.DiskErrorException; /** * Manages a list of local storage directories. @@ -40,9 +45,38 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException; class DirectoryCollection { private static final Log LOG = LogFactory.getLog(DirectoryCollection.class); + public enum DiskErrorCause { + DISK_FULL, OTHER + } + + static class DiskErrorInformation { + DiskErrorCause cause; + String message; + + DiskErrorInformation(DiskErrorCause cause, String message) { + this.cause = cause; + this.message = message; + } + } + + /** + * Returns a merged list which contains all the elements of l1 and l2 + * @param l1 the first list to be included + * @param l2 the second list to be included + * @return a new list containing all the elements of the first and second list + */ + static List concat(List l1, List l2) { + List ret = new ArrayList(l1.size() + l2.size()); + ret.addAll(l1); + ret.addAll(l2); + return ret; + } + // Good local storage directories private List localDirs; - private List failedDirs; + private List errorDirs; + private List fullDirs; + private int numFailures; private float diskUtilizationPercentageCutoff; @@ -109,7 +143,9 @@ class DirectoryCollection { float utilizationPercentageCutOff, long utilizationSpaceCutOff) { localDirs = new CopyOnWriteArrayList(dirs); - failedDirs = new CopyOnWriteArrayList(); + errorDirs = new CopyOnWriteArrayList(); + fullDirs = new CopyOnWriteArrayList(); + diskUtilizationPercentageCutoff = utilizationPercentageCutOff; diskUtilizationSpaceCutoff = utilizationSpaceCutOff; diskUtilizationPercentageCutoff = @@ -131,7 +167,16 @@ class DirectoryCollection { * @return the failed directories */ synchronized List getFailedDirs() { - return Collections.unmodifiableList(failedDirs); + return Collections.unmodifiableList( + DirectoryCollection.concat(errorDirs, fullDirs)); + } + + /** + * @return the directories that have used all disk space + */ + + synchronized List getFullDirs() { + return fullDirs; } /** @@ -158,7 +203,7 @@ class DirectoryCollection { LOG.warn("Unable to create directory " + dir + " error " + e.getMessage() + ", removing from the list of valid directories."); localDirs.remove(dir); - failedDirs.add(dir); + errorDirs.add(dir); numFailures++; failed = true; } @@ -167,61 +212,147 @@ class DirectoryCollection { } /** - * Check the health of current set of local directories, updating the list - * of valid directories if necessary. - * @return true if there is a new disk-failure identified in - * this checking. false otherwise. + * Check the health of current set of local directories(good and failed), + * updating the list of valid directories if necessary. + * + * @return true if there is a new disk-failure identified in this + * checking or a failed directory passes the disk check false + * otherwise. */ synchronized boolean checkDirs() { - int oldNumFailures = numFailures; - HashSet checkFailedDirs = new HashSet(); - for (final String dir : localDirs) { + boolean setChanged = false; + Set preCheckGoodDirs = new HashSet(localDirs); + Set preCheckFullDirs = new HashSet(fullDirs); + Set preCheckOtherErrorDirs = new HashSet(errorDirs); + List failedDirs = DirectoryCollection.concat(errorDirs, fullDirs); + List allLocalDirs = + DirectoryCollection.concat(localDirs, failedDirs); + + Map dirsFailedCheck = testDirs(allLocalDirs); + + localDirs.clear(); + errorDirs.clear(); + fullDirs.clear(); + + for (Map.Entry entry : dirsFailedCheck + .entrySet()) { + String dir = entry.getKey(); + DiskErrorInformation errorInformation = entry.getValue(); + switch (entry.getValue().cause) { + case DISK_FULL: + fullDirs.add(entry.getKey()); + break; + case OTHER: + errorDirs.add(entry.getKey()); + break; + } + if (preCheckGoodDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error, " + errorInformation.message + + ", removing from list of valid directories"); + setChanged = true; + numFailures++; + } + } + for (String dir : allLocalDirs) { + if (!dirsFailedCheck.containsKey(dir)) { + localDirs.add(dir); + if (preCheckFullDirs.contains(dir) + || preCheckOtherErrorDirs.contains(dir)) { + setChanged = true; + LOG.info("Directory " + dir + + " passed disk check, adding to list of valid directories."); + } + } + } + Set postCheckFullDirs = new HashSet(fullDirs); + Set postCheckOtherDirs = new HashSet(errorDirs); + for (String dir : preCheckFullDirs) { + if (postCheckOtherDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + dirsFailedCheck.get(dir).message); + } + } + + for (String dir : preCheckOtherErrorDirs) { + if (postCheckFullDirs.contains(dir)) { + LOG.warn("Directory " + dir + " error " + + dirsFailedCheck.get(dir).message); + } + } + return setChanged; + } + + Map testDirs(List dirs) { + HashMap ret = + new HashMap(); + for (final String dir : dirs) { + String msg; try { File testDir = new File(dir); DiskChecker.checkDir(testDir); - if (isDiskUsageUnderPercentageLimit(testDir)) { - LOG.warn("Directory " + dir - + " error, used space above threshold of " - + diskUtilizationPercentageCutoff - + "%, removing from the list of valid directories."); - checkFailedDirs.add(dir); - } else if (isDiskFreeSpaceWithinLimit(testDir)) { - LOG.warn("Directory " + dir + " error, free space below limit of " - + diskUtilizationSpaceCutoff - + "MB, removing from the list of valid directories."); - checkFailedDirs.add(dir); + if (isDiskUsageOverPercentageLimit(testDir)) { + msg = + "used space above threshold of " + + diskUtilizationPercentageCutoff + + "%"; + ret.put(dir, + new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg)); + continue; + } else if (isDiskFreeSpaceUnderLimit(testDir)) { + msg = + "free space below limit of " + diskUtilizationSpaceCutoff + + "MB"; + ret.put(dir, + new DiskErrorInformation(DiskErrorCause.DISK_FULL, msg)); + continue; } - } catch (DiskErrorException de) { - LOG.warn("Directory " + dir + " error " + de.getMessage() - + ", removing from the list of valid directories."); - checkFailedDirs.add(dir); + + // create a random dir to make sure fs isn't in read-only mode + verifyDirUsingMkdir(testDir); + } catch (IOException ie) { + ret.put(dir, + new DiskErrorInformation(DiskErrorCause.OTHER, ie.getMessage())); } } - for (String dir : checkFailedDirs) { - localDirs.remove(dir); - failedDirs.add(dir); - numFailures++; + return ret; + } + + /** + * Function to test whether a dir is working correctly by actually creating a + * random directory. + * + * @param dir + * the dir to test + */ + private void verifyDirUsingMkdir(File dir) throws IOException { + + String randomDirName = RandomStringUtils.randomAlphanumeric(5); + File target = new File(dir, randomDirName); + int i = 0; + while (target.exists()) { + + randomDirName = RandomStringUtils.randomAlphanumeric(5) + i; + target = new File(dir, randomDirName); + i++; + } + try { + DiskChecker.checkDir(target); + } finally { + FileUtils.deleteQuietly(target); } - return numFailures > oldNumFailures; } - - private boolean isDiskUsageUnderPercentageLimit(File dir) { + + private boolean isDiskUsageOverPercentageLimit(File dir) { float freePercentage = 100 * (dir.getUsableSpace() / (float) dir.getTotalSpace()); float usedPercentage = 100.0F - freePercentage; - if (usedPercentage > diskUtilizationPercentageCutoff - || usedPercentage >= 100.0F) { - return true; - } - return false; + return (usedPercentage > diskUtilizationPercentageCutoff + || usedPercentage >= 100.0F); } - private boolean isDiskFreeSpaceWithinLimit(File dir) { + private boolean isDiskFreeSpaceUnderLimit(File dir) { long freeSpace = dir.getUsableSpace() / (1024 * 1024); - if (freeSpace < this.diskUtilizationSpaceCutoff) { - return true; - } - return false; + return freeSpace < this.diskUtilizationSpaceCutoff; } private void createDir(FileContext localFs, Path dir, FsPermission perm) http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java index b053941..7d1aa53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/LocalDirsHandlerService.java @@ -21,7 +21,9 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.IOException; import java.net.URI; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; @@ -150,7 +152,7 @@ public class LocalDirsHandlerService extends AbstractService { boolean createSucceeded = localDirs.createNonExistentDirs(localFs, perm); createSucceeded &= logDirs.createNonExistentDirs(localFs, perm); if (!createSucceeded) { - updateDirsAfterFailure(); + updateDirsAfterTest(); } // Check the disk health immediately to weed out bad directories @@ -197,9 +199,52 @@ public class LocalDirsHandlerService extends AbstractService { } /** + * @return the local directories which have no disk space + */ + public List getDiskFullLocalDirs() { + return localDirs.getFullDirs(); + } + + /** + * @return the log directories that have no disk space + */ + public List getDiskFullLogDirs() { + return logDirs.getFullDirs(); + } + + /** + * Function to get the local dirs which should be considered when cleaning up + * resources. Contains the good local dirs and the local dirs that have reached + * the disk space limit + * + * @return the local dirs which should be considered for cleaning up + */ + public List getLocalDirsForCleanup() { + return DirectoryCollection.concat(localDirs.getGoodDirs(), + localDirs.getFullDirs()); + } + + /** + * Function to get the log dirs which should be considered when cleaning up + * resources. Contains the good log dirs and the log dirs that have reached + * the disk space limit + * + * @return the log dirs which should be considered for cleaning up + */ + public List getLogDirsForCleanup() { + return DirectoryCollection.concat(logDirs.getGoodDirs(), + logDirs.getFullDirs()); + } + + /** + * Function to generate a report on the state of the disks. + * + * @param listGoodDirs + * flag to determine whether the report should report the state of + * good dirs or failed dirs * @return the health report of nm-local-dirs and nm-log-dirs */ - public String getDisksHealthReport() { + public String getDisksHealthReport(boolean listGoodDirs) { if (!isDiskHealthCheckerEnabled) { return ""; } @@ -207,20 +252,31 @@ public class LocalDirsHandlerService extends AbstractService { StringBuilder report = new StringBuilder(); List failedLocalDirsList = localDirs.getFailedDirs(); List failedLogDirsList = logDirs.getFailedDirs(); - int numLocalDirs = localDirs.getGoodDirs().size() - + failedLocalDirsList.size(); - int numLogDirs = logDirs.getGoodDirs().size() + failedLogDirsList.size(); - if (!failedLocalDirsList.isEmpty()) { - report.append(failedLocalDirsList.size() + "/" + numLocalDirs - + " local-dirs turned bad: " - + StringUtils.join(",", failedLocalDirsList) + ";"); - } - if (!failedLogDirsList.isEmpty()) { - report.append(failedLogDirsList.size() + "/" + numLogDirs - + " log-dirs turned bad: " - + StringUtils.join(",", failedLogDirsList)); + List goodLocalDirsList = localDirs.getGoodDirs(); + List goodLogDirsList = logDirs.getGoodDirs(); + int numLocalDirs = goodLocalDirsList.size() + failedLocalDirsList.size(); + int numLogDirs = goodLogDirsList.size() + failedLogDirsList.size(); + if (!listGoodDirs) { + if (!failedLocalDirsList.isEmpty()) { + report.append(failedLocalDirsList.size() + "/" + numLocalDirs + + " local-dirs are bad: " + + StringUtils.join(",", failedLocalDirsList) + "; "); + } + if (!failedLogDirsList.isEmpty()) { + report.append(failedLogDirsList.size() + "/" + numLogDirs + + " log-dirs are bad: " + StringUtils.join(",", failedLogDirsList)); + } + } else { + report.append(goodLocalDirsList.size() + "/" + numLocalDirs + + " local-dirs are good: " + StringUtils.join(",", goodLocalDirsList) + + "; "); + report.append(goodLogDirsList.size() + "/" + numLogDirs + + " log-dirs are good: " + StringUtils.join(",", goodLogDirsList)); + } + return report.toString(); + } /** @@ -262,8 +318,8 @@ public class LocalDirsHandlerService extends AbstractService { * Set good local dirs and good log dirs in the configuration so that the * LocalDirAllocator objects will use this updated configuration only. */ - private void updateDirsAfterFailure() { - LOG.info("Disk(s) failed. " + getDisksHealthReport()); + private void updateDirsAfterTest() { + Configuration conf = getConfig(); List localDirs = getLocalDirs(); conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, @@ -273,23 +329,91 @@ public class LocalDirsHandlerService extends AbstractService { logDirs.toArray(new String[logDirs.size()])); if (!areDisksHealthy()) { // Just log. - LOG.error("Most of the disks failed. " + getDisksHealthReport()); + LOG.error("Most of the disks failed. " + getDisksHealthReport(false)); } } + private void logDiskStatus(boolean newDiskFailure, boolean diskTurnedGood) { + if (newDiskFailure) { + String report = getDisksHealthReport(false); + LOG.info("Disk(s) failed: " + report); + } + if (diskTurnedGood) { + String report = getDisksHealthReport(true); + LOG.info("Disk(s) turned good: " + report); + } + + } + private void checkDirs() { - boolean newFailure = false; - if (localDirs.checkDirs()) { - newFailure = true; - } - if (logDirs.checkDirs()) { - newFailure = true; + boolean disksStatusChange = false; + Set failedLocalDirsPreCheck = + new HashSet(localDirs.getFailedDirs()); + Set failedLogDirsPreCheck = + new HashSet(logDirs.getFailedDirs()); + + if (localDirs.checkDirs()) { + disksStatusChange = true; + } + if (logDirs.checkDirs()) { + disksStatusChange = true; + } + + Set failedLocalDirsPostCheck = + new HashSet(localDirs.getFailedDirs()); + Set failedLogDirsPostCheck = + new HashSet(logDirs.getFailedDirs()); + + boolean disksFailed = false; + boolean disksTurnedGood = false; + + disksFailed = + disksTurnedBad(failedLocalDirsPreCheck, failedLocalDirsPostCheck); + disksTurnedGood = + disksTurnedGood(failedLocalDirsPreCheck, failedLocalDirsPostCheck); + + // skip check if we have new failed or good local dirs since we're going to + // log anyway + if (!disksFailed) { + disksFailed = + disksTurnedBad(failedLogDirsPreCheck, failedLogDirsPostCheck); + } + if (!disksTurnedGood) { + disksTurnedGood = + disksTurnedGood(failedLogDirsPreCheck, failedLogDirsPostCheck); + } + + logDiskStatus(disksFailed, disksTurnedGood); + + if (disksStatusChange) { + updateDirsAfterTest(); + } + + lastDisksCheckTime = System.currentTimeMillis(); + } + + private boolean disksTurnedBad(Set preCheckFailedDirs, + Set postCheckDirs) { + boolean disksFailed = false; + for (String dir : postCheckDirs) { + if (!preCheckFailedDirs.contains(dir)) { + disksFailed = true; + break; } + } + return disksFailed; + } - if (newFailure) { - updateDirsAfterFailure(); + private boolean disksTurnedGood(Set preCheckDirs, + Set postCheckDirs) { + boolean disksTurnedGood = false; + for (String dir : preCheckDirs) { + if (!postCheckDirs.contains(dir)) { + disksTurnedGood = true; + break; } - lastDisksCheckTime = System.currentTimeMillis(); + } + return disksTurnedGood; } public Path getLocalPathForWrite(String pathStr) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java index 446d05c..6d6001a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeHealthCheckerService.java @@ -55,9 +55,9 @@ public class NodeHealthCheckerService extends CompositeService { String scriptReport = (nodeHealthScriptRunner == null) ? "" : nodeHealthScriptRunner.getHealthReport(); if (scriptReport.equals("")) { - return dirsHandler.getDisksHealthReport(); + return dirsHandler.getDisksHealthReport(false); } else { - return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport()); + return scriptReport.concat(SEPARATOR + dirsHandler.getDisksHealthReport(false)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java index 87a36c4..f87ed6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java @@ -240,7 +240,7 @@ public class ContainerLaunch implements Callable { if (!dirsHandler.areDisksHealthy()) { ret = ContainerExitStatus.DISKS_FAILED; throw new IOException("Most of the disks failed. " - + dirsHandler.getDisksHealthReport()); + + dirsHandler.getDisksHealthReport(false)); } try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java index d3b33e8..371684b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java @@ -55,11 +55,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.security.Credentials; @@ -170,6 +172,8 @@ public class ResourceLocalizationService extends CompositeService */ private final ConcurrentMap appRsrc = new ConcurrentHashMap(); + + FileContext lfs; public ResourceLocalizationService(Dispatcher dispatcher, ContainerExecutor exec, DeletionService delService, @@ -219,32 +223,17 @@ public class ResourceLocalizationService extends CompositeService this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); try { - FileContext lfs = getLocalFileContext(conf); - lfs.setUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); - - if (!stateStore.canRecover() || stateStore.isNewlyCreated()) { - cleanUpLocalDir(lfs,delService); - } - - List localDirs = dirsHandler.getLocalDirs(); - for (String localDir : localDirs) { - // $local/usercache - Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); - lfs.mkdir(userDir, null, true); - // $local/filecache - Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); - lfs.mkdir(fileDir, null, true); - // $local/nmPrivate - Path sysDir = new Path(localDir, NM_PRIVATE_DIR); - lfs.mkdir(sysDir, NM_PRIVATE_PERM, true); - } + lfs = getLocalFileContext(conf); + lfs.setUMask(new FsPermission((short) FsPermission.DEFAULT_UMASK)); - List logDirs = dirsHandler.getLogDirs(); - for (String logDir : logDirs) { - lfs.mkdir(new Path(logDir), null, true); + if (!stateStore.canRecover()|| stateStore.isNewlyCreated()) { + cleanUpLocalDirs(lfs, delService); + initializeLocalDirs(lfs); + initializeLogDirs(lfs); } - } catch (IOException e) { - throw new YarnRuntimeException("Failed to initialize LocalizationService", e); + } catch (Exception e) { + throw new YarnRuntimeException( + "Failed to initialize LocalizationService", e); } cacheTargetSize = @@ -497,28 +486,45 @@ public class ResourceLocalizationService extends CompositeService String containerIDStr = c.toString(); String appIDStr = ConverterUtils.toString( c.getContainerId().getApplicationAttemptId().getApplicationId()); - for (String localDir : dirsHandler.getLocalDirs()) { + + // Try deleting from good local dirs and full local dirs because a dir might + // have gone bad while the app was running(disk full). In addition + // a dir might have become good while the app was running. + // Check if the container dir exists and if it does, try to delete it + for (String localDir : dirsHandler.getLocalDirsForCleanup()) { // Delete the user-owned container-dir Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); Path containerDir = new Path(appDir, containerIDStr); - delService.delete(userName, containerDir, new Path[] {}); + submitDirForDeletion(userName, containerDir); // Delete the nmPrivate container-dir - + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); Path containerSysDir = new Path(appSysDir, containerIDStr); - delService.delete(null, containerSysDir, new Path[] {}); + submitDirForDeletion(null, containerSysDir); } dispatcher.getEventHandler().handle( new ContainerEvent(c.getContainerId(), ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)); } + + private void submitDirForDeletion(String userName, Path dir) { + try { + lfs.getFileStatus(dir); + delService.delete(userName, dir, new Path[] {}); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Local dir " + dir + " is an unsupported filesystem", ue); + } catch (IOException ie) { + // ignore + return; + } + } @SuppressWarnings({"unchecked"}) @@ -545,19 +551,22 @@ public class ResourceLocalizationService extends CompositeService } // Delete the application directories - for (String localDir : dirsHandler.getLocalDirs()) { + userName = application.getUser(); + appIDStr = application.toString(); + + for (String localDir : dirsHandler.getLocalDirsForCleanup()) { // Delete the user-owned app-dir Path usersdir = new Path(localDir, ContainerLocalizer.USERCACHE); Path userdir = new Path(usersdir, userName); Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); Path appDir = new Path(allAppsdir, appIDStr); - delService.delete(userName, appDir, new Path[] {}); + submitDirForDeletion(userName, appDir); // Delete the nmPrivate app-dir Path sysDir = new Path(localDir, NM_PRIVATE_DIR); Path appSysDir = new Path(sysDir, appIDStr); - delService.delete(null, appSysDir, new Path[] {}); + submitDirForDeletion(null, appSysDir); } // TODO: decrement reference counts of all resources associated with this @@ -590,8 +599,8 @@ public class ResourceLocalizationService extends CompositeService private String getAppFileCachePath(String user, String appId) { return StringUtils.join(Path.SEPARATOR, Arrays.asList(".", - ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId, - ContainerLocalizer.FILECACHE)); + ContainerLocalizer.USERCACHE, user, ContainerLocalizer.APPCACHE, appId, + ContainerLocalizer.FILECACHE)); } @VisibleForTesting @@ -868,7 +877,7 @@ public class ResourceLocalizationService extends CompositeService /** * Find next resource to be given to a spawned localizer. * - * @return + * @return the next resource to be localized */ private LocalResource findNextResource() { synchronized (pending) { @@ -1071,8 +1080,8 @@ public class ResourceLocalizationService extends CompositeService // 1) write credentials to private dir writeCredentials(nmPrivateCTokensPath); // 2) exec initApplication and wait - List localDirs = dirsHandler.getLocalDirs(); - List logDirs = dirsHandler.getLogDirs(); + List localDirs = getInitializedLocalDirs(); + List logDirs = getInitializedLogDirs(); if (dirsHandler.areDisksHealthy()) { exec.startLocalizer(nmPrivateCTokensPath, localizationServerAddress, context.getUser(), @@ -1082,7 +1091,7 @@ public class ResourceLocalizationService extends CompositeService localizerId, localDirs, logDirs); } else { throw new IOException("All disks failed. " - + dirsHandler.getDisksHealthReport()); + + dirsHandler.getDisksHealthReport(false)); } // TODO handle ExitCodeException separately? } catch (Exception e) { @@ -1151,24 +1160,95 @@ public class ResourceLocalizationService extends CompositeService } - private void cleanUpLocalDir(FileContext lfs, DeletionService del) { - long currentTimeStamp = System.currentTimeMillis(); - for (String localDir : dirsHandler.getLocalDirs()) { - renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE, - currentTimeStamp); - renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE, - currentTimeStamp); - renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, - currentTimeStamp); + private void initializeLocalDirs(FileContext lfs) { + List localDirs = dirsHandler.getLocalDirs(); + for (String localDir : localDirs) { + initializeLocalDir(lfs, localDir); + } + } + + private void initializeLocalDir(FileContext lfs, String localDir) { + + Map pathPermissionMap = getLocalDirsPathPermissionsMap(localDir); + for (Map.Entry entry : pathPermissionMap.entrySet()) { + FileStatus status; try { - deleteLocalDir(lfs, del, localDir); - } catch (IOException e) { - // Do nothing, just give the warning - LOG.warn("Failed to delete localDir: " + localDir); + status = lfs.getFileStatus(entry.getKey()); + } + catch(FileNotFoundException fs) { + status = null; + } + catch(IOException ie) { + String msg = "Could not get file status for local dir " + entry.getKey(); + LOG.warn(msg, ie); + throw new YarnRuntimeException(msg, ie); + } + if(status == null) { + try { + lfs.mkdir(entry.getKey(), entry.getValue(), true); + status = lfs.getFileStatus(entry.getKey()); + } catch (IOException e) { + String msg = "Could not initialize local dir " + entry.getKey(); + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + FsPermission perms = status.getPermission(); + if(!perms.equals(entry.getValue())) { + try { + lfs.setPermission(entry.getKey(), entry.getValue()); + } + catch(IOException ie) { + String msg = "Could not set permissions for local dir " + entry.getKey(); + LOG.warn(msg, ie); + throw new YarnRuntimeException(msg, ie); + } } } } + private void initializeLogDirs(FileContext lfs) { + List logDirs = dirsHandler.getLogDirs(); + for (String logDir : logDirs) { + initializeLogDir(lfs, logDir); + } + } + + private void initializeLogDir(FileContext lfs, String logDir) { + try { + lfs.mkdir(new Path(logDir), null, true); + } catch (FileAlreadyExistsException fe) { + // do nothing + } catch (IOException e) { + String msg = "Could not initialize log dir " + logDir; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + + private void cleanUpLocalDirs(FileContext lfs, DeletionService del) { + for (String localDir : dirsHandler.getLocalDirs()) { + cleanUpLocalDir(lfs, del, localDir); + } + } + + private void cleanUpLocalDir(FileContext lfs, DeletionService del, + String localDir) { + long currentTimeStamp = System.currentTimeMillis(); + renameLocalDir(lfs, localDir, ContainerLocalizer.USERCACHE, + currentTimeStamp); + renameLocalDir(lfs, localDir, ContainerLocalizer.FILECACHE, + currentTimeStamp); + renameLocalDir(lfs, localDir, ResourceLocalizationService.NM_PRIVATE_DIR, + currentTimeStamp); + try { + deleteLocalDir(lfs, del, localDir); + } catch (IOException e) { + // Do nothing, just give the warning + LOG.warn("Failed to delete localDir: " + localDir); + } + } + private void renameLocalDir(FileContext lfs, String localDir, String localSubDir, long currentTimeStamp) { try { @@ -1234,5 +1314,95 @@ public class ResourceLocalizationService extends CompositeService del.scheduleFileDeletionTask(dependentDeletionTask); } } + + /** + * Synchronized method to get a list of initialized local dirs. Method will + * check each local dir to ensure it has been setup correctly and will attempt + * to fix any issues it finds. + * + * @return list of initialized local dirs + */ + synchronized private List getInitializedLocalDirs() { + List dirs = dirsHandler.getLocalDirs(); + List checkFailedDirs = new ArrayList(); + for (String dir : dirs) { + try { + checkLocalDir(dir); + } catch (YarnRuntimeException e) { + checkFailedDirs.add(dir); + } + } + for (String dir : checkFailedDirs) { + LOG.info("Attempting to initialize " + dir); + initializeLocalDir(lfs, dir); + try { + checkLocalDir(dir); + } catch (YarnRuntimeException e) { + String msg = + "Failed to setup local dir " + dir + ", which was marked as good."; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + return dirs; + } + + private boolean checkLocalDir(String localDir) { + + Map pathPermissionMap = getLocalDirsPathPermissionsMap(localDir); + + for (Map.Entry entry : pathPermissionMap.entrySet()) { + FileStatus status; + try { + status = lfs.getFileStatus(entry.getKey()); + } catch (Exception e) { + String msg = + "Could not carry out resource dir checks for " + localDir + + ", which was marked as good"; + LOG.warn(msg, e); + throw new YarnRuntimeException(msg, e); + } + + if (!status.getPermission().equals(entry.getValue())) { + String msg = + "Permissions incorrectly set for dir " + entry.getKey() + + ", should be " + entry.getValue() + ", actual value = " + + status.getPermission(); + LOG.warn(msg); + throw new YarnRuntimeException(msg); + } + } + return true; + } + + private Map getLocalDirsPathPermissionsMap(String localDir) { + Map localDirPathFsPermissionsMap = new HashMap(); + + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPrivatePermission = + NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + Path userDir = new Path(localDir, ContainerLocalizer.USERCACHE); + Path fileDir = new Path(localDir, ContainerLocalizer.FILECACHE); + Path sysDir = new Path(localDir, NM_PRIVATE_DIR); + + localDirPathFsPermissionsMap.put(userDir, defaultPermission); + localDirPathFsPermissionsMap.put(fileDir, defaultPermission); + localDirPathFsPermissionsMap.put(sysDir, nmPrivatePermission); + return localDirPathFsPermissionsMap; + } + + /** + * Synchronized method to get a list of initialized log dirs. Method will + * check each local dir to ensure it has been setup correctly and will attempt + * to fix any issues it finds. + * + * @return list of initialized log dirs + */ + synchronized private List getInitializedLogDirs() { + List dirs = dirsHandler.getLogDirs(); + initializeLogDirs(lfs); + return dirs; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java index 6e196bb..43cd7b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -37,9 +38,11 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -107,6 +110,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { private final AtomicBoolean appAggregationFinished = new AtomicBoolean(); private final AtomicBoolean aborted = new AtomicBoolean(); private final Map appAcls; + private final FileContext lfs; private final LogAggregationContext logAggregationContext; private final Context context; private final int retentionSize; @@ -122,8 +126,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator { LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, Map appAcls, - LogAggregationContext logAggregationContext, - Context context) { + LogAggregationContext logAggregationContext, Context context, + FileContext lfs) { this.dispatcher = dispatcher; this.conf = conf; this.delService = deletionService; @@ -136,6 +140,7 @@ public class AppLogAggregatorImpl implements AppLogAggregator { this.retentionPolicy = retentionPolicy; this.pendingContainers = new LinkedBlockingQueue(); this.appAcls = appAcls; + this.lfs = lfs; this.logAggregationContext = logAggregationContext; this.context = context; this.nodeId = nodeId; @@ -395,15 +400,25 @@ public class AppLogAggregatorImpl implements AppLogAggregator { uploadLogsForContainers(); // Remove the local app-log-dirs - List rootLogDirs = dirsHandler.getLogDirs(); - Path[] localAppLogDirs = new Path[rootLogDirs.size()]; - int index = 0; - for (String rootLogDir : rootLogDirs) { - localAppLogDirs[index] = new Path(rootLogDir, this.applicationId); - index++; + List localAppLogDirs = new ArrayList(); + for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { + Path logPath = new Path(rootLogDir, applicationId); + try { + // check if log dir exists + lfs.getFileStatus(logPath); + localAppLogDirs.add(logPath); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Log dir " + rootLogDir + "is an unsupported file system", ue); + continue; + } catch (IOException fe) { + continue; + } + } + + if (localAppLogDirs.size() > 0) { + this.delService.delete(this.userUgi.getShortUserName(), null, + localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); } - this.delService.delete(this.userUgi.getShortUserName(), null, - localAppLogDirs); this.dispatcher.getEventHandler().handle( new ApplicationEvent(this.appId, http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 1d6a9e1..77176b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -32,6 +32,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -326,6 +327,15 @@ public class LogAggregationService extends AbstractService implements } this.dispatcher.getEventHandler().handle(eventResponse); } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + protected void initAppAggregator(final ApplicationId appId, String user, Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy, @@ -344,7 +354,8 @@ public class LogAggregationService extends AbstractService implements new AppLogAggregatorImpl(this.dispatcher, this.deletionService, getConfig(), appId, userUgi, this.nodeId, dirsHandler, getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy, - appAcls, logAggregationContext, this.context); + appAcls, logAggregationContext, this.context, + getLocalFileContext(getConfig())); if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) { throw new YarnRuntimeException("Duplicate initApp for " + appId); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java index 40173e1..0422ef9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -28,11 +30,14 @@ import java.util.concurrent.RejectedExecutionException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; @@ -96,6 +101,15 @@ public class NonAggregatingLogHandler extends AbstractService implements } super.serviceStop(); } + + FileContext getLocalFileContext(Configuration conf) { + try { + return FileContext.getLocalFSFileContext(conf); + } catch (IOException e) { + throw new YarnRuntimeException("Failed to access local fs"); + } + } + @SuppressWarnings("unchecked") @Override @@ -160,21 +174,30 @@ public class NonAggregatingLogHandler extends AbstractService implements @Override @SuppressWarnings("unchecked") public void run() { - List rootLogDirs = - NonAggregatingLogHandler.this.dirsHandler.getLogDirs(); - Path[] localAppLogDirs = new Path[rootLogDirs.size()]; - int index = 0; - for (String rootLogDir : rootLogDirs) { - localAppLogDirs[index] = new Path(rootLogDir, applicationId.toString()); - index++; + List localAppLogDirs = new ArrayList(); + FileContext lfs = getLocalFileContext(getConfig()); + for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { + Path logDir = new Path(rootLogDir, applicationId.toString()); + try { + lfs.getFileStatus(logDir); + localAppLogDirs.add(logDir); + } catch (UnsupportedFileSystemException ue) { + LOG.warn("Unsupported file system used for log dir " + logDir, ue); + continue; + } catch (IOException ie) { + continue; + } } + // Inform the application before the actual delete itself, so that links - // to logs will no longer be there on NM web-UI. + // to logs will no longer be there on NM web-UI. NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle( - new ApplicationEvent(this.applicationId, - ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); - NonAggregatingLogHandler.this.delService.delete(user, null, - localAppLogDirs); + new ApplicationEvent(this.applicationId, + ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); + if (localAppLogDirs.size() > 0) { + NonAggregatingLogHandler.this.delService.delete(user, null, + (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); + } } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java index f19731f..e435375 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestDirectoryCollection.java @@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Before; import org.junit.Test; public class TestDirectoryCollection { @@ -42,8 +44,13 @@ public class TestDirectoryCollection { TestDirectoryCollection.class.getName()).getAbsoluteFile(); private static final File testFile = new File(testDir, "testfile"); + private Configuration conf; + private FileContext localFs; + @Before - public void setup() throws IOException { + public void setupForTests() throws IOException { + conf = new Configuration(); + localFs = FileContext.getLocalFSFileContext(conf); testDir.mkdirs(); testFile.createNewFile(); } @@ -56,11 +63,12 @@ public class TestDirectoryCollection { @Test public void testConcurrentAccess() throws IOException { // Initialize DirectoryCollection with a file instead of a directory - Configuration conf = new Configuration(); + String[] dirs = {testFile.getPath()}; - DirectoryCollection dc = new DirectoryCollection(dirs, - conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, - YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); + DirectoryCollection dc = + new DirectoryCollection(dirs, conf.getFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); // Create an iterator before checkDirs is called to reliable test case List list = dc.getGoodDirs(); @@ -78,9 +86,8 @@ public class TestDirectoryCollection { @Test public void testCreateDirectories() throws IOException { - Configuration conf = new Configuration(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); - FileContext localFs = FileContext.getLocalFSFileContext(conf); String dirA = new File(testDir, "dirA").getPath(); String dirB = new File(dirA, "dirB").getPath(); @@ -92,9 +99,10 @@ public class TestDirectoryCollection { localFs.setPermission(pathC, permDirC); String[] dirs = { dirA, dirB, dirC }; - DirectoryCollection dc = new DirectoryCollection(dirs, - conf.getFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, - YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); + DirectoryCollection dc = + new DirectoryCollection(dirs, conf.getFloat( + YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + YarnConfiguration.DEFAULT_NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE)); FsPermission defaultPerm = FsPermission.getDefault() .applyUMask(new FsPermission((short)FsPermission.DEFAULT_UMASK)); boolean createResult = dc.createNonExistentDirs(localFs, defaultPerm); @@ -120,25 +128,29 @@ public class TestDirectoryCollection { dc.checkDirs(); Assert.assertEquals(0, dc.getGoodDirs().size()); Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(1, dc.getFullDirs().size()); dc = new DirectoryCollection(dirs, 100.0F); dc.checkDirs(); Assert.assertEquals(1, dc.getGoodDirs().size()); Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); dc = new DirectoryCollection(dirs, testDir.getTotalSpace() / (1024 * 1024)); dc.checkDirs(); Assert.assertEquals(0, dc.getGoodDirs().size()); Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(1, dc.getFullDirs().size()); dc = new DirectoryCollection(dirs, 100.0F, 0); dc.checkDirs(); Assert.assertEquals(1, dc.getGoodDirs().size()); Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); } @Test - public void testDiskLimitsCutoffSetters() { + public void testDiskLimitsCutoffSetters() throws IOException { String[] dirs = { "dir" }; DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F, 100); @@ -163,6 +175,47 @@ public class TestDirectoryCollection { } @Test + public void testFailedDisksBecomingGoodAgain() throws Exception { + + String dirA = new File(testDir, "dirA").getPath(); + String[] dirs = { dirA }; + DirectoryCollection dc = new DirectoryCollection(dirs, 0.0F); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(1, dc.getFullDirs().size()); + + dc.setDiskUtilizationPercentageCutoff(100.0F); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + + String dirB = new File(testDir, "dirB").getPath(); + Path pathB = new Path(dirB); + FsPermission permDirB = new FsPermission((short) 0400); + + localFs.mkdir(pathB, null, true); + localFs.setPermission(pathB, permDirB); + + String[] dirs2 = { dirB }; + + dc = new DirectoryCollection(dirs2, 100.0F); + dc.checkDirs(); + Assert.assertEquals(0, dc.getGoodDirs().size()); + Assert.assertEquals(1, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); + permDirB = new FsPermission((short) 0700); + localFs.setPermission(pathB, permDirB); + dc.checkDirs(); + Assert.assertEquals(1, dc.getGoodDirs().size()); + Assert.assertEquals(0, dc.getFailedDirs().size()); + Assert.assertEquals(0, dc.getFullDirs().size()); + } + + @Test public void testConstructors() { String[] dirs = { "dir" }; http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java index 057ea91..e22b7f9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestLocalDirsHandlerService.java @@ -21,8 +21,13 @@ package org.apache.hadoop.yarn.server.nodemanager; import java.io.File; import java.io.IOException; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.service.Service.STATE; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -57,10 +62,11 @@ public class TestLocalDirsHandlerService { LocalDirsHandlerService dirSvc = new LocalDirsHandlerService(); dirSvc.init(conf); Assert.assertEquals(1, dirSvc.getLocalDirs().size()); + dirSvc.close(); } @Test - public void testValidPathsDirHandlerService() { + public void testValidPathsDirHandlerService() throws Exception { Configuration conf = new YarnConfiguration(); String localDir1 = new File("file:///" + testDir, "localDir1").getPath(); String localDir2 = new File("hdfs:///" + testDir, "localDir2").getPath(); @@ -76,5 +82,40 @@ public class TestLocalDirsHandlerService { Assert.assertEquals("Service should not be inited", STATE.STOPPED, dirSvc.getServiceState()); + dirSvc.close(); + } + + @Test + public void testGetFullDirs() throws Exception { + Configuration conf = new YarnConfiguration(); + + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, "077"); + FileContext localFs = FileContext.getLocalFSFileContext(conf); + + String localDir1 = new File(testDir, "localDir1").getPath(); + String localDir2 = new File(testDir, "localDir2").getPath(); + String logDir1 = new File(testDir, "logDir1").getPath(); + String logDir2 = new File(testDir, "logDir2").getPath(); + Path localDir1Path = new Path(localDir1); + Path logDir1Path = new Path(logDir1); + FsPermission dirPermissions = new FsPermission((short) 0410); + localFs.mkdir(localDir1Path, dirPermissions, true); + localFs.mkdir(logDir1Path, dirPermissions, true); + + conf.set(YarnConfiguration.NM_LOCAL_DIRS, localDir1 + "," + localDir2); + conf.set(YarnConfiguration.NM_LOG_DIRS, logDir1 + "," + logDir2); + conf.setFloat(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, + 0.0f); + LocalDirsHandlerService dirSvc = new LocalDirsHandlerService(); + dirSvc.init(conf); + Assert.assertEquals(0, dirSvc.getLocalDirs().size()); + Assert.assertEquals(0, dirSvc.getLogDirs().size()); + Assert.assertEquals(1, dirSvc.getDiskFullLocalDirs().size()); + Assert.assertEquals(1, dirSvc.getDiskFullLogDirs().size()); + FileUtils.deleteDirectory(new File(localDir1)); + FileUtils.deleteDirectory(new File(localDir2)); + FileUtils.deleteDirectory(new File(logDir1)); + FileUtils.deleteDirectory(new File(logDir1)); + dirSvc.close(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java index 6a28605..3542196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeHealthService.java @@ -196,7 +196,7 @@ public class TestNodeHealthService { healthStatus.getHealthReport().equals( NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG + NodeHealthCheckerService.SEPARATOR - + nodeHealthChecker.getDiskHandler().getDisksHealthReport())); + + nodeHealthChecker.getDiskHandler().getDisksHealthReport(false))); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f2028bd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java index fa5a4fc..d569fa7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java @@ -42,6 +42,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.net.InetSocketAddress; @@ -68,13 +69,16 @@ import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.security.AccessControlException; import org.junit.Assert; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.AbstractFileSystem; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; @@ -167,15 +171,15 @@ public class TestResourceLocalizationService { conf = new Configuration(); spylfs = spy(FileContext.getLocalFSFileContext().getDefaultFileSystem()); lfs = FileContext.getFileContext(spylfs, conf); - doNothing().when(spylfs).mkdir( - isA(Path.class), isA(FsPermission.class), anyBoolean()); + String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString(); conf.set(YarnConfiguration.NM_LOG_DIRS, logDir); } @After - public void cleanup() { + public void cleanup() throws IOException { conf = null; + FileUtils.deleteDirectory(new File(basedir.toString())); } @Test @@ -752,6 +756,39 @@ public class TestResourceLocalizationService { ResourceLocalizationService spyService = spy(rawService); doReturn(mockServer).when(spyService).createServer(); doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final Path userDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.USERCACHE); + final Path fileDir = + new Path(sDirs[0].substring("file:".length()), + ContainerLocalizer.FILECACHE); + final Path sysDir = + new Path(sDirs[0].substring("file:".length()), + ResourceLocalizationService.NM_PRIVATE_DIR); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", new Path(sDirs[0])); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", sysDir); + + doAnswer(new Answer() { + @Override + public FileStatus answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + if (args.length > 0) { + if (args[0].equals(userDir) || args[0].equals(fileDir)) { + return fs; + } + } + return nmFs; + } + }).when(spylfs).getFileStatus(isA(Path.class)); + try { spyService.init(conf); spyService.start(); @@ -1775,5 +1812,274 @@ public class TestResourceLocalizationService { return new Token(("ident" + id).getBytes(), ("passwd" + id).getBytes(), new Text("kind" + id), new Text("service" + id)); } + + /* + * Test to ensure ResourceLocalizationService can handle local dirs going bad. + * Test first sets up all the components required, then sends events to fetch + * a private, app and public resource. It then sends events to clean up the + * container and the app and ensures the right delete calls were made. + */ + @Test + @SuppressWarnings("unchecked") + // mocked generics + public void testFailedDirsResourceRelease() throws Exception { + // setup components + File f = new File(basedir.toString()); + String[] sDirs = new String[4]; + List localDirs = new ArrayList(sDirs.length); + for (int i = 0; i < 4; ++i) { + sDirs[i] = f.getAbsolutePath() + i; + localDirs.add(new Path(sDirs[i])); + } + List containerLocalDirs = new ArrayList(localDirs.size()); + List appLocalDirs = new ArrayList(localDirs.size()); + List nmLocalContainerDirs = new ArrayList(localDirs.size()); + List nmLocalAppDirs = new ArrayList(localDirs.size()); + conf.setStrings(YarnConfiguration.NM_LOCAL_DIRS, sDirs); + conf.setLong(YarnConfiguration.NM_DISK_HEALTH_CHECK_INTERVAL_MS, 500); + + LocalizerTracker mockLocallilzerTracker = mock(LocalizerTracker.class); + DrainDispatcher dispatcher = new DrainDispatcher(); + dispatcher.init(conf); + dispatcher.start(); + EventHandler applicationBus = mock(EventHandler.class); + dispatcher.register(ApplicationEventType.class, applicationBus); + EventHandler containerBus = mock(EventHandler.class); + dispatcher.register(ContainerEventType.class, containerBus); + // Ignore actual localization + EventHandler localizerBus = mock(EventHandler.class); + dispatcher.register(LocalizerEventType.class, localizerBus); + + ContainerExecutor exec = mock(ContainerExecutor.class); + LocalDirsHandlerService mockDirsHandler = + mock(LocalDirsHandlerService.class); + doReturn(new ArrayList(Arrays.asList(sDirs))).when( + mockDirsHandler).getLocalDirsForCleanup(); + + DeletionService delService = mock(DeletionService.class); + + // setup mocks + ResourceLocalizationService rawService = + new ResourceLocalizationService(dispatcher, exec, delService, + mockDirsHandler, new NMNullStateStoreService()); + ResourceLocalizationService spyService = spy(rawService); + doReturn(mockServer).when(spyService).createServer(); + doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker( + isA(Configuration.class)); + doReturn(lfs).when(spyService) + .getLocalFileContext(isA(Configuration.class)); + FsPermission defaultPermission = + FsPermission.getDirDefault().applyUMask(lfs.getUMask()); + FsPermission nmPermission = + ResourceLocalizationService.NM_PRIVATE_PERM.applyUMask(lfs.getUMask()); + final FileStatus fs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + defaultPermission, "", "", localDirs.get(0)); + final FileStatus nmFs = + new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0, + nmPermission, "", "", localDirs.get(0)); + + final String user = "user0"; + // init application + final Application app = mock(Application.class); + final ApplicationId appId = + BuilderUtils.newApplicationId(314159265358979L, 3); + when(app.getUser()).thenReturn(user); + when(app.getAppId()).thenReturn(appId); + when(app.toString()).thenReturn(ConverterUtils.toString(appId)); + + // init container. + final Container c = getMockContainer(appId, 42, user); + + // setup local app dirs + List tmpDirs = mockDirsHandler.getLocalDirs(); + for (int i = 0; i < tmpDirs.size(); ++i) { + Path usersdir = new Path(tmpDirs.get(i), ContainerLocalizer.USERCACHE); + Path userdir = new Path(usersdir, user); + Path allAppsdir = new Path(userdir, ContainerLocalizer.APPCACHE); + Path appDir = new Path(allAppsdir, ConverterUtils.toString(appId)); + Path containerDir = + new Path(appDir, ConverterUtils.toString(c.getContainerId())); + containerLocalDirs.add(containerDir); + appLocalDirs.add(appDir); + + Path sysDir = + new Path(tmpDirs.get(i), ResourceLocalizationService.NM_PRIVATE_DIR); + Path appSysDir = new Path(sysDir, ConverterUtils.toString(appId)); + Path containerSysDir = + new Path(appSysDir, ConverterUtils.toString(c.getContainerId())); + + nmLocalContainerDirs.add(containerSysDir); + nmLocalAppDirs.add(appSysDir); + } + + try { + spyService.init(conf); + spyService.start(); + + spyService.handle(new ApplicationLocalizationEvent( + LocalizationEventType.INIT_APPLICATION_RESOURCES, app)); + dispatcher.await(); + + // Get a handle on the trackers after they're setup with + // INIT_APP_RESOURCES + LocalResourcesTracker appTracker = + spyService.getLocalResourcesTracker( + LocalResourceVisibility.APPLICATION, user, appId); + LocalResourcesTracker privTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PRIVATE, + user, appId); + LocalResourcesTracker pubTracker = + spyService.getLocalResourcesTracker(LocalResourceVisibility.PUBLIC, + user, appId); + + // init resources + Random r = new Random(); + long seed = r.nextLong(); + r.setSeed(seed); + + // Send localization requests, one for each type of resource + final LocalResource privResource = getPrivateMockedResource(r); + final LocalResourceRequest privReq = + new LocalResourceRequest(privResource); + + final LocalResource appResource = getAppMockedResource(r); + final LocalResourceRequest appReq = new LocalResourceRequest(appResource); + + final LocalResource pubResource = getPublicMockedResource(r); + final LocalResourceRequest pubReq = new LocalResourceRequest(pubResource); + + Map> req = + new HashMap>(); + req.put(LocalResourceVisibility.PRIVATE, + Collections.singletonList(privReq)); + req.put(LocalResourceVisibility.APPLICATION, + Collections.singletonList(appReq)); + req + .put(LocalResourceVisibility.PUBLIC, Collections.singletonList(pubReq)); + + Map> req2 = + new HashMap>(); + req2.put(LocalResourceVisibility.PRIVATE, + Collections.singletonList(privReq)); + + // Send Request event + spyService.handle(new ContainerLocalizationRequestEvent(c, req)); + spyService.handle(new ContainerLocalizationRequestEvent(c, req2)); + dispatcher.await(); + + int privRsrcCount = 0; + for (LocalizedResource lr : privTracker) { + privRsrcCount++; + Assert.assertEquals("Incorrect reference count", 2, lr.getRefCount()); + Assert.assertEquals(privReq, lr.getRequest()); + } + Assert.assertEquals(1, privRsrcCount); + + int appRsrcCount = 0; + for (LocalizedResource lr : appTracker) { + appRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + Assert.assertEquals(appReq, lr.getRequest()); + } + Assert.assertEquals(1, appRsrcCount); + + int pubRsrcCount = 0; + for (LocalizedResource lr : pubTracker) { + pubRsrcCount++; + Assert.assertEquals("Incorrect reference count", 1, lr.getRefCount()); + Assert.assertEquals(pubReq, lr.getRequest()); + } + Assert.assertEquals(1, pubRsrcCount); + + // setup mocks for test, a set of dirs with IOExceptions and let the rest + // go through + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 2) { + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(containerLocalDirs.get(i))); + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(nmLocalContainerDirs.get(i))); + } else { + doReturn(fs).when(spylfs) + .getFileStatus(eq(containerLocalDirs.get(i))); + doReturn(nmFs).when(spylfs).getFileStatus( + eq(nmLocalContainerDirs.get(i))); + } + } + + // Send Cleanup Event + spyService.handle(new ContainerLocalizationCleanupEvent(c, req)); + verify(mockLocallilzerTracker).cleanupPrivLocalizers( + "container_314159265358979_0003_01_000042"); + + // match cleanup events with the mocks we setup earlier + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 2) { + try { + verify(delService).delete(user, containerLocalDirs.get(i)); + verify(delService).delete(null, nmLocalContainerDirs.get(i)); + Assert.fail("deletion attempts for invalid dirs"); + } catch (Throwable e) { + continue; + } + } else { + verify(delService).delete(user, containerLocalDirs.get(i)); + verify(delService).delete(null, nmLocalContainerDirs.get(i)); + } + } + + ArgumentMatcher matchesAppDestroy = + new ArgumentMatcher() { + @Override + public boolean matches(Object o) { + ApplicationEvent evt = (ApplicationEvent) o; + return (evt.getType() == ApplicationEventType.APPLICATION_RESOURCES_CLEANEDUP) + && appId == evt.getApplicationID(); + } + }; + + dispatcher.await(); + + // setup mocks again, this time throw UnsupportedFileSystemException and + // IOExceptions + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 3) { + Mockito.doThrow(new IOException()).when(spylfs) + .getFileStatus(eq(appLocalDirs.get(i))); + Mockito.doThrow(new UnsupportedFileSystemException("test")) + .when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i))); + } else { + doReturn(fs).when(spylfs).getFileStatus(eq(appLocalDirs.get(i))); + doReturn(nmFs).when(spylfs).getFileStatus(eq(nmLocalAppDirs.get(i))); + } + } + LocalizationEvent destroyApp = + new ApplicationLocalizationEvent( + LocalizationEventType.DESTROY_APPLICATION_RESOURCES, app); + spyService.handle(destroyApp); + verify(applicationBus).handle(argThat(matchesAppDestroy)); + + // verify we got the right delete calls + for (int i = 0; i < containerLocalDirs.size(); ++i) { + if (i == 3) { + try { + verify(delService).delete(user, containerLocalDirs.get(i)); + verify(delService).delete(null, nmLocalContainerDirs.get(i)); + Assert.fail("deletion attempts for invalid dirs"); + } catch (Throwable e) { + continue; + } + } else { + verify(delService).delete(user, appLocalDirs.get(i)); + verify(delService).delete(null, nmLocalAppDirs.get(i)); + } + } + + } finally { + dispatcher.stop(); + delService.stop(); + } + } }