From: omalley@apache.org
To: common-commits@hadoop.apache.org
Subject: svn commit: r1164742 - in /hadoop/common/branches/branch-0.20-security: ./ src/core/org/apache/hadoop/fs/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/
Date: Fri, 02 Sep 2011 22:11:16 -0000

Author: omalley
Date: Fri Sep  2 22:11:15 2011
New Revision: 1164742

URL: http://svn.apache.org/viewvc?rev=1164742&view=rev
Log:
MAPREDUCE-2804. Fixed a race condition in setting up the log directories
for tasks that are starting at the same time. (omalley)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep  2 22:11:15 2011
@@ -140,6 +140,9 @@ Release 0.20.204.0 - 2011-8-25
 
     BUG FIXES
 
+    MAPREDUCE-2804. Fixed a race condition in setting up the log directories
+    for tasks that are starting at the same time. (omalley)
+
     MAPREDUCE-2846. Fixed a race condition in writing the log index file
     that caused tasks to fail. (omalley)

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/FileUtil.java Fri Sep  2 22:11:15 2011
@@ -25,7 +25,10 @@ import java.util.zip.ZipFile;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -561,9 +564,25 @@ public class FileUtil {
     } catch(InterruptedException e){
       //do nothing as of yet
     }
+    if (returnVal != 0) {
+      LOG.warn("Command '" + cmd + "' failed " + returnVal +
+               " with: " + copyStderr(p));
+    }
     return returnVal;
   }
 
+  private static String copyStderr(Process p) throws IOException {
+    InputStream err = p.getErrorStream();
+    StringBuilder result = new StringBuilder();
+    byte[] buff = new byte[4096];
+    int len = err.read(buff);
+    while (len > 0) {
+      result.append(new String(buff, 0 , len));
+      len = err.read(buff);
+    }
+    return result.toString();
+  }
+
   /**
    * Change the permissions on a filename.
    * @param filename the name of the file to change
@@ -608,6 +627,82 @@ public class FileUtil {
     }
     return shExec.getExitCode();
   }
+
+  /**
+   * Set permissions to the required value. Uses the java primitives instead
+   * of forking if group == other.
+   * @param f the file to change
+   * @param permission the new permissions
+   * @throws IOException
+   */
+  public static void setPermission(File f, FsPermission permission
+                                   ) throws IOException {
+    FsAction user = permission.getUserAction();
+    FsAction group = permission.getGroupAction();
+    FsAction other = permission.getOtherAction();
+
+    // use the native/fork if the group/other permissions are different
+    // or if the native is available
+    if (group != other || NativeIO.isAvailable()) {
+      execSetPermission(f, permission);
+      return;
+    }
+
+    boolean rv = true;
+
+    // read perms
+    rv = f.setReadable(group.implies(FsAction.READ), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
+      f.setReadable(user.implies(FsAction.READ), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // write perms
+    rv = f.setWritable(group.implies(FsAction.WRITE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
+      f.setWritable(user.implies(FsAction.WRITE), true);
+      checkReturnValue(rv, f, permission);
+    }
+
+    // exec perms
+    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
+    checkReturnValue(rv, f, permission);
+    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
+      f.setExecutable(user.implies(FsAction.EXECUTE), true);
+      checkReturnValue(rv, f, permission);
+    }
+  }
+
+  private static void checkReturnValue(boolean rv, File p,
+                                       FsPermission permission
+                                       ) throws IOException {
+    if (!rv) {
+      throw new IOException("Failed to set permissions of path: " + p +
+                            " to " +
+                            String.format("%04o", permission.toShort()));
+    }
+  }
+
+  private static void execSetPermission(File f,
+                                        FsPermission permission
+                                        ) throws IOException {
+    if (NativeIO.isAvailable()) {
+      NativeIO.chmod(f.getCanonicalPath(), permission.toShort());
+    } else {
+      execCommand(f, Shell.SET_PERMISSION_COMMAND,
+                  String.format("%04o", permission.toShort()));
+    }
+  }
+
+  static String execCommand(File f, String... cmd) throws IOException {
+    String[] args = new String[cmd.length + 1];
+    System.arraycopy(cmd, 0, args, 0, cmd.length);
+    args[cmd.length] = f.getCanonicalPath();
+    String output = Shell.execCommand(args);
+    return output;
+  }
 
   /**
    * Create a tmp file for a base file.
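The interesting part of the FileUtil change above is the fast path: when the requested group and other bits are identical and native IO is not available, the permission bits are applied with plain java.io.File setters instead of forking a chmod per file; otherwise the code falls back to NativeIO.chmod or an exec'd chmod. The following is a minimal, self-contained sketch of that same fast-path idea, for illustration only; it uses a raw octal int and an invented LocalPermissionSketch class rather than Hadoop's FsPermission/NativeIO, and is not part of the patch.

import java.io.File;
import java.io.IOException;

public class LocalPermissionSketch {

  // Apply a POSIX-style mode such as 0700 or 0644 using only java.io.File
  // primitives. This only works when the group and other bits are equal
  // (the patch's fast path); otherwise a chmod fork or native call is needed.
  static void setPermission(File f, int mode) throws IOException {
    int user = (mode >> 6) & 7;
    int group = (mode >> 3) & 7;
    int other = mode & 7;
    if (group != other) {
      throw new IOException("group != other; would need to fork chmod for " + f);
    }
    boolean ok = true;
    // ownerOnly=false sets the bit for everybody; a second call with
    // ownerOnly=true then adjusts the owner bit if it differs from group.
    ok &= f.setReadable((group & 4) != 0, false);
    if (((user & 4) != 0) != ((group & 4) != 0)) {
      ok &= f.setReadable((user & 4) != 0, true);
    }
    ok &= f.setWritable((group & 2) != 0, false);
    if (((user & 2) != 0) != ((group & 2) != 0)) {
      ok &= f.setWritable((user & 2) != 0, true);
    }
    ok &= f.setExecutable((group & 1) != 0, false);
    if (((user & 1) != 0) != ((group & 1) != 0)) {
      ok &= f.setExecutable((user & 1) != 0, true);
    }
    if (!ok) {
      throw new IOException("Failed to set permissions of " + f
          + " to " + String.format("%04o", mode));
    }
  }

  public static void main(String[] args) throws IOException {
    File tmp = File.createTempFile("perm-sketch", ".tmp");
    tmp.deleteOnExit();
    setPermission(tmp, 0700);   // user-only access, no fork needed
    System.out.println(tmp + " -> r=" + tmp.canRead()
        + " w=" + tmp.canWrite() + " x=" + tmp.canExecute());
  }
}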
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/fs/RawLocalFileSystem.java Fri Sep  2 22:11:15 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs;
 import java.io.*;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileLock;
 import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
@@ -259,6 +258,7 @@ public class RawLocalFileSystem extends
     if (pathToFile(src).renameTo(pathToFile(dst))) {
       return true;
     }
+    LOG.debug("Falling through to a copy of " + src + " to " + dst);
     return FileUtil.copy(this, src, this, dst, true, getConf());
   }
 
@@ -416,8 +416,8 @@ public class RawLocalFileSystem extends
       IOException e = null;
       try {
         StringTokenizer t = new StringTokenizer(
-            execCommand(new File(getPath().toUri()),
-                        Shell.getGET_PERMISSION_COMMAND()));
+            FileUtil.execCommand(new File(getPath().toUri()),
+                                 Shell.getGET_PERMISSION_COMMAND()));
         //expected format
         //-rw-------    1 username groupname ...
         String permission = t.nextToken();
@@ -467,11 +467,11 @@ public class RawLocalFileSystem extends
     }
 
     if (username == null) {
-      execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
+      FileUtil.execCommand(pathToFile(p), Shell.SET_GROUP_COMMAND, groupname);
     } else {
       //OWNER[:[GROUP]]
       String s = username + (groupname == null? "": ":" + groupname);
-      execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
+      FileUtil.execCommand(pathToFile(p), Shell.SET_OWNER_COMMAND, s);
     }
   }
 
@@ -480,70 +480,7 @@ public class RawLocalFileSystem extends
    */
   @Override
   public void setPermission(Path p, FsPermission permission
-                            ) throws IOException {
-    FsAction user = permission.getUserAction();
-    FsAction group = permission.getGroupAction();
-    FsAction other = permission.getOtherAction();
-
-    File f = pathToFile(p);
-
-    // Fork chmod if group and other permissions are different...
-    if (group != other) {
-      execSetPermission(f, permission);
-      return;
-    }
-
-    boolean rv = true;
-
-    // read perms
-    rv = f.setReadable(group.implies(FsAction.READ), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.READ) != user.implies(FsAction.READ)) {
-      f.setReadable(user.implies(FsAction.READ), true);
-      checkReturnValue(rv, p, permission);
-    }
-
-    // write perms
-    rv = f.setWritable(group.implies(FsAction.WRITE), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.WRITE) != user.implies(FsAction.WRITE)) {
-      f.setWritable(user.implies(FsAction.WRITE), true);
-      checkReturnValue(rv, p, permission);
-    }
-
-    // exec perms
-    rv = f.setExecutable(group.implies(FsAction.EXECUTE), false);
-    checkReturnValue(rv, p, permission);
-    if (group.implies(FsAction.EXECUTE) != user.implies(FsAction.EXECUTE)) {
-      f.setExecutable(user.implies(FsAction.EXECUTE), true);
-      checkReturnValue(rv, p, permission);
-    }
-  }
-
-  private void checkReturnValue(boolean rv, Path p, FsPermission permission)
-    throws IOException {
-    if (!rv) {
-      throw new IOException("Failed to set permissions of path: " + p + " to " +
-          String.format("%04o", permission.toShort()));
-    }
-  }
-
-  private void execSetPermission(File f, FsPermission permission)
-    throws IOException {
-    if (NativeIO.isAvailable()) {
-      NativeIO.chmod(f.getCanonicalPath(),
-                     permission.toShort());
-    } else {
-      execCommand(f, Shell.SET_PERMISSION_COMMAND,
-                  String.format("%04o", permission.toShort()));
-    }
-  }
-
-  private static String execCommand(File f, String... cmd) throws IOException {
-    String[] args = new String[cmd.length + 1];
-    System.arraycopy(cmd, 0, args, 0, cmd.length);
-    args[cmd.length] = f.getCanonicalPath();
-    String output = Shell.execCommand(args);
-    return output;
+                            ) throws IOException {
+    FileUtil.setPermission(pathToFile(p), permission);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java?rev=1164742&r1=1164741&r2=1164742&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/DefaultTaskController.java Fri Sep  2 22:11:15 2011
@@ -68,12 +68,8 @@ public class DefaultTaskController exten
   @Override
   public void createLogDir(TaskAttemptID taskID,
                            boolean isCleanup) throws IOException {
-    boolean b = TaskLog.createTaskAttemptLogDir(taskID, isCleanup,
-        localStorage.getGoodLocalDirs());
-    if (!b) {
-      LOG.warn("Creation of attempt log dir for " + taskID
Ignoring"); - } + TaskLog.createTaskAttemptLogDir(taskID, isCleanup, + localStorage.getGoodLocalDirs()); } /** Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=1164742&r1=1164741&r2=1164742&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskLog.java Fri Sep 2 22:11:15 2011 @@ -23,7 +23,6 @@ import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -50,7 +49,6 @@ import org.apache.hadoop.mapreduce.JobID import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.util.ProcessTree; import org.apache.hadoop.util.Shell; -import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Appender; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -92,48 +90,29 @@ public class TaskLog { * @param taskID attempt-id for which log dir is to be created * @param isCleanup Is this attempt a cleanup attempt ? * @param localDirs mapred local directories - * @return true if attempt log directory creation is succeeded * @throws IOException */ - public static boolean createTaskAttemptLogDir(TaskAttemptID taskID, + public static void createTaskAttemptLogDir(TaskAttemptID taskID, boolean isCleanup, String[] localDirs) throws IOException{ String cleanupSuffix = isCleanup ? 
".cleanup" : ""; String strAttemptLogDir = getTaskAttemptLogDir(taskID, cleanupSuffix, localDirs); File attemptLogDir = new File(strAttemptLogDir); - boolean isSucceeded = attemptLogDir.mkdirs(); - if(isSucceeded) { - String strLinkAttemptLogDir = getJobDir( - taskID.getJobID()).getAbsolutePath() + File.separatorChar + - taskID.toString() + cleanupSuffix; - if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) { - LOG.warn("Creation of symlink to attempt log dir failed."); - isSucceeded = false; - } - - File linkAttemptLogDir = new File(strLinkAttemptLogDir); - if (!linkAttemptLogDir.isDirectory() && !linkAttemptLogDir.mkdirs()) { - LOG.warn("Unable to create linkAttemptLogDir directory : " - + linkAttemptLogDir.getPath()); - isSucceeded = false; - } - - FileSystem localFS = FileSystem.getLocal(new Configuration()); - - //Set permissions for target attempt log dir - localFS.setPermission(new Path(attemptLogDir.getCanonicalPath()), - new FsPermission((short)0700)); - - //Set permissions for target job log dir - localFS.setPermission(new Path(attemptLogDir.getParentFile().getCanonicalPath()), - new FsPermission((short)0700)); - - // Set permissions for job dir in userlogs - localFS.setPermission( - new Path(linkAttemptLogDir.getParentFile().getCanonicalPath()), - new FsPermission((short)0700)); + if (!attemptLogDir.mkdirs()) { + throw new IOException("Creation of " + attemptLogDir + " failed."); } - return isSucceeded; + String strLinkAttemptLogDir = + getJobDir(taskID.getJobID()).getAbsolutePath() + File.separatorChar + + taskID.toString() + cleanupSuffix; + if (FileUtil.symLink(strAttemptLogDir, strLinkAttemptLogDir) != 0) { + throw new IOException("Creation of symlink from " + + strLinkAttemptLogDir + " to " + strAttemptLogDir + + " failed."); + } + + //Set permissions for target attempt log dir + FsPermission userOnly = new FsPermission((short) 0700); + FileUtil.setPermission(attemptLogDir, userOnly); } /** Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1164742&r1=1164741&r2=1164742&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (original) +++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Sep 2 22:11:15 2011 @@ -32,7 +32,6 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.mapred.TaskController; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapred.TaskTracker; -import org.apache.hadoop.mapreduce.JobID; /** * @@ -112,15 +111,17 @@ public class Localizer { if (fs.exists(userDir) || fs.mkdirs(userDir)) { // Set permissions on the user-directory - fs.setPermission(userDir, new FsPermission((short)0700)); + FsPermission userOnly = new FsPermission((short) 0700); + FileUtil.setPermission(new File(userDir.toUri().getPath()), + userOnly); userDirStatus = true; // Set up the jobcache directory - Path jobCacheDir = - new Path(localDir, TaskTracker.getJobCacheSubdir(user)); - if (fs.exists(jobCacheDir) || fs.mkdirs(jobCacheDir)) { + File jobCacheDir = + new File(localDir, TaskTracker.getJobCacheSubdir(user)); + if (jobCacheDir.exists() || jobCacheDir.mkdirs()) { // Set permissions on 
-          fs.setPermission(jobCacheDir, new FsPermission((short)0700));
+          FileUtil.setPermission(jobCacheDir, userOnly);
           jobCacheDirStatus = true;
         } else {
           LOG.warn("Unable to create job cache directory : "
@@ -128,11 +129,12 @@ public class Localizer {
         }
 
         // Set up the cache directory used for distributed cache files
-        Path distributedCacheDir =
-            new Path(localDir, TaskTracker.getPrivateDistributedCacheDir(user));
-        if (fs.exists(distributedCacheDir) || fs.mkdirs(distributedCacheDir)) {
+        File distributedCacheDir =
+            new File(localDir,
+                     TaskTracker.getPrivateDistributedCacheDir(user));
+        if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
           // Set permissions on the distcache-directory
-          fs.setPermission(distributedCacheDir, new FsPermission((short)0700));
+          FileUtil.setPermission(distributedCacheDir, userOnly);
           distributedCacheDirStatus = true;
         } else {
           LOG.warn("Unable to create distributed-cache directory : "
@@ -164,51 +166,6 @@ public class Localizer {
   }
 
   /**
-   * Prepare the job directories for a given job. To be called by the job
-   * localization code, only if the job is not already localized.
-   *
-   *
-   * Here, we set 700 permissions on the job directories created on all disks.
-   * This we do so as to avoid any misuse by other users till the time
-   * {@link TaskController#initializeJob} is run at a
-   * later time to set proper private permissions on the job directories.
-   *
-   * @param user
-   * @param jobId
-   * @throws IOException
-   */
-  public void initializeJobDirs(String user, JobID jobId)
-      throws IOException {
-    boolean initJobDirStatus = false;
-    String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
-    for (String localDir : localDirs) {
-      Path jobDir = new Path(localDir, jobDirPath);
-      if (fs.exists(jobDir)) {
-        // this will happen on a partial execution of localizeJob. Sometimes
-        // copying job.xml to the local disk succeeds but copying job.jar might
-        // throw out an exception. We should clean up and then try again.
-        fs.delete(jobDir, true);
-      }
-
-      boolean jobDirStatus = fs.mkdirs(jobDir);
-      if (!jobDirStatus) {
-        LOG.warn("Not able to create job directory " + jobDir.toString());
-      }
-
-      initJobDirStatus = initJobDirStatus || jobDirStatus;
-
-      // job-dir has to be private to the TT
-      fs.setPermission(jobDir, new FsPermission((short)0700));
-    }
-
-    if (!initJobDirStatus) {
-      throw new IOException("Not able to initialize job directories "
-          + "in any of the configured local directories for job "
-          + jobId.toString());
-    }
-  }
-
-  /**
    * Create taskDirs on all the disks. Otherwise, in some cases, like when
    * LinuxTaskController is in use, child might wish to balance load across
    * disks but cannot itself create attempt directory because of the fact that
    *
@@ -244,18 +201,4 @@ public class Localizer {
           + attemptId);
     }
   }
-
-  /**
-   * Create job log directory and set appropriate permissions for the directory.
-   *
-   * @param jobId
-   */
-  public void initializeJobLogDir(JobID jobId) throws IOException {
-    Path jobUserLogDir = new Path(TaskLog.getJobDir(jobId).getCanonicalPath());
-    if (!fs.mkdirs(jobUserLogDir)) {
-      throw new IOException("Could not create job user log directory: " +
-          jobUserLogDir);
-    }
-    fs.setPermission(jobUserLogDir, new FsPermission((short)0700));
-  }
 }
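Net effect of the TaskLog and DefaultTaskController changes above: createTaskAttemptLogDir no longer returns a boolean that the caller logs and ignores; any failure to create the attempt directory, create the symlink under the job's userlog directory, or set the 0700 permissions now surfaces as an IOException. Below is a rough standalone sketch of that fail-fast flow, for illustration only: the class and method names are invented, it uses java.nio.file (newer than the Java 6-era APIs in branch-0.20), and it assumes a POSIX filesystem.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;

public class AttemptLogDirSketch {

  // Create the attempt log dir, link it under the job's userlog dir, and
  // restrict it to the owner (0700). Any failure is an IOException rather
  // than a status flag the caller could ignore.
  static void createAttemptLogDir(Path attemptLogDir, Path linkInJobDir)
      throws IOException {
    if (!attemptLogDir.toFile().mkdirs()) {
      throw new IOException("Creation of " + attemptLogDir + " failed.");
    }
    try {
      Files.createSymbolicLink(linkInJobDir, attemptLogDir);
    } catch (IOException e) {
      throw new IOException("Creation of symlink from " + linkInJobDir
          + " to " + attemptLogDir + " failed.", e);
    }
    // rwx------ : keep the attempt's logs private to the task's owner.
    Files.setPosixFilePermissions(attemptLogDir,
        PosixFilePermissions.fromString("rwx------"));
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("userlogs-sketch");
    Path attemptDir =
        base.resolve("local").resolve("attempt_201109021458_0001_m_000001_0");
    Path link = base.resolve("job_201109021458_0001")
        .resolve(attemptDir.getFileName().toString());
    Files.createDirectories(link.getParent());
    createAttemptLogDir(attemptDir, link);
    System.out.println("Created " + attemptDir + " with link " + link);
  }
}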