Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 45953 invoked from network); 4 Mar 2011 03:43:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:43:06 -0000 Received: (qmail 79570 invoked by uid 500); 4 Mar 2011 03:43:06 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 79541 invoked by uid 500); 4 Mar 2011 03:43:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 79516 invoked by uid 99); 4 Mar 2011 03:43:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:43:05 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:43:00 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2D5012388C29; Fri, 4 Mar 2011 03:42:40 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077116 [2/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: c++/task-controller/ c++/task-controller/tests/ docs/src/documentation/content/xdocs/ mapred/org/apache/hadoop/filecache/ mapred/org/apache/hadoop/mapred/ mapred/... 
Date: Fri, 04 Mar 2011 03:42:39 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304034240.2D5012388C29@eris.apache.org> Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077116&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 03:42:38 2011 @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapreduce.server.tasktracker; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskController; +import org.apache.hadoop.mapred.TaskTracker; +import org.apache.hadoop.mapred.TaskController.InitializationContext; + +/** + * + * NOTE: This class is internal only and not intended for users!! + */ +public class Localizer { + + static final Log LOG = LogFactory.getLog(Localizer.class); + + private FileSystem fs; + private String[] localDirs; + private TaskController taskController; + + /** + * Create a Localizer instance + * + * @param fileSys + * @param lDirs + * @param tc + */ + public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) { + fs = fileSys; + localDirs = lDirs; + taskController = tc; + } + + /** + * NOTE: This class is internal only class and not intended for users!! + * + */ + public static class PermissionsHandler { + /** + * Permission information useful for setting permissions for a given path. + * Using this, one can set all possible combinations of permissions for the + * owner of the file. But permissions for the group and all others can only + * be set together, i.e. permissions for group cannot be set different from + * those for others and vice versa. 
+ */ + public static class PermissionsInfo { + public boolean readPermissions; + public boolean writePermissions; + public boolean executablePermissions; + public boolean readPermsOwnerOnly; + public boolean writePermsOwnerOnly; + public boolean executePermsOwnerOnly; + + /** + * Create a permissions-info object with the given attributes + * + * @param readPerms + * @param writePerms + * @param executePerms + * @param readOwnerOnly + * @param writeOwnerOnly + * @param executeOwnerOnly + */ + public PermissionsInfo(boolean readPerms, boolean writePerms, + boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly, + boolean executeOwnerOnly) { + readPermissions = readPerms; + writePermissions = writePerms; + executablePermissions = executePerms; + readPermsOwnerOnly = readOwnerOnly; + writePermsOwnerOnly = writeOwnerOnly; + executePermsOwnerOnly = executeOwnerOnly; + } + } + + /** + * Set permission on the given file path using the specified permissions + * information. We use java api to set permission instead of spawning chmod + * processes. This saves a lot of time. Using this, one can set all possible + * combinations of permissions for the owner of the file. But permissions + * for the group and all others can only be set together, i.e. permissions + * for group cannot be set different from those for others and vice versa. + * + * This method should satisfy the needs of most of the applications. For + * those it doesn't, {@link FileUtil#chmod} can be used. 
+ * + * @param f file path + * @param pInfo permissions information + * @return true if success, false otherwise + */ + public static boolean setPermissions(File f, PermissionsInfo pInfo) { + if (pInfo == null) { + LOG.debug(" PermissionsInfo is null, returning."); + return true; + } + + LOG.debug("Setting permission for " + f.getAbsolutePath()); + + boolean ret = true; + + // Clear all the flags + ret = f.setReadable(false, false) && ret; + ret = f.setWritable(false, false) && ret; + ret = f.setExecutable(false, false) && ret; + + ret = f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly); + LOG.debug("Readable status for " + f + " set to " + ret); + ret = + f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly) + && ret; + LOG.debug("Writable status for " + f + " set to " + ret); + ret = + f.setExecutable(pInfo.executablePermissions, + pInfo.executePermsOwnerOnly) + && ret; + + LOG.debug("Executable status for " + f + " set to " + ret); + return ret; + } + + /** + * Permissions rwxr_xr_x + */ + public static final PermissionsInfo sevenFiveFive = + new PermissionsInfo(true, true, true, false, true, false); + /** + * Completely private permissions + */ + public static final PermissionsInfo sevenZeroZero = + new PermissionsInfo(true, true, true, true, true, true); + } + + // Data-structure for synchronizing localization of user directories. + private Map localizedUsers = + new HashMap(); + + /** + * Initialize the local directories for a particular user on this TT. This + * involves creation and setting permissions of the following directories + *
    + *
  • $mapred.local.dir/taskTracker/$user
  • + *
  • $mapred.local.dir/taskTracker/$user/jobcache
  • + *
  • $mapred.local.dir/taskTracker/$user/distcache
  • + *
+ * + * @param user + * @throws IOException + */ + public void initializeUserDirs(String user) + throws IOException { + + if (user == null) { + // This shouldn't happen in general + throw new IOException( + "User is null. Cannot initialized user-directories."); + } + + AtomicBoolean localizedUser; + synchronized (localizedUsers) { + if (!localizedUsers.containsKey(user)) { + localizedUsers.put(user, new AtomicBoolean(false)); + } + localizedUser = localizedUsers.get(user); + } + + synchronized (localizedUser) { + + if (localizedUser.get()) { + // User-directories are already localized for his user. + LOG.info("User-directories for the user " + user + + " are already initialized on this TT. Not doing anything."); + return; + } + + LOG.info("Initializing user " + user + " on this TT."); + + boolean userDirStatus = false; + boolean jobCacheDirStatus = false; + boolean distributedCacheDirStatus = false; + + for (String localDir : localDirs) { + + Path userDir = new Path(localDir, TaskTracker.getUserDir(user)); + + // Set up the user-directory. 
+ if (fs.exists(userDir) || fs.mkdirs(userDir)) { + + // Set permissions on the user-directory + PermissionsHandler.setPermissions( + new File(userDir.toUri().getPath()), + PermissionsHandler.sevenZeroZero); + userDirStatus = true; + + // Set up the jobcache directory + File jobCacheDir = + new File(localDir, TaskTracker.getJobCacheSubdir(user)); + if (jobCacheDir.exists() || jobCacheDir.mkdirs()) { + // Set permissions on the jobcache-directory + PermissionsHandler.setPermissions(jobCacheDir, + PermissionsHandler.sevenZeroZero); + jobCacheDirStatus = true; + } else { + LOG.warn("Unable to create job cache directory : " + + jobCacheDir.getPath()); + } + + // Set up the cache directory used for distributed cache files + File distributedCacheDir = + new File(localDir, TaskTracker.getDistributedCacheDir(user)); + if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) { + // Set permissions on the distcache-directory + PermissionsHandler.setPermissions(distributedCacheDir, + PermissionsHandler.sevenZeroZero); + distributedCacheDirStatus = true; + } else { + LOG.warn("Unable to create distributed-cache directory : " + + distributedCacheDir.getPath()); + } + } else { + LOG.warn("Unable to create the user directory : " + userDir); + } + } + + if (!userDirStatus) { + throw new IOException("Not able to initialize user directories " + + "in any of the configured local directories for user " + user); + } + if (!jobCacheDirStatus) { + throw new IOException("Not able to initialize job-cache directories " + + "in any of the configured local directories for user " + user); + } + if (!distributedCacheDirStatus) { + throw new IOException( + "Not able to initialize distributed-cache directories " + + "in any of the configured local directories for user " + + user); + } + + // Now, run the task-controller specific code to initialize the + // user-directories. 
+ InitializationContext context = new InitializationContext(); + context.user = user; + context.workDir = null; + taskController.initializeUser(context); + + // Localization of the user is done + localizedUser.set(true); + } + } + + /** + * Prepare the job directories for a given job. To be called by the job + * localization code, only if the job is not already localized. + * + *
+ * Here, we set 700 permissions on the job directories created on all disks. + * This we do so as to avoid any misuse by other users till the time + * {@link TaskController#initializeJob(JobInitializationContext)} is run at a + * later time to set proper private permissions on the job directories.
+ * + * @param user + * @param jobId + * @throws IOException + */ + public void initializeJobDirs(String user, JobID jobId) + throws IOException { + boolean initJobDirStatus = false; + String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString()); + for (String localDir : localDirs) { + Path jobDir = new Path(localDir, jobDirPath); + if (fs.exists(jobDir)) { + // this will happen on a partial execution of localizeJob. Sometimes + // copying job.xml to the local disk succeeds but copying job.jar might + // throw out an exception. We should clean up and then try again. + fs.delete(jobDir, true); + } + + boolean jobDirStatus = fs.mkdirs(jobDir); + if (!jobDirStatus) { + LOG.warn("Not able to create job directory " + jobDir.toString()); + } + + initJobDirStatus = initJobDirStatus || jobDirStatus; + + // job-dir has to be private to the TT + Localizer.PermissionsHandler.setPermissions(new File(jobDir.toUri() + .getPath()), Localizer.PermissionsHandler.sevenZeroZero); + } + + if (!initJobDirStatus) { + throw new IOException("Not able to initialize job directories " + + "in any of the configured local directories for job " + + jobId.toString()); + } + } + + /** + * Create taskDirs on all the disks. Otherwise, in some cases, like when + * LinuxTaskController is in use, child might wish to balance load across + * disks but cannot itself create attempt directory because of the fact that + * job directory is writable only by the TT. 
+ * + * @param user + * @param jobId + * @param attemptId + * @param isCleanupAttempt + * @throws IOException + */ + public void initializeAttemptDirs(String user, String jobId, + String attemptId, boolean isCleanupAttempt) + throws IOException { + + boolean initStatus = false; + String attemptDirPath = + TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt); + + for (String localDir : localDirs) { + Path localAttemptDir = new Path(localDir, attemptDirPath); + + boolean attemptDirStatus = fs.mkdirs(localAttemptDir); + if (!attemptDirStatus) { + LOG.warn("localAttemptDir " + localAttemptDir.toString() + + " couldn't be created."); + } + initStatus = initStatus || attemptDirStatus; + } + + if (!initStatus) { + throw new IOException("Not able to initialize attempt directories " + + "in any of the configured local directories for the attempt " + + attemptId.toString()); + } + } +} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 03:42:38 2011 @@ -24,33 +24,72 @@ import java.io.FileOutputStream; import java.io.IOException; import java.util.Random; +import javax.security.auth.login.LoginException; + import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.DefaultTaskController; +import org.apache.hadoop.mapred.JobConf; +import 
org.apache.hadoop.mapred.TaskController; +import org.apache.hadoop.mapred.TaskTracker; +import org.apache.hadoop.mapred.TaskController.InitializationContext; +import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.filecache.TaskDistributedCacheManager; +import org.apache.hadoop.filecache.TrackerDistributedCacheManager; +import org.apache.hadoop.security.UserGroupInformation; public class TestTrackerDistributedCacheManager extends TestCase { - private static final String TEST_LOCAL_DIR_PROP = "test.local.dir"; - private static String TEST_CACHE_BASE_DIR = - new Path(System.getProperty("test.build.data","/tmp/cachebasedir")) - .toString().replace(' ', '+'); - private static String TEST_ROOT_DIR = - System.getProperty("test.build.data", "/tmp/distributedcache"); + + protected String TEST_ROOT_DIR = + new File(System.getProperty("test.build.data", "/tmp"), + TestTrackerDistributedCacheManager.class.getSimpleName()) + .getAbsolutePath(); + + protected File ROOT_MAPRED_LOCAL_DIR; + private static String TEST_CACHE_BASE_DIR; + protected int numLocalDirs = 6; + private static final int TEST_FILE_SIZE = 4 * 1024; // 4K private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K - private Configuration conf; - private Path firstCacheFile; - private Path secondCacheFile; + protected Configuration conf; + protected Path firstCacheFile; + protected Path secondCacheFile; + + protected LocalDirAllocator localDirAllocator = + new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY); @Override protected void setUp() throws IOException { + + // Prepare the tests' root dir + File TEST_ROOT = new File(TEST_ROOT_DIR); + if (!TEST_ROOT.exists()) { + TEST_ROOT.mkdirs(); + } + + // Prepare the tests' mapred-local-dir + ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, 
"mapred/local"); + ROOT_MAPRED_LOCAL_DIR.mkdirs(); + String []localDirs = new String[numLocalDirs]; + for (int i = 0; i < numLocalDirs; i++) { + localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath(); + } + + TEST_CACHE_BASE_DIR = + new File(TEST_ROOT_DIR, "cachebasedir").getAbsolutePath(); + conf = new Configuration(); conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT); - conf.set(TEST_LOCAL_DIR_PROP, TEST_ROOT_DIR); + conf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localDirs); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); + + // Create the temporary cache files to be used in the tests. firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile"); secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile"); createTempFile(firstCacheFile); @@ -59,29 +98,43 @@ public class TestTrackerDistributedCache /** * This is the typical flow for using the DistributedCache classes. + * + * @throws IOException + * @throws LoginException */ - public void testManagerFlow() throws IOException { - TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); - LocalDirAllocator localDirAllocator = - new LocalDirAllocator(TEST_LOCAL_DIR_PROP); + public void testManagerFlow() throws IOException, LoginException { + // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(conf); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf); TrackerDistributedCacheManager.determineTimestamps(subConf); + // ****** End of imitating JobClient code Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); FileOutputStream os = new FileOutputStream(new File(jobFile.toString())); subConf.writeXml(os); os.close(); + String userName = getJobOwnerName(); + + // ****** Imitate TaskRunner code. 
+ TrackerDistributedCacheManager manager = + new TrackerDistributedCacheManager(conf); TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); - handle.setup(localDirAllocator, - new File(new Path(TEST_ROOT_DIR, "workdir").toString()), "distcache"); + File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); + handle.setup(localDirAllocator, workDir, TaskTracker + .getDistributedCacheDir(userName)); + + InitializationContext context = new InitializationContext(); + context.user = userName; + context.workDir = workDir; + getTaskController().initializeDistributedCache(context); + // ****** End of imitating TaskRunner code + Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); assertNotNull(null, localCacheFiles); assertEquals(2, localCacheFiles.length); @@ -94,12 +147,39 @@ public class TestTrackerDistributedCache assertEquals(1, handle.getClassPaths().size()); assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0)); + checkFilePermissions(localCacheFiles); + // Cleanup handle.release(); manager.purgeCache(); assertFalse(pathToFile(cachedFirstFile).exists()); } + /** + * Check proper permissions on the cache files + * + * @param localCacheFiles + * @throws IOException + */ + protected void checkFilePermissions(Path[] localCacheFiles) + throws IOException { + Path cachedFirstFile = localCacheFiles[0]; + Path cachedSecondFile = localCacheFiles[1]; + // Both the files should have executable permissions on them. 
+ assertTrue("First cache file is not executable!", new File(cachedFirstFile + .toUri().getPath()).canExecute()); + assertTrue("Second cache file is not executable!", new File( + cachedSecondFile.toUri().getPath()).canExecute()); + } + + protected TaskController getTaskController() { + return new DefaultTaskController(); + } + + protected String getJobOwnerName() throws LoginException { + UserGroupInformation ugi = UserGroupInformation.login(conf); + return ugi.getUserName(); + } /** test delete cache */ public void testDeleteCache() throws Exception { @@ -122,7 +202,7 @@ public class TestTrackerDistributedCache new Path(TEST_CACHE_BASE_DIR)); assertTrue("DistributedCache failed deleting old" + " cache when the cache store is full.", - dirStatuses.length > 1); + dirStatuses.length == 1); } public void testFileSystemOtherThanDefault() throws Exception { @@ -152,15 +232,16 @@ public class TestTrackerDistributedCache protected void tearDown() throws IOException { new File(firstCacheFile.toString()).delete(); new File(secondCacheFile.toString()).delete(); + FileUtil.fullyDelete(new File(TEST_ROOT_DIR)); } - private void assertFileLengthEquals(Path a, Path b) + protected void assertFileLengthEquals(Path a, Path b) throws FileNotFoundException { assertEquals("File sizes mismatch.", pathToFile(a).length(), pathToFile(b).length()); } - private File pathToFile(Path p) { + protected File pathToFile(Path p) { return new File(p.toString()); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java (original) +++ 
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java Fri Mar 4 03:42:38 2011 @@ -22,6 +22,8 @@ import java.io.File; import java.io.IOException; import java.util.UUID; +import javax.security.auth.login.LoginException; + import junit.framework.TestCase; import org.apache.hadoop.fs.FileStatus; @@ -32,6 +34,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.security.UserGroupInformation; /** * Re-runs a map task using the IsolationRunner. @@ -99,15 +102,19 @@ public class TestIsolationRunner extends } private Path getAttemptJobXml(JobConf conf, JobID jobId, boolean isMap) - throws IOException { + throws IOException, LoginException { String taskid = new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString(); return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead( - TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf); + TaskTracker.getTaskConfFile(UserGroupInformation.login(conf) + .getUserName(), jobId.toString(), taskid, false), conf); } - public void testIsolationRunOfMapTask() throws - IOException, InterruptedException, ClassNotFoundException { + public void testIsolationRunOfMapTask() + throws IOException, + InterruptedException, + ClassNotFoundException, + LoginException { MiniMRCluster mr = null; try { mr = new MiniMRCluster(1, "file:///", 4); Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 03:42:38 2011 @@ -22,16 +22,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import javax.security.auth.login.LoginException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.TaskController.JobInitializationContext; -import org.apache.hadoop.mapred.TaskController.TaskControllerContext; -import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController; -import org.apache.hadoop.mapred.JvmManager.JvmEnv; +import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; /** * Test to verify localization of a job and localization of a task on a @@ -45,7 +40,6 @@ public class TestLocalizationWithLinuxTa LogFactory.getLog(TestLocalizationWithLinuxTaskController.class); private File configFile; - private MyLinuxTaskController taskController; private static String taskTrackerSpecialGroup; @@ -66,10 +60,24 @@ public class TestLocalizationWithLinuxTa ClusterWithLinuxTaskController.createTaskControllerConf(path, localDirs); String execPath = path + "/task-controller"; - taskController.setTaskControllerExe(execPath); + ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath); taskTrackerSpecialGroup = getFilePermissionAttrs(execPath)[2]; taskController.setConf(trackerFConf); taskController.setup(); + + tracker.setLocalizer(new Localizer(tracker.localFs, localDirs, + taskController)); + + // Rewrite conf so as to reflect task's correct user name. 
+ String ugi = + System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI); + JobConf jobConf = new JobConf(task.getConf()); + jobConf.setUser(ugi.split(",")[0]); + File jobConfFile = uploadJobConf(jobConf); + // Create the task again to change the job-user + task = + new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1); + task.setConf(jobConf); } @Override @@ -91,75 +99,114 @@ public class TestLocalizationWithLinuxTa } /** + * Test the localization of a user on the TT when {@link LinuxTaskController} + * is in use. + */ + @Override + public void testUserLocalization() + throws IOException { + + if (!ClusterWithLinuxTaskController.shouldRun()) { + return; + } + + super.testJobLocalization(); + } + + @Override + protected void checkUserLocalization() + throws IOException { + // Check the directory structure and permissions + for (String dir : localDirs) { + + File localDir = new File(dir); + assertTrue("mapred.local.dir " + localDir + " isn'task created!", + localDir.exists()); + + File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR); + assertTrue("taskTracker sub-dir in the local-dir " + localDir + + "is not created!", taskTrackerSubDir.exists()); + + File userDir = new File(taskTrackerSubDir, task.getUser()); + assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir + + "is not created!", userDir.exists()); + checkFilePermissions(userDir.getAbsolutePath(), "dr-xrws---", task + .getUser(), taskTrackerSpecialGroup); + + File jobCache = new File(userDir, TaskTracker.JOBCACHE); + assertTrue("jobcache in the userDir " + userDir + " isn't created!", + jobCache.exists()); + checkFilePermissions(jobCache.getAbsolutePath(), "dr-xrws---", task + .getUser(), taskTrackerSpecialGroup); + + // Verify the distributed cache dir. 
+ File distributedCacheDir = + new File(localDir, TaskTracker + .getDistributedCacheDir(task.getUser())); + assertTrue("distributed cache dir " + distributedCacheDir + + " doesn't exists!", distributedCacheDir.exists()); + checkFilePermissions(distributedCacheDir.getAbsolutePath(), + "dr-xrws---", task.getUser(), taskTrackerSpecialGroup); + } + } + + /** * Test job localization with {@link LinuxTaskController}. Also check the * permissions and file ownership of the job related files. */ @Override public void testJobLocalization() - throws IOException, - LoginException { + throws IOException { if (!ClusterWithLinuxTaskController.shouldRun()) { return; } - // Do job localization - JobConf localizedJobConf = tracker.localizeJobFiles(task); - - String ugi = - System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI); - localizedJobConf.setUser(ugi.split(",")[0]); - - // Now initialize the job via task-controller so as to set - // ownership/permissions of jars, job-work-dir - JobInitializationContext context = new JobInitializationContext(); - context.jobid = jobId; - context.user = localizedJobConf.getUser(); - context.workDir = - new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); - - // /////////// The method being tested - taskController.initializeJob(context); - // /////////// + super.testJobLocalization(); + } + @Override + protected void checkJobLocalization() + throws IOException { for (String localDir : trackerFConf.getStrings("mapred.local.dir")) { File jobDir = - new File(localDir, TaskTracker.getLocalJobDir(jobId.toString())); + new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId + .toString())); // check the private permissions on the job directory - checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---", - localizedJobConf.getUser(), taskTrackerSpecialGroup); + checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---", task + .getUser(), taskTrackerSpecialGroup); } // check the private permissions of various 
directories List dirs = new ArrayList(); Path jarsDir = - lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId - .toString()), trackerFConf); + lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(), + jobId.toString()), trackerFConf); dirs.add(jarsDir); dirs.add(new Path(jarsDir, "lib")); for (Path dir : dirs) { checkFilePermissions(dir.toUri().getPath(), "dr-xrws---", - localizedJobConf.getUser(), taskTrackerSpecialGroup); + task.getUser(), taskTrackerSpecialGroup); } // job-work dir needs user writable permissions Path jobWorkDir = - lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId - .toString()), trackerFConf); - checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", - localizedJobConf.getUser(), taskTrackerSpecialGroup); + lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), + jobId.toString()), trackerFConf); + checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task + .getUser(), taskTrackerSpecialGroup); // check the private permissions of various files List files = new ArrayList(); - files.add(lDirAlloc.getLocalPathToRead(TaskTracker - .getLocalJobConfFile(jobId.toString()), trackerFConf)); - files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId - .toString()), trackerFConf)); + files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile( + task.getUser(), jobId.toString()), trackerFConf)); + files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task + .getUser(), jobId.toString()), trackerFConf)); files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar")); files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar")); for (Path file : files) { - checkFilePermissions(file.toUri().getPath(), "-r-xrwx---", - localizedJobConf.getUser(), taskTrackerSpecialGroup); + checkFilePermissions(file.toUri().getPath(), "-r-xrwx---", task + .getUser(), taskTrackerSpecialGroup); } } @@ -169,73 +216,50 @@ public class TestLocalizationWithLinuxTa */ 
@Override public void testTaskLocalization() - throws IOException, - LoginException { + throws IOException { if (!ClusterWithLinuxTaskController.shouldRun()) { return; } - JobConf localizedJobConf = tracker.localizeJobFiles(task); - String ugi = - System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI); - localizedJobConf.setUser(ugi.split(",")[0]); - - // Now initialize the job via task-controller so as to set - // ownership/permissions of jars, job-work-dir - JobInitializationContext jobContext = new JobInitializationContext(); - jobContext.jobid = jobId; - jobContext.user = localizedJobConf.getUser(); - jobContext.workDir = - new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); - taskController.initializeJob(jobContext); - - TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf); - tip.setJobConf(localizedJobConf); - - // localize the task. - tip.localizeTask(task); - TaskRunner runner = task.createRunner(tracker, tip); - runner.setupChildTaskConfiguration(lDirAlloc); - Path workDir = - lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task - .getJobID().toString(), task.getTaskID().toString(), task - .isTaskCleanupTask()), trackerFConf); - TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()), - localizedJobConf); - File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID()); - - // Initialize task - TaskControllerContext taskContext = - new TaskController.TaskControllerContext(); - taskContext.env = - new JvmEnv(null, null, null, null, -1, new File(localizedJobConf - .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf); - taskContext.task = task; - // /////////// The method being tested - taskController.initializeTask(taskContext); - // /////////// + super.testTaskLocalization(); + } + @Override + protected void checkTaskLocalization() + throws IOException { // check the private permissions of various directories List dirs = new ArrayList(); - 
dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId - .toString(), taskId.toString()), trackerFConf)); - dirs.add(workDir); - dirs.add(new Path(workDir, "tmp")); - dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath())); + dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task + .getUser(), jobId.toString(), taskId.toString()), trackerFConf)); + dirs.add(attemptWorkDir); + dirs.add(new Path(attemptWorkDir, "tmp")); + dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath())); for (Path dir : dirs) { checkFilePermissions(dir.toUri().getPath(), "drwxrws---", - localizedJobConf.getUser(), taskTrackerSpecialGroup); + task.getUser(), taskTrackerSpecialGroup); } // check the private permissions of various files List files = new ArrayList(); files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task - .getJobID().toString(), task.getTaskID().toString(), task - .isTaskCleanupTask()), trackerFConf)); + .getUser(), task.getJobID().toString(), task.getTaskID().toString(), + task.isTaskCleanupTask()), trackerFConf)); for (Path file : files) { - checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", - localizedJobConf.getUser(), taskTrackerSpecialGroup); + checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task + .getUser(), taskTrackerSpecialGroup); + } + } + + /** + * Test cleanup of task files with {@link LinuxTaskController}. 
+ */ + @Override + public void testTaskCleanup() + throws IOException { + if (!ClusterWithLinuxTaskController.shouldRun()) { + return; } + super.testTaskCleanup(); } } Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 03:42:38 2011 @@ -27,6 +27,8 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import javax.security.auth.login.LoginException; + import junit.framework.TestCase; import org.apache.commons.logging.Log; @@ -39,6 +41,8 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.util.StringUtils; /** * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS. @@ -118,55 +122,108 @@ public class TestMiniMRWithDFS extends T } return result.toString(); } - + /** * Make sure that there are exactly the directories that we expect to find. + * + *
+ *
+ * + * For e.g., if we want to check the existence of *only* the directories for + * user1's tasks job1-attempt1, job1-attempt2, job2-attempt1, we pass user1 as + * user, {job1, job1, job2, job3} as jobIds and {attempt1, attempt2, attempt1, + * attempt3} as taskDirs. + * * @param mr the map-reduce cluster + * @param user the name of the job-owner + * @param jobIds the list of jobs * @param taskDirs the task ids that should be present */ - static void checkTaskDirectories(MiniMRCluster mr, - String[] jobIds, - String[] taskDirs) { + static void checkTaskDirectories(MiniMRCluster mr, String user, + String[] jobIds, String[] taskDirs) { + mr.waitUntilIdle(); int trackers = mr.getNumTaskTrackers(); - List neededDirs = new ArrayList(Arrays.asList(taskDirs)); - boolean[] found = new boolean[taskDirs.length]; - for(int i=0; i < trackers; ++i) { - int numNotDel = 0; + + List observedJobDirs = new ArrayList(); + List observedFilesInsideJobDir = new ArrayList(); + + for (int i = 0; i < trackers; ++i) { + + // Verify that mapred-local-dir and it's direct contents are valid File localDir = new File(mr.getTaskTrackerLocalDir(i)); - LOG.debug("Tracker directory: " + localDir); - File trackerDir = new File(localDir, TaskTracker.SUBDIR); - assertTrue("local dir " + localDir + " does not exist.", - localDir.isDirectory()); - assertTrue("task tracker dir " + trackerDir + " does not exist.", - trackerDir.isDirectory()); - String contents[] = localDir.list(); - String trackerContents[] = trackerDir.list(); - for(int j=0; j < contents.length; ++j) { - System.out.println("Local " + localDir + ": " + contents[j]); - } - for(int j=0; j < trackerContents.length; ++j) { - System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]); - } - for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) { - String name = contents[fileIdx]; - if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) { - LOG.debug("Looking at " + name); - assertTrue("Spurious directory " + name + " 
found in " + - localDir, false); + assertTrue("Local dir " + localDir + " does not exist.", localDir + .isDirectory()); + LOG.info("Verifying contents of mapred.local.dir " + + localDir.getAbsolutePath()); + + // Verify contents(user-dir) of tracker-sub-dir + File trackerSubDir = new File(localDir, TaskTracker.SUBDIR); + if (trackerSubDir.isDirectory()) { + + // Verify contents of user-dir and populate the job-dirs/attempt-dirs + // lists + File userDir = new File(trackerSubDir, user); + if (userDir.isDirectory()) { + LOG.info("Verifying contents of user-dir " + + userDir.getAbsolutePath()); + verifyContents(new String[] { TaskTracker.JOBCACHE, + TaskTracker.DISTCACHEDIR }, userDir.list()); + + File jobCacheDir = + new File(localDir, TaskTracker.getJobCacheSubdir(user)); + String[] jobDirs = jobCacheDir.list(); + observedJobDirs.addAll(Arrays.asList(jobDirs)); + + for (String jobDir : jobDirs) { + String[] attemptDirs = new File(jobCacheDir, jobDir).list(); + observedFilesInsideJobDir.addAll(Arrays.asList(attemptDirs)); + } } } - for (int idx = 0; idx < neededDirs.size(); ++idx) { - String name = neededDirs.get(idx); - if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE), - jobIds[idx]), name).isDirectory()) { - found[idx] = true; - numNotDel++; - } + } + + // Now verify that only expected job-dirs and attempt-dirs are present. + LOG.info("Verifying the list of job directories"); + verifyContents(jobIds, observedJobDirs.toArray(new String[observedJobDirs + .size()])); + LOG.info("Verifying the list of task directories"); + // All taskDirs should be present in the observed list. Other files like + // job.xml etc may be present too, we are not checking them here. 
+ for (int j = 0; j < taskDirs.length; j++) { + assertTrue( + "Expected task-directory " + taskDirs[j] + " is not present!", + observedFilesInsideJobDir.contains(taskDirs[j])); + } + } + + /** + * Check the list of expectedFiles against the list of observedFiles and make + * sure they both are the same. Duplicates can be present in either of the + * lists and all duplicate entries are treated as a single entity. + * + * @param expectedFiles + * @param observedFiles + */ + private static void verifyContents(String[] expectedFiles, + String[] observedFiles) { + boolean[] foundExpectedFiles = new boolean[expectedFiles.length]; + boolean[] validObservedFiles = new boolean[observedFiles.length]; + for (int j = 0; j < observedFiles.length; ++j) { + for (int k = 0; k < expectedFiles.length; ++k) { + if (expectedFiles[k].equals(observedFiles[j])) { + foundExpectedFiles[k] = true; + validObservedFiles[j] = true; + } } } - for(int i=0; i< found.length; i++) { - assertTrue("Directory " + taskDirs[i] + " not found", found[i]); + for (int j = 0; j < foundExpectedFiles.length; j++) { + assertTrue("Expected file " + expectedFiles[j] + " not found", + foundExpectedFiles[j]); + } + for (int j = 0; j < validObservedFiles.length; j++) { + assertTrue("Unexpected file " + observedFiles[j] + " found", + validObservedFiles[j]); } } @@ -176,7 +233,16 @@ public class TestMiniMRWithDFS extends T NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue(); double error = Math.abs(Math.PI - estimate); assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01)); - checkTaskDirectories(mr, new String[]{}, new String[]{}); + String userName = jobconf.getUser(); + if (userName == null) { + try { + userName = UnixUserGroupInformation.login(jobconf).getUserName(); + } catch (LoginException le) { + throw new IOException("Cannot get the login username : " + + StringUtils.stringifyException(le)); + } + } + checkTaskDirectories(mr, userName, new String[] {}, new String[] {}); } public static void 
runWordCount(MiniMRCluster mr, JobConf jobConf) @@ -195,9 +261,19 @@ public class TestMiniMRWithDFS extends T assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output); JobID jobid = result.job.getID(); - TaskAttemptID taskid = new TaskAttemptID(new TaskID(jobid, true, 1),0); - checkTaskDirectories(mr, new String[]{jobid.toString()}, - new String[]{taskid.toString()}); + TaskAttemptID taskid = new TaskAttemptID( + new TaskID(jobid, true, 1),0); + String userName = jobConf.getUser(); + if (userName == null) { + try { + userName = UnixUserGroupInformation.login(jobConf).getUserName(); + } catch (LoginException le) { + throw new IOException("Cannot get the login username : " + + StringUtils.stringifyException(le)); + } + } + checkTaskDirectories(mr, userName, new String[] { jobid.toString() }, + new String[] { taskid.toString() }); // test with maps=0 jobConf = mr.createJobConf(); input = "owen is oom"; Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Fri Mar 4 03:42:38 2011 @@ -128,6 +128,10 @@ public class TestMiniMRWithDFSWithDistin Path outDir = new Path("/testing/distinct/output"); TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1, input, 2, 1, inDir, outDir); + + job1 = createJobConf(job1, PI_UGI); + runJobAsUser(job1, PI_UGI); + JobConf job2 = 
mr.createJobConf(); Path inDir2 = new Path("/testing/distinct/input2"); Path outDir2 = new Path("/testing/distinct/output2"); @@ -135,8 +139,6 @@ public class TestMiniMRWithDFSWithDistin input, 2, 1, inDir2, outDir2); job2 = createJobConf(job2, WC_UGI); runJobAsUser(job2, WC_UGI); - JobConf wc = createJobConf(mr, WC_UGI); - TestMiniMRWithDFS.runWordCount(mr, wc); } finally { if (dfs != null) { dfs.shutdown(); } if (mr != null) { mr.shutdown();} Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Mar 4 03:42:38 2011 @@ -482,8 +482,8 @@ public class TestQueueManager extends Te try { conf.set("mapred.job.tracker", "localhost:" + miniMRCluster.getJobTrackerPort()); - JobClient jc = new JobClient(conf); - jc.getJob(rjob.getJobID()).killJob(); + JobClient client = new JobClient(miniMRCluster.createJobConf()); + client.getJob(rjob.getID()).killJob(); if (!shouldSucceed) { fail("should fail kill operation"); } @@ -524,8 +524,8 @@ public class TestQueueManager extends Te try { conf.set("mapred.job.tracker", "localhost:" + miniMRCluster.getJobTrackerPort()); - JobClient jc = new JobClient(conf); - jc.getJob(rjob.getJobID()).setJobPriority("VERY_LOW"); + JobClient client = new JobClient(miniMRCluster.createJobConf()); + client.getJob(rjob.getID()).setJobPriority("VERY_LOW"); if (!shouldSucceed) { fail("changing priority should fail."); } @@ -605,6 +605,8 @@ public class TestQueueManager extends Te if (shouldComplete) 
{ rJob = JobClient.runJob(jc); } else { + // Job should be submitted as 'userInfo'. So both the client as well as + // the configuration should point to the same UGI. rJob = new JobClient(jc).submitJob(jc); } return rJob; Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077116&r1=1077115&r2=1077116&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:42:38 2011 @@ -18,21 +18,27 @@ package org.apache.hadoop.mapred; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.jar.JarOutputStream; import java.util.zip.ZipEntry; -import javax.security.auth.login.LoginException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.mapred.JvmManager.JvmEnv; +import org.apache.hadoop.mapred.TaskController.JobInitializationContext; +import org.apache.hadoop.mapred.TaskController.TaskControllerContext; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import junit.framework.TestCase; @@ 
-53,20 +59,53 @@ public class TestTaskTrackerLocalization LogFactory.getLog(TestTaskTrackerLocalization.class); protected TaskTracker tracker; + protected UserGroupInformation taskTrackerUGI; + protected TaskController taskController; protected JobConf trackerFConf; + private JobConf localizedJobConf; protected JobID jobId; protected TaskAttemptID taskId; protected Task task; protected String[] localDirs; protected static LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); + protected Path attemptWorkDir; + protected File[] attemptLogFiles; + protected JobConf localizedTaskConf; + + class InlineCleanupQueue extends CleanupQueue { + List stalePaths = new ArrayList(); + + public InlineCleanupQueue() { + // do nothing + } + + @Override + public void addToQueue(FileSystem fs, Path... paths) { + // delete in-line + for (Path p : paths) { + try { + LOG.info("Trying to delete the path " + p); + if (!fs.delete(p, true)) { + LOG.warn("Stale path " + p.toUri().getPath()); + stalePaths.add(p); + } + } catch (IOException e) { + LOG.warn("Caught exception while deleting path " + + p.toUri().getPath()); + LOG.info(StringUtils.stringifyException(e)); + stalePaths.add(p); + } + } + } + } @Override protected void setUp() throws Exception { TEST_ROOT_DIR = - new File(System.getProperty("test.build.data", "/tmp"), - "testTaskTrackerLocalization"); + new File(System.getProperty("test.build.data", "/tmp"), getClass() + .getSimpleName()); if (!TEST_ROOT_DIR.exists()) { TEST_ROOT_DIR.mkdirs(); } @@ -86,30 +125,27 @@ public class TestTaskTrackerLocalization } trackerFConf.setStrings("mapred.local.dir", localDirs); - // Create the job jar file - File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar"); - JarOutputStream jstream = - new JarOutputStream(new FileOutputStream(jobJarFile)); - ZipEntry ze = new ZipEntry("lib/lib1.jar"); - jstream.putNextEntry(ze); - jstream.closeEntry(); - ze = new ZipEntry("lib/lib2.jar"); - jstream.putNextEntry(ze); - 
jstream.closeEntry(); - jstream.finish(); - jstream.close(); - trackerFConf.setJar(jobJarFile.toURI().toString()); + // Create the job configuration file. Same as trackerConf in this test. + JobConf jobConf = trackerFConf; - // Create the job configuration file - File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml"); - FileOutputStream out = new FileOutputStream(jobConfFile); - trackerFConf.writeXml(out); - out.close(); + // JobClient sets the job credentials. + new JobClient().setUGIAndUserGroupNames(jobConf); + + // JobClient uploads the job jar to the file system and sets it in the + // jobConf. + uploadJobJar(jobConf); + + // JobClient uploads the jobConf to the file system. + File jobConfFile = uploadJobConf(jobConf); // Set up the TaskTracker tracker = new TaskTracker(); tracker.setConf(trackerFConf); - tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case + + // for test case system FS is the local FS + tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf); + + taskTrackerUGI = UserGroupInformation.login(trackerFConf); // Set up the task to be localized String jtIdentifier = "200907202331"; @@ -118,12 +154,53 @@ public class TestTaskTrackerLocalization new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0); task = new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1); + task.setConf(jobConf); // Set conf. Set user name in particular. 
- TaskController taskController = new DefaultTaskController(); + taskController = new DefaultTaskController(); taskController.setConf(trackerFConf); taskController.setup(); + tracker.setLocalizer(new Localizer(tracker.localFs, localDirs, + taskController)); } + /** + * @param jobConf + * @throws IOException + * @throws FileNotFoundException + */ + private void uploadJobJar(JobConf jobConf) + throws IOException, + FileNotFoundException { + File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar"); + JarOutputStream jstream = + new JarOutputStream(new FileOutputStream(jobJarFile)); + ZipEntry ze = new ZipEntry("lib/lib1.jar"); + jstream.putNextEntry(ze); + jstream.closeEntry(); + ze = new ZipEntry("lib/lib2.jar"); + jstream.putNextEntry(ze); + jstream.closeEntry(); + jstream.finish(); + jstream.close(); + jobConf.setJar(jobJarFile.toURI().toString()); + } + + /** + * @param jobConf + * @return + * @throws FileNotFoundException + * @throws IOException + */ + protected File uploadJobConf(JobConf jobConf) + throws FileNotFoundException, + IOException { + File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml"); + FileOutputStream out = new FileOutputStream(jobConfFile); + jobConf.writeXml(out); + out.close(); + return jobConfFile; + } + @Override protected void tearDown() throws Exception { @@ -145,71 +222,71 @@ public class TestTaskTrackerLocalization assertTrue("Path " + path + " has the permissions " + attrs[0] + " instead of the expected " + expectedPermissions, attrs[0] .equals(expectedPermissions)); - assertTrue("Path " + path + " is not user owned not by " - + expectedOwnerUser + " but by " + attrs[1], attrs[1] - .equals(expectedOwnerUser)); - assertTrue("Path " + path + " is not group owned not by " - + expectedOwnerGroup + " but by " + attrs[2], attrs[2] - .equals(expectedOwnerGroup)); + assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser + + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser)); + assertTrue("Path " + 
path + " is group owned not by " + expectedOwnerGroup + + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup)); } /** * Verify the task-controller's setup functionality * * @throws IOException - * @throws LoginException */ public void testTaskControllerSetup() - throws IOException, - LoginException { + throws IOException { // Task-controller is already set up in the test's setup method. Now verify. - UserGroupInformation ugi = UserGroupInformation.login(new JobConf()); for (String localDir : localDirs) { // Verify the local-dir itself. File lDir = new File(localDir); assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists()); - checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi - .getUserName(), ugi.getGroupNames()[0]); - - // Verify the distributed cache dir. - File distributedCacheDir = - new File(localDir, TaskTracker.getDistributedCacheDir()); - assertTrue("distributed cache dir " + distributedCacheDir - + " doesn't exists!", distributedCacheDir.exists()); - checkFilePermissions(distributedCacheDir.getAbsolutePath(), - "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]); - - // Verify the job cache dir. - File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir()); - assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!", - jobCacheDir.exists()); - checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi - .getUserName(), ugi.getGroupNames()[0]); + checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task + .getUser(), taskTrackerUGI.getGroupNames()[0]); } // Verify the pemissions on the userlogs dir File taskLog = TaskLog.getUserLogDir(); - checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi - .getUserName(), ugi.getGroupNames()[0]); + checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task + .getUser(), taskTrackerUGI.getGroupNames()[0]); } /** - * Test job localization on a TT. Tests localization of job.xml, job.jar and - * corresponding setting of configuration. 
+ * Test the localization of a user on the TT. * * @throws IOException - * @throws LoginException */ - public void testJobLocalization() - throws IOException, - LoginException { + public void testUserLocalization() + throws IOException { // /////////// The main method being tested - JobConf localizedJobConf = tracker.localizeJobFiles(task); + tracker.getLocalizer().initializeUserDirs(task.getUser()); // /////////// - // Check the directory structure + // Check the directory structure and permissions + checkUserLocalization(); + + // For the sake of testing re-entrancy of initializeUserDirs(), we remove + // the user directories now and make sure that further calls of the method + // don't create directories any more. + for (String dir : localDirs) { + File userDir = new File(dir, TaskTracker.getUserDir(task.getUser())); + FileUtil.fullyDelete(userDir); + } + + // Now call the method again. + tracker.getLocalizer().initializeUserDirs(task.getUser()); + + // Files should not be created now and so shouldn't be there anymore. 
+ for (String dir : localDirs) { + File userDir = new File(dir, TaskTracker.getUserDir(task.getUser())); + assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath() + + " exists!", userDir.exists()); + } + } + + protected void checkUserLocalization() + throws IOException { for (String dir : localDirs) { File localDir = new File(dir); @@ -220,31 +297,87 @@ public class TestTaskTrackerLocalization assertTrue("taskTracker sub-dir in the local-dir " + localDir + "is not created!", taskTrackerSubDir.exists()); - File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE); - assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir - + " isn'task created!", jobCache.exists()); + File userDir = new File(taskTrackerSubDir, task.getUser()); + assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir + + "is not created!", userDir.exists()); + checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task + .getUser(), taskTrackerUGI.getGroupNames()[0]); + + File jobCache = new File(userDir, TaskTracker.JOBCACHE); + assertTrue("jobcache in the userDir " + userDir + " isn't created!", + jobCache.exists()); + checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task + .getUser(), taskTrackerUGI.getGroupNames()[0]); + + // Verify the distributed cache dir. + File distributedCacheDir = + new File(localDir, TaskTracker + .getDistributedCacheDir(task.getUser())); + assertTrue("distributed cache dir " + distributedCacheDir + + " doesn't exists!", distributedCacheDir.exists()); + checkFilePermissions(distributedCacheDir.getAbsolutePath(), + "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]); + } + } + + /** + * Test job localization on a TT. Tests localization of job.xml, job.jar and + * corresponding setting of configuration. 
Also test + * {@link TaskController#initializeJob(JobInitializationContext)} + * + * @throws IOException + */ + public void testJobLocalization() + throws IOException { + + tracker.getLocalizer().initializeUserDirs(task.getUser()); + + // /////////// The main method being tested + localizedJobConf = tracker.localizeJobFiles(task); + // /////////// + + // Now initialize the job via task-controller so as to set + // ownership/permissions of jars, job-work-dir + JobInitializationContext context = new JobInitializationContext(); + context.jobid = jobId; + context.user = task.getUser(); + context.workDir = + new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); + + // /////////// The method being tested + taskController.initializeJob(context); + // /////////// + + checkJobLocalization(); + } + + protected void checkJobLocalization() + throws IOException { + // Check the directory structure + for (String dir : localDirs) { + + File localDir = new File(dir); + File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR); + File userDir = new File(taskTrackerSubDir, task.getUser()); + File jobCache = new File(userDir, TaskTracker.JOBCACHE); File jobDir = new File(jobCache, jobId.toString()); - assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir - .exists()); + assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists()); // check the private permissions on the job directory - UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf); - checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi - .getUserName(), ugi.getGroupNames()[0]); + checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task + .getUser(), taskTrackerUGI.getGroupNames()[0]); } // check the localization of job.xml - LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir"); - assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc - .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()), - 
trackerFConf) != null); + .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(), + jobId.toString()), trackerFConf) != null); // check the localization of job.jar Path jarFileLocalized = - lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId - .toString()), trackerFConf); + lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(), + jobId.toString()), trackerFConf); assertTrue("job.jar is not localized on this TaskTracker!!", jarFileLocalized != null); assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File( @@ -256,8 +389,8 @@ public class TestTaskTrackerLocalization // check the creation of job work directory assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc - .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()), - trackerFConf) != null); + .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId + .toString()), trackerFConf) != null); // Check the setting of job.local.dir and job.jar which will eventually be // used by the user's task @@ -267,11 +400,11 @@ public class TestTaskTrackerLocalization String localizedJobJar = localizedJobConf.getJar(); for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) { if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR - + TaskTracker.getJobWorkDir(jobId.toString()))) { + + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) { jobLocalDirFlag = true; } if (localizedJobJar.equals(localDir + Path.SEPARATOR - + TaskTracker.getJobJarFile(jobId.toString()))) { + + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) { mapredJarFlag = true; } } @@ -287,13 +420,21 @@ public class TestTaskTrackerLocalization * Test task localization on a TT. 
* * @throws IOException - * @throws LoginException */ public void testTaskLocalization() - throws IOException, - LoginException { + throws IOException { + + tracker.getLocalizer().initializeUserDirs(task.getUser()); + localizedJobConf = tracker.localizeJobFiles(task); - JobConf localizedJobConf = tracker.localizeJobFiles(task); + // Now initialize the job via task-controller so as to set + // ownership/permissions of jars, job-work-dir + JobInitializationContext jobContext = new JobInitializationContext(); + jobContext.jobid = jobId; + jobContext.user = task.getUser(); + jobContext.workDir = + new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); + taskController.initializeJob(jobContext); TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf); tip.setJobConf(localizedJobConf); @@ -304,77 +445,194 @@ public class TestTaskTrackerLocalization // check the functionality of localizeTask for (String dir : trackerFConf.getStrings("mapred.local.dir")) { - assertTrue("attempt-dir in localDir " + dir + " is not created!!", - new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId - .toString())).exists()); + File attemptDir = + new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId + .toString(), taskId.toString())); + assertTrue("attempt-dir " + attemptDir + " in localDir " + dir + + " is not created!!", attemptDir.exists()); } - Path workDir = - lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task - .getJobID().toString(), task.getTaskID().toString(), task - .isTaskCleanupTask()), trackerFConf); + attemptWorkDir = + lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir( + task.getUser(), task.getJobID().toString(), task.getTaskID() + .toString(), task.isTaskCleanupTask()), trackerFConf); assertTrue("atttempt work dir for " + taskId.toString() - + " is not created in any of the configured dirs!!", workDir != null); + + " is not created in any of the configured dirs!!", + attemptWorkDir != null); TaskRunner runner = 
task.createRunner(tracker, tip); // /////// Few more methods being tested runner.setupChildTaskConfiguration(lDirAlloc); - TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()), + TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()), localizedJobConf); - File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID()); - // /////// + attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID()); // Make sure the task-conf file is created Path localTaskFile = lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task - .getJobID().toString(), task.getTaskID().toString(), task - .isTaskCleanupTask()), trackerFConf); + .getUser(), task.getJobID().toString(), task.getTaskID() + .toString(), task.isTaskCleanupTask()), trackerFConf); assertTrue("Task conf file " + localTaskFile.toString() + " is not created!!", new File(localTaskFile.toUri().getPath()) .exists()); // /////// One more method being tested. This happens in child space. - JobConf localizedTaskConf = new JobConf(localTaskFile); + localizedTaskConf = new JobConf(localTaskFile); TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf); // /////// + // Initialize task via TaskController + TaskControllerContext taskContext = + new TaskController.TaskControllerContext(); + taskContext.env = + new JvmEnv(null, null, null, null, -1, new File(localizedJobConf + .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf); + taskContext.task = task; + // /////////// The method being tested + taskController.initializeTask(taskContext); + // /////////// + + checkTaskLocalization(); + } + + protected void checkTaskLocalization() + throws IOException { // Make sure that the mapred.local.dir is sandboxed for (String childMapredLocalDir : localizedTaskConf .getStrings("mapred.local.dir")) { assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!", - childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId - .toString(), taskId.toString(), false))); + 
childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task + .getUser(), jobId.toString(), taskId.toString(), false))); } // Make sure task task.getJobFile is changed and pointed correctly. assertTrue(task.getJobFile().endsWith( - TaskTracker - .getTaskConfFile(jobId.toString(), taskId.toString(), false))); + TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId + .toString(), false))); // Make sure that the tmp directories are created assertTrue("tmp dir is not created in workDir " - + workDir.toUri().getPath(), - new File(workDir.toUri().getPath(), "tmp").exists()); + + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri() + .getPath(), "tmp").exists()); - // Make sure that the log are setup properly + // Make sure that the logs are setup properly File logDir = new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR + task.getTaskID().toString()); assertTrue("task's log dir " + logDir.toString() + " doesn't exist!", logDir.exists()); - UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf); - checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi - .getUserName(), ugi.getGroupNames()[0]); + checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task + .getUser(), taskTrackerUGI.getGroupNames()[0]); File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString()); assertTrue("stdout log file is improper. Expected : " - + expectedStdout.toString() + " Observed : " + logFiles[0].toString(), - expectedStdout.toString().equals(logFiles[0].toString())); + + expectedStdout.toString() + " Observed : " + + attemptLogFiles[0].toString(), expectedStdout.toString().equals( + attemptLogFiles[0].toString())); File expectedStderr = new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString()); assertTrue("stderr log file is improper. 
Expected : " - + expectedStderr.toString() + " Observed : " + logFiles[1].toString(), - expectedStderr.toString().equals(logFiles[1].toString())); + + expectedStderr.toString() + " Observed : " + + attemptLogFiles[1].toString(), expectedStderr.toString().equals( + attemptLogFiles[1].toString())); + } + + /** + * @throws IOException + */ + public void testTaskCleanup() + throws IOException { + + // Localize job and localize task. + tracker.getLocalizer().initializeUserDirs(task.getUser()); + localizedJobConf = tracker.localizeJobFiles(task); + // Now initialize the job via task-controller so as to set + // ownership/permissions of jars, job-work-dir + JobInitializationContext jobContext = new JobInitializationContext(); + jobContext.jobid = jobId; + jobContext.user = localizedJobConf.getUser(); + jobContext.workDir = + new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR)); + taskController.initializeJob(jobContext); + TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf); + tip.setJobConf(localizedJobConf); + tip.localizeTask(task); + Path workDir = + lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir( + task.getUser(), task.getJobID().toString(), task.getTaskID() + .toString(), task.isTaskCleanupTask()), trackerFConf); + TaskRunner runner = task.createRunner(tracker, tip); + tip.setTaskRunner(runner); + runner.setupChildTaskConfiguration(lDirAlloc); + TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()), + localizedJobConf); + TaskRunner.prepareLogFiles(task.getTaskID()); + Path localTaskFile = + lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task + .getUser(), task.getJobID().toString(), task.getTaskID() + .toString(), task.isTaskCleanupTask()), trackerFConf); + JobConf localizedTaskConf = new JobConf(localTaskFile); + TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf); + TaskControllerContext taskContext = + new TaskController.TaskControllerContext(); + taskContext.env = + new JvmEnv(null, null, null, null, 
-1, new File(localizedJobConf + .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf); + taskContext.task = task; + // /////////// The method being tested + taskController.initializeTask(taskContext); + + // TODO: Let the task run and create files. + + InlineCleanupQueue cleanupQueue = new InlineCleanupQueue(); + tracker.directoryCleanupThread = cleanupQueue; + + // ////////// The central methods being tested + tip.removeTaskFiles(true, taskId); + tracker.removeJobFiles(task.getUser(), jobId.toString()); + // ////////// + + // TODO: make sure that all files intended to be deleted are deleted. + + assertTrue("Some task files are not deleted!! Number of stale paths is " + + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0); + + // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still + // there. + for (String localDir : localDirs) { + Path userDir = + new Path(localDir, TaskTracker.getUserDir(task.getUser())); + assertTrue("User directory " + userDir + " is not present!!", + tracker.localFs.exists(userDir)); + } + + // Test userlogs cleanup. + verifyUserLogsCleanup(); + } + + /** + * Test userlogs cleanup. + * + * @throws IOException + */ + private void verifyUserLogsCleanup() + throws IOException { + Path logDir = + new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME + + Path.SEPARATOR + task.getTaskID().toString()); + + // Logs should be there before cleanup. + assertTrue("Userlogs dir " + logDir + " is not presen as expected!!", + tracker.localFs.exists(logDir)); + + // ////////// Another being tested + TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file + // modification time behind retainTimeStatmp + // ////////// + + // Logs should be gone after cleanup. 
+ assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!", + tracker.localFs.exists(logDir)); } } Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077116&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar 4 03:42:38 2011 @@ -0,0 +1,186 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
+
+/**
+ * Test the DistributedCacheManager when LinuxTaskController is used.
+ *
+ * Every test is a no-op unless
+ * {@link ClusterWithLinuxTaskController#shouldRun()} returns true.
+ */
+public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends
+    TestTrackerDistributedCacheManager {
+
+  private File configFile;
+  private MyLinuxTaskController taskController;
+  private String taskTrackerSpecialGroup;
+
+  private static final Log LOG =
+      LogFactory
+          .getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class);
+
+  /**
+   * Points TEST_ROOT_DIR at a class-specific directory, runs the parent
+   * set-up, then configures a MyLinuxTaskController from the
+   * TASKCONTROLLER_PATH system property. The group of the task-controller
+   * binary is remembered for the permission checks below.
+   */
+  @Override
+  protected void setUp()
+      throws IOException {
+
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    TEST_ROOT_DIR =
+        new File(System.getProperty("test.build.data", "/tmp"),
+            TestTrackerDistributedCacheManagerWithLinuxTaskController.class
+                .getSimpleName()).getAbsolutePath();
+
+    super.setUp();
+
+    taskController = new MyLinuxTaskController();
+    String path =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+    configFile =
+        ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+            .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+    String execPath = path + "/task-controller";
+    taskController.setTaskControllerExe(execPath);
+    taskController.setConf(conf);
+    taskController.setup();
+
+    // Third attribute returned by getFilePermissionAttrs is used as the group.
+    taskTrackerSpecialGroup =
+        TestTaskTrackerLocalization.getFilePermissionAttrs(execPath)[2];
+  }
+
+  /**
+   * Deletes the generated task-controller config file, then runs the parent
+   * tear-down.
+   */
+  @Override
+  protected void tearDown()
+      throws IOException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    if (configFile != null) {
+      configFile.delete();
+    }
+    super.tearDown();
+  }
+
+  /**
+   * Test the control flow of distributed cache manager when LinuxTaskController
+   * is used.
+   */
+  @Override
+  public void testManagerFlow()
+      throws IOException,
+      LoginException {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+
+    super.testManagerFlow();
+  }
+
+  /**
+   * Returns the first comma-separated field of the TASKCONTROLLER_UGI system
+   * property as the job owner's name.
+   */
+  @Override
+  protected String getJobOwnerName() {
+    String ugi =
+        System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+    String userName = ugi.split(",")[0];
+    return userName;
+  }
+
+  @Override
+  protected TaskController getTaskController() {
+    return taskController;
+  }
+
+  /**
+   * Verifies the permissions of the first two localized cache files and of
+   * every directory between each file and its distributed-cache root.
+   *
+   * @param localCacheFiles localized cache files; at least two are expected
+   * @throws IOException if file attributes cannot be read
+   */
+  @Override
+  protected void checkFilePermissions(Path[] localCacheFiles)
+      throws IOException {
+    String cachedFirstFile = localCacheFiles[0].toUri().getPath();
+    String cachedSecondFile = localCacheFiles[1].toUri().getPath();
+    String userName = getJobOwnerName();
+
+    // First make sure that the cache files have proper permissions.
+    TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
+        "-r-xrwx---", userName, taskTrackerSpecialGroup);
+    TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
+        "-r-xrwx---", userName, taskTrackerSpecialGroup);
+
+    // Now, make sure that all the path components also have proper
+    // permissions.
+    checkPermissionOnPathComponents(cachedFirstFile, userName);
+    checkPermissionOnPathComponents(cachedSecondFile, userName);
+  }
+
+  /**
+   * Checks that every directory from the cached file's parent up to (but not
+   * including) the user's distributed-cache directory has "dr-xrws---"
+   * permissions, owned by the given user and the task-controller's group.
+   *
+   * @param cachedFilePath absolute local path of a localized cache file
+   * @param userName expected owner of the directories
+   * @throws IOException if file attributes cannot be read
+   */
+  private void checkPermissionOnPathComponents(String cachedFilePath,
+      String userName)
+      throws IOException {
+    // Leading part: $ROOT_MAPRED_LOCAL_DIR/0_<i>/<distributed cache dir>.
+    // Literal path pieces are quoted so regex metacharacters in them are not
+    // interpreted; the index is captured (rather than matched with a
+    // "[0-n]" character class, which breaks for more than 10 dirs) and
+    // range-checked against numLocalDirs.
+    Pattern leadingPattern =
+        Pattern.compile(Pattern.quote(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath()
+            + Path.SEPARATOR)
+            + "0_(\\d+)"
+            + Pattern.quote(Path.SEPARATOR
+                + TaskTracker.getDistributedCacheDir(userName)));
+    Matcher matcher = leadingPattern.matcher(cachedFilePath);
+    assertTrue("Cached file " + cachedFilePath
+        + " is not under any expected distributed cache directory!!",
+        matcher.lookingAt());
+    int localDirIndex = Integer.parseInt(matcher.group(1));
+    assertTrue("Cached file " + cachedFilePath + " is in local dir index "
+        + localDirIndex + ", expected < " + numLocalDirs,
+        localDirIndex < numLocalDirs);
+
+    String leadingPath = cachedFilePath.substring(0, matcher.end());
+    // The trailing distcache/file/... string
+    String trailingPath = cachedFilePath.substring(matcher.end());
+    LOG.info("Trailing path for " + cachedFilePath + " is : " + trailingPath);
+    LOG.info("Leading path for " + cachedFilePath + " is : " + leadingPath);
+
+    // Now check path permissions, starting with cache file's parent dir.
+    File path = new File(cachedFilePath).getParentFile();
+    while (path != null && !path.getAbsolutePath().equals(leadingPath)) {
+      TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(),
+          "dr-xrws---", userName, taskTrackerSpecialGroup);
+      path = path.getParentFile();
+    }
+    // Fail clearly instead of with a NullPointerException if the walk never
+    // reached the leading directory.
+    assertTrue("Leading directory " + leadingPath
+        + " was never reached while walking up from " + cachedFilePath,
+        path != null);
+  }
+
+  @Override
+  public void testDeleteCache()
+      throws Exception {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.testDeleteCache();
+  }
+
+  @Override
+  public void testFileSystemOtherThanDefault()
+      throws Exception {
+    if (!ClusterWithLinuxTaskController.shouldRun()) {
+      return;
+    }
+    super.testFileSystemOtherThanDefault();
+  }
+}