Subject: svn commit: r903227 [15/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri...
Date: Tue, 26 Jan 2010 14:03:09 -0000
To: mapreduce-commits@hadoop.apache.org
From: stevel@apache.org
Message-Id: <20100126140315.9F39B2388B43@eris.apache.org>

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Tue Jan 26 14:02:53 2010 @@ -21,6 +21,7 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.PrintWriter; import java.util.ArrayList; import java.util.List; import java.util.jar.JarOutputStream; @@ -36,15 +37,18 @@ import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskType; -import org.apache.hadoop.mapreduce.security.JobTokens; +import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.mapred.JvmManager.JvmEnv; import org.apache.hadoop.mapred.TaskController.JobInitializationContext; import org.apache.hadoop.mapred.TaskController.TaskControllerContext; 
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; +import org.apache.hadoop.mapred.UtilsForTests.InlineCleanupQueue; import junit.framework.TestCase; @@ -55,9 +59,14 @@ */ public class TestTaskTrackerLocalization extends TestCase { - private File TEST_ROOT_DIR; + private static File TEST_ROOT_DIR = + new File(System.getProperty("test.build.data", "/tmp")); private File ROOT_MAPRED_LOCAL_DIR; private File HADOOP_LOG_DIR; + private static File PERMISSION_SCRIPT_DIR; + private static File PERMISSION_SCRIPT_FILE; + private static final String PERMISSION_SCRIPT_CONTENT = "ls -l -d $1 | " + + "awk '{print $1\":\"$3\":\"$4}'"; private int numLocalDirs = 6; private static final Log LOG = @@ -78,36 +87,20 @@ protected File[] attemptLogFiles; protected JobConf localizedTaskConf; - class InlineCleanupQueue extends CleanupQueue { - List stalePaths = new ArrayList(); - - public InlineCleanupQueue() { - // do nothing - } - - @Override - public void addToQueue(FileSystem fs, Path... paths) { - // delete in-line - for (Path p : paths) { - try { - LOG.info("Trying to delete the path " + p); - if (!fs.delete(p, true)) { - LOG.warn("Stale path " + p.toUri().getPath()); - stalePaths.add(p); - } - } catch (IOException e) { - LOG.warn("Caught exception while deleting path " - + p.toUri().getPath()); - LOG.info(StringUtils.stringifyException(e)); - stalePaths.add(p); - } - } - } + /** + * Dummy method in this base class. Only derived classes will define this + * method for checking if a test can be run. + */ + protected boolean canRun() { + return true; } @Override protected void setUp() throws Exception { + if (!canRun()) { + return; + } TEST_ROOT_DIR = new File(System.getProperty("test.build.data", "/tmp"), getClass() .getSimpleName()); @@ -147,7 +140,8 @@ tracker.setConf(trackerFConf); // for test case system FS is the local FS - tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf); + tracker.systemFS = FileSystem.getLocal(trackerFConf); + tracker.setLocalFileSystem(tracker.systemFS); tracker.systemDirectory = new Path(TEST_ROOT_DIR.getAbsolutePath()); taskTrackerUGI = UserGroupInformation.login(trackerFConf); @@ -158,7 +152,7 @@ taskId = new TaskAttemptID(jtIdentifier, jobId.getId(), TaskType.MAP, 1, 0); task = - new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, null, 1); + new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1); task.setConf(job.getConfiguration()); // Set conf. Set user name in particular. 
// create jobTokens file @@ -169,11 +163,40 @@ taskController.setConf(trackerFConf); taskController.setup(); - tracker.setLocalizer(new Localizer(tracker.localFs, localDirs, + tracker.setTaskController(taskController); + tracker.setLocalizer(new Localizer(tracker.getLocalFileSystem(), localDirs, taskController)); } /** + * static block setting up the permission script which would be used by the + * checkFilePermissions + */ + static { + PERMISSION_SCRIPT_DIR = new File(TEST_ROOT_DIR, "permission_script_dir"); + PERMISSION_SCRIPT_FILE = new File(PERMISSION_SCRIPT_DIR, "getperms.sh"); + + if(PERMISSION_SCRIPT_FILE.exists()) { + PERMISSION_SCRIPT_FILE.delete(); + } + + if(PERMISSION_SCRIPT_DIR.exists()) { + PERMISSION_SCRIPT_DIR.delete(); + } + + PERMISSION_SCRIPT_DIR.mkdir(); + + try { + PrintWriter writer = new PrintWriter(PERMISSION_SCRIPT_FILE); + writer.write(PERMISSION_SCRIPT_CONTENT); + writer.close(); + } catch (FileNotFoundException fe) { + fail(); + } + PERMISSION_SCRIPT_FILE.setExecutable(true, true); + } + + /** * @param job * @throws IOException * @throws FileNotFoundException @@ -222,10 +245,10 @@ if(!dir.exists()) assertTrue("faild to create dir="+dir.getAbsolutePath(), dir.mkdirs()); - File jobTokenFile = new File(dir, JobTokens.JOB_TOKEN_FILENAME); + File jobTokenFile = new File(dir, SecureShuffleUtils.JOB_TOKEN_FILENAME); FileOutputStream fos = new FileOutputStream(jobTokenFile); java.io.DataOutputStream out = new java.io.DataOutputStream(fos); - JobTokens jt = new JobTokens(); + Token jt = new Token(); jt.write(out); // writing empty file, we don't the keys for this test out.close(); } @@ -233,15 +256,31 @@ @Override protected void tearDown() throws Exception { + if (!canRun()) { + return; + } FileUtil.fullyDelete(TEST_ROOT_DIR); } protected static String[] getFilePermissionAttrs(String path) throws IOException { - String output = Shell.execCommand("stat", path, "-c", "%A:%U:%G"); + String[] command = {"bash",PERMISSION_SCRIPT_FILE.getAbsolutePath(), path}; + String output=Shell.execCommand(command); return output.split(":|\n"); } + + /** + * Utility method to check permission of a given path. Requires the permission + * script directory to be setup in order to call. + * + * + * @param path + * @param expectedPermissions + * @param expectedOwnerUser + * @param expectedOwnerGroup + * @throws IOException + */ static void checkFilePermissions(String path, String expectedPermissions, String expectedOwnerUser, String expectedOwnerGroup) throws IOException { @@ -264,6 +303,9 @@ */ public void testTaskControllerSetup() throws IOException { + if (!canRun()) { + return; + } // Task-controller is already set up in the test's setup method. Now verify. for (String localDir : localDirs) { @@ -287,7 +329,9 @@ */ public void testUserLocalization() throws IOException { - + if (!canRun()) { + return; + } // /////////// The main method being tested tracker.getLocalizer().initializeUserDirs(task.getUser()); // /////////// @@ -341,7 +385,7 @@ // Verify the distributed cache dir. 
File distributedCacheDir = new File(localDir, TaskTracker - .getDistributedCacheDir(task.getUser())); + .getPrivateDistributedCacheDir(task.getUser())); assertTrue("distributed cache dir " + distributedCacheDir + " doesn't exists!", distributedCacheDir.exists()); checkFilePermissions(distributedCacheDir.getAbsolutePath(), @@ -358,7 +402,9 @@ */ public void testJobLocalization() throws IOException { - + if (!canRun()) { + return; + } tracker.getLocalizer().initializeUserDirs(task.getUser()); // /////////// The main method being tested @@ -452,7 +498,9 @@ */ public void testTaskLocalization() throws IOException { - + if (!canRun()) { + return; + } tracker.getLocalizer().initializeUserDirs(task.getUser()); localizedJobConf = tracker.localizeJobFiles(task); @@ -568,14 +616,102 @@ } /** + * Validates the removal of $taskid and $tasid/work under mapred-local-dir + * in cases where those directories cannot be deleted without adding + * write permission to the newly created directories under $taskid and + * $taskid/work + * Also see TestSetupWorkDir.createFileAndSetPermissions for details + */ + void validateRemoveFiles(boolean needCleanup, boolean jvmReuse, + TaskInProgress tip) throws IOException { + // create files and set permissions 555. Verify if task controller sets + // the permissions for TT to delete the taskDir or workDir + String dir = (!needCleanup || jvmReuse) ? + TaskTracker.getTaskWorkDir(task.getUser(), task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()) + : TaskTracker.getLocalTaskDir(task.getUser(), task.getJobID().toString(), + taskId.toString(), task.isTaskCleanupTask()); + + Path[] paths = tracker.getLocalFiles(localizedJobConf, dir); + for (Path p : paths) { + if (tracker.getLocalFileSystem().exists(p)) { + TestSetupWorkDir.createFileAndSetPermissions(localizedJobConf, p); + } + } + + InlineCleanupQueue cleanupQueue = new InlineCleanupQueue(); + tracker.setCleanupThread(cleanupQueue); + + tip.removeTaskFiles(needCleanup, taskId); + + if (jvmReuse) { + // work dir should still exist and cleanup queue should be empty + assertTrue("cleanup queue is not empty after removeTaskFiles() in case " + + "of jvm reuse.", cleanupQueue.isQueueEmpty()); + boolean workDirExists = false; + for (Path p : paths) { + if (tracker.getLocalFileSystem().exists(p)) { + workDirExists = true; + } + } + assertTrue("work dir does not exist in case of jvm reuse", workDirExists); + + // now try to delete the work dir and verify that there are no stale paths + JvmManager.deleteWorkDir(tracker, task); + } + tracker.removeJobFiles(task.getUser(), jobId.toString()); + + assertTrue("Some task files are not deleted!! 
Number of stale paths is " + + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0); + } + + /** + * Validates if task cleanup is done properly for a succeeded task * @throws IOException */ public void testTaskCleanup() throws IOException { + if (!canRun()) { + return; + } + testTaskCleanup(false, false);// no needCleanup; no jvmReuse + } + + /** + * Validates if task cleanup is done properly for a task that is not succeeded + * @throws IOException + */ + public void testFailedTaskCleanup() + throws IOException { + if (!canRun()) { + return; + } + testTaskCleanup(true, false);// needCleanup; no jvmReuse + } + + /** + * Validates if task cleanup is done properly for a succeeded task + * @throws IOException + */ + public void testTaskCleanupWithJvmUse() + throws IOException { + if (!canRun()) { + return; + } + testTaskCleanup(false, true);// no needCleanup; jvmReuse + } + /** + * Validates if task cleanup is done properly + */ + private void testTaskCleanup(boolean needCleanup, boolean jvmReuse) + throws IOException { // Localize job and localize task. tracker.getLocalizer().initializeUserDirs(task.getUser()); localizedJobConf = tracker.localizeJobFiles(task); + if (jvmReuse) { + localizedJobConf.setNumTasksToExecutePerJvm(2); + } // Now initialize the job via task-controller so as to set // ownership/permissions of jars, job-work-dir JobInitializationContext jobContext = new JobInitializationContext(); @@ -614,18 +750,9 @@ // TODO: Let the task run and create files. - InlineCleanupQueue cleanupQueue = new InlineCleanupQueue(); - tracker.directoryCleanupThread = cleanupQueue; - - // ////////// The central methods being tested - tip.removeTaskFiles(true, taskId); - tracker.removeJobFiles(task.getUser(), jobId.toString()); - // ////////// - - // TODO: make sure that all files intended to be deleted are deleted. - - assertTrue("Some task files are not deleted!! Number of stale paths is " - + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0); + // create files and set permissions 555. Verify if task controller sets + // the permissions for TT to delete the task dir or work dir properly + validateRemoveFiles(needCleanup, jvmReuse, tip); // Check that the empty $mapreduce.cluster.local.dir/taskTracker/$user dirs are still // there. @@ -633,7 +760,7 @@ Path userDir = new Path(localDir, TaskTracker.getUserDir(task.getUser())); assertTrue("User directory " + userDir + " is not present!!", - tracker.localFs.exists(userDir)); + tracker.getLocalFileSystem().exists(userDir)); } // Test userlogs cleanup. @@ -653,7 +780,7 @@ // Logs should be there before cleanup. assertTrue("Userlogs dir " + logDir + " is not presen as expected!!", - tracker.localFs.exists(logDir)); + tracker.getLocalFileSystem().exists(logDir)); // ////////// Another being tested TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file @@ -662,6 +789,6 @@ // Logs should be gone after cleanup. 
assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!", - tracker.localFs.exists(logDir)); + tracker.getLocalFileSystem().exists(logDir)); } } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Tue Jan 26 14:02:53 2010 @@ -24,6 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController; import org.apache.hadoop.mapreduce.filecache.TestTrackerDistributedCacheManager; @@ -36,7 +37,6 @@ TestTrackerDistributedCacheManager { private File configFile; - private MyLinuxTaskController taskController; private String taskTrackerSpecialGroup; private static final Log LOG = @@ -45,7 +45,7 @@ @Override protected void setUp() - throws IOException { + throws IOException, InterruptedException { if (!ClusterWithLinuxTaskController.shouldRun()) { return; @@ -57,7 +57,7 @@ .getSimpleName()).getAbsolutePath(); super.setUp(); - + taskController = new MyLinuxTaskController(); String path = System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH); @@ -65,7 +65,7 @@ ClusterWithLinuxTaskController.createTaskControllerConf(path, conf .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)); String execPath = path + "/task-controller"; - taskController.setTaskControllerExe(execPath); + ((MyLinuxTaskController)taskController).setTaskControllerExe(execPath); taskController.setConf(conf); taskController.setup(); @@ -74,6 +74,17 @@ } @Override + protected void refreshConf(Configuration conf) throws IOException { + super.refreshConf(conf); + String path = + System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH); + configFile = + ClusterWithLinuxTaskController.createTaskControllerConf(path, conf + .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY)); + + } + + @Override protected void tearDown() throws IOException { if (!ClusterWithLinuxTaskController.shouldRun()) { @@ -99,27 +110,19 @@ } @Override - protected TaskController getTaskController() { - return taskController; - } - - @Override protected void checkFilePermissions(Path[] localCacheFiles) throws IOException { - String cachedFirstFile = localCacheFiles[0].toUri().getPath(); - String cachedSecondFile = localCacheFiles[1].toUri().getPath(); String userName = getJobOwnerName(); - // First make sure that the cache files have proper permissions. - TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile, - "-r-xrwx---", userName, taskTrackerSpecialGroup); - TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile, - "-r-xrwx---", userName, taskTrackerSpecialGroup); - - // Now. make sure that all the path components also have proper - // permissions. 
- checkPermissionOnPathComponents(cachedFirstFile, userName); - checkPermissionOnPathComponents(cachedSecondFile, userName); + for (Path p : localCacheFiles) { + // First make sure that the cache file has proper permissions. + TestTaskTrackerLocalization.checkFilePermissions(p.toUri().getPath(), + "-r-xrwx---", userName, taskTrackerSpecialGroup); + // Now. make sure that all the path components also have proper + // permissions. + checkPermissionOnPathComponents(p.toUri().getPath(), userName); + } + } /** @@ -134,9 +137,9 @@ String trailingStringForFirstFile = cachedFilePath.replaceFirst(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath() + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]" - + Path.SEPARATOR + TaskTracker.getDistributedCacheDir(userName), + + Path.SEPARATOR + TaskTracker.getPrivateDistributedCacheDir(userName), ""); - LOG.info("Leading path for cacheFirstFile is : " + LOG.info("Trailing path for cacheFirstFile is : " + trailingStringForFirstFile); // The leading mapreduce.cluster.local.dir/0_[0-n]/taskTracker/$user string. String leadingStringForFirstFile = Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java Tue Jan 26 14:02:53 2010 @@ -25,9 +25,11 @@ import java.io.IOException; import java.io.InputStream; import java.text.DecimalFormat; +import java.util.ArrayList; import java.util.Arrays; import java.util.Enumeration; import java.util.Iterator; +import java.util.List; import java.util.Properties; import org.apache.commons.logging.LogFactory; @@ -48,6 +50,7 @@ import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.util.StringUtils; import org.apache.commons.logging.Log; @@ -272,7 +275,9 @@ while (true) { boolean shouldWait = false; for (JobStatus jobStatuses : jobClient.getAllJobs()) { - if (jobStatuses.getRunState() == JobStatus.RUNNING) { + if (jobStatuses.getRunState() != JobStatus.SUCCEEDED + && jobStatuses.getRunState() != JobStatus.FAILED + && jobStatuses.getRunState() != JobStatus.KILLED) { shouldWait = true; break; } @@ -620,6 +625,7 @@ conf.setJobName("test-job-fail"); conf.setMapperClass(FailMapper.class); conf.setReducerClass(IdentityReducer.class); + conf.setMaxMapAttempts(1); RunningJob job = UtilsForTests.runJob(conf, inDir, outDir); while (!job.isComplete()) { @@ -660,6 +666,37 @@ return job; } + + /** + * Cleans up files/dirs inline. CleanupQueue deletes in a separate thread + * asynchronously. + */ + public static class InlineCleanupQueue extends CleanupQueue { + List stalePaths = new ArrayList(); + + public InlineCleanupQueue() { + // do nothing + } + + @Override + public void addToQueue(PathDeletionContext... 
contexts) { + // delete paths in-line + for (PathDeletionContext context : contexts) { + try { + if (!deletePath(context)) { + LOG.warn("Stale path " + context.fullPath); + stalePaths.add(context.fullPath); + } + } catch (IOException e) { + LOG.warn("Caught exception while deleting path " + + context.fullPath); + LOG.info(StringUtils.stringifyException(e)); + stalePaths.add(context.fullPath); + } + } + } + } + static class FakeClock extends Clock { long time = 0; @@ -722,7 +759,7 @@ conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0"); JobTracker jt; try { - jt = new JobTracker(conf); + jt = JobTracker.startTracker(conf); return jt; } catch (Exception e) { throw new RuntimeException("Could not start jt", e); Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/GenericMRLoadGenerator.java Tue Jan 26 14:02:53 2010 @@ -150,13 +150,13 @@ } else if (null != conf.getClass(INDIRECT_INPUT_FORMAT, null)) { // specified IndirectInputFormat? Build src list JobClient jClient = new JobClient(conf); - Path sysdir = jClient.getSystemDir(); + Path tmpDir = new Path("/tmp"); Random r = new Random(); - Path indirInputFile = new Path(sysdir, + Path indirInputFile = new Path(tmpDir, Integer.toString(r.nextInt(Integer.MAX_VALUE), 36) + "_files"); conf.set(INDIRECT_INPUT_FILE, indirInputFile.toString()); SequenceFile.Writer writer = SequenceFile.createWriter( - sysdir.getFileSystem(conf), conf, indirInputFile, + tmpDir.getFileSystem(conf), conf, indirInputFile, LongWritable.class, Text.class, SequenceFile.CompressionType.NONE); try { Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMRJobClient.java Tue Jan 26 14:02:53 2010 @@ -19,6 +19,7 @@ import java.io.BufferedReader; import java.io.ByteArrayOutputStream; +import java.io.File; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStream; @@ -34,6 +35,8 @@ import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; +import org.junit.Test; + public class TestMRJobClient extends ClusterMapReduceTestCase { private static final Log LOG = LogFactory.getLog(TestMRJobClient.class); @@ -61,6 +64,7 @@ } } + @Test public void testJobClient() throws Exception { Configuration conf = createJobConf(); Job job = runJob(conf); @@ -69,7 +73,8 @@ testJobList(jobId, conf); testChangingJobPriority(jobId, conf); } - + + @Test public void testGetCounter(String jobId, Configuration conf) throws Exception { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -81,6 
+86,7 @@ assertEquals("Counter", "3", out.toString().trim()); } + @Test public void testJobList(String jobId, Configuration conf) throws Exception { verifyJobPriority(jobId, "HIGH", conf, createJobClient()); @@ -106,7 +112,8 @@ } pis.close(); } - + + @Test public void testChangingJobPriority(String jobId, Configuration conf) throws Exception { int exitCode = runTool(conf, createJobClient(), @@ -115,7 +122,56 @@ assertEquals("Exit code", 0, exitCode); verifyJobPriority(jobId, "VERY_LOW", conf, createJobClient()); } - + + @Test + public void testMissingProfileOutput() throws Exception { + Configuration conf = createJobConf(); + final String input = "hello1\n"; + + // Set a job to be profiled with an empty agentlib parameter. + // This will fail to create profile.out files for tasks. + // This will succeed by skipping the HTTP fetch of the + // profiler output. + Job job = MapReduceTestUtil.createJob(conf, + getInputDir(), getOutputDir(), 1, 1, input); + job.setJobName("disable-profile-fetch"); + job.setProfileEnabled(true); + job.setProfileParams("-agentlib:,verbose=n,file=%s"); + job.setMaxMapAttempts(1); + job.setMaxReduceAttempts(1); + job.setJobSetupCleanupNeeded(false); + job.waitForCompletion(true); + + // Run another job with an hprof agentlib param; verify + // that the HTTP fetch works here. + Job job2 = MapReduceTestUtil.createJob(conf, + getInputDir(), getOutputDir(), 1, 1, input); + job2.setJobName("enable-profile-fetch"); + job2.setProfileEnabled(true); + job2.setProfileParams( + "-agentlib:hprof=cpu=samples,heap=sites,force=n," + + "thread=y,verbose=n,file=%s"); + job2.setProfileTaskRange(true, "0-1"); + job2.setProfileTaskRange(false, ""); + job2.setMaxMapAttempts(1); + job2.setMaxReduceAttempts(1); + job2.setJobSetupCleanupNeeded(false); + job2.waitForCompletion(true); + + // Find the first map task, verify that we got its profile output file. 
+ TaskReport [] reports = job2.getTaskReports(TaskType.MAP); + assertTrue("No task reports found!", reports.length > 0); + TaskReport report = reports[0]; + TaskID id = report.getTaskId(); + assertTrue(TaskType.MAP == id.getTaskType()); + System.out.println("Using task id: " + id); + TaskAttemptID attemptId = new TaskAttemptID(id, 0); + + File profileOutFile = new File(attemptId.toString() + ".profile"); + assertTrue("Couldn't find profiler output", profileOutFile.exists()); + assertTrue("Couldn't remove profiler output", profileOutFile.delete()); + } + protected CLI createJobClient() throws IOException { return new CLI(); } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/TestMapReduceLocal.java Tue Jan 26 14:02:53 2010 @@ -162,10 +162,16 @@ "REDUCE_OUTPUT_RECORDS").getValue(); long reduceGrps = ctrs.findCounter(COUNTER_GROUP, "REDUCE_INPUT_GROUPS").getValue(); + long mergedMapOutputs = ctrs.findCounter(COUNTER_GROUP, + "MERGED_MAP_OUTPUTS").getValue(); + long shuffledMaps = ctrs.findCounter(COUNTER_GROUP, + "SHUFFLED_MAPS").getValue(); assertEquals("map out = combine in", mapOut, combineIn); assertEquals("combine out = reduce in", combineOut, reduceIn); assertTrue("combine in > combine out", combineIn > combineOut); assertEquals("reduce groups = reduce out", reduceGrps, reduceOut); + assertEquals("Mismatch in mergedMapOutputs", mergedMapOutputs, 2); + assertEquals("Mismatch in shuffledMaps", shuffledMaps, 2); String group = "Random Group"; CounterGroup ctrGrp = ctrs.getGroup(group); assertEquals(0, ctrGrp.size()); Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/filecache/TestTrackerDistributedCacheManager.java Tue Jan 26 14:02:53 2010 @@ -32,12 +32,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.DefaultTaskController; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskController; import org.apache.hadoop.mapred.TaskTracker; -import org.apache.hadoop.mapred.TaskController.InitializationContext; import org.apache.hadoop.mapreduce.Cluster; import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -45,10 +45,13 @@ import org.apache.hadoop.fs.LocalDirAllocator; import 
org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager; import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.ReflectionUtils; import org.mortbay.log.Log; public class TestTrackerDistributedCacheManager extends TestCase { @@ -59,7 +62,6 @@ .getAbsolutePath(); protected File ROOT_MAPRED_LOCAL_DIR; - private static String TEST_CACHE_BASE_DIR = "cachebasedir"; protected int numLocalDirs = 6; private static final int TEST_FILE_SIZE = 4 * 1024; // 4K @@ -70,10 +72,11 @@ private FileSystem fs; protected LocalDirAllocator localDirAllocator = - new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY); + new LocalDirAllocator(MRConfig.LOCAL_DIR); + protected TaskController taskController; @Override - protected void setUp() throws IOException { + protected void setUp() throws IOException,InterruptedException { // Prepare the tests' root dir File TEST_ROOT = new File(TEST_ROOT_DIR); @@ -85,17 +88,36 @@ ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local"); ROOT_MAPRED_LOCAL_DIR.mkdirs(); + String []localDirs = new String[numLocalDirs]; + for (int i = 0; i < numLocalDirs; i++) { + File localDir = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i); + localDirs[i] = localDir.getPath(); + localDir.mkdir(); + } + conf = new Configuration(); - conf.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT); - conf.set(JobConf.MAPRED_LOCAL_DIR_PROPERTY, ROOT_MAPRED_LOCAL_DIR.toString()); + conf.setStrings(MRConfig.LOCAL_DIR, localDirs); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); fs = FileSystem.get(conf); + Class taskControllerClass = conf.getClass( + TTConfig.TT_TASK_CONTROLLER, DefaultTaskController.class, + TaskController.class); + taskController = (TaskController) ReflectionUtils.newInstance( + taskControllerClass, conf); + + // setup permissions for mapred local dir + taskController.setup(); // Create the temporary cache files to be used in the tests. firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile"); secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile"); - createTempFile(firstCacheFile); - createTempFile(secondCacheFile); + createPrivateTempFile(firstCacheFile); + createPrivateTempFile(secondCacheFile); + } + + protected void refreshConf(Configuration conf) throws IOException { + taskController.setConf(conf); + taskController.setup(); } /** @@ -121,9 +143,12 @@ // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(conf); + String userName = getJobOwnerName(); + subConf.set(JobContext.USER_NAME, userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); DistributedCache.addFileToClassPath(secondCacheFile, subConf); TrackerDistributedCacheManager.determineTimestamps(subConf); + TrackerDistributedCacheManager.determineCacheVisibilities(subConf); // ****** End of imitating JobClient code Path jobFile = new Path(TEST_ROOT_DIR, "job.xml"); @@ -131,22 +156,16 @@ subConf.writeXml(os); os.close(); - String userName = getJobOwnerName(); - // ****** Imitate TaskRunner code. 
TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); + new TrackerDistributedCacheManager(conf, taskController); TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); handle.setup(localDirAllocator, workDir, TaskTracker - .getDistributedCacheDir(userName)); - - InitializationContext context = new InitializationContext(); - context.user = userName; - context.workDir = workDir; - getTaskController().initializeDistributedCache(context); + .getPrivateDistributedCacheDir(userName), + TaskTracker.getPublicDistributedCacheDir()); // ****** End of imitating TaskRunner code Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); @@ -176,28 +195,26 @@ TrackerDistributedCacheManager { public FakeTrackerDistributedCacheManager(Configuration conf) throws IOException { - super(conf); + super(conf, taskController); } @Override Path localizeCache(Configuration conf, URI cache, long confFileStamp, - CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive) - throws IOException { + CacheStatus cacheStatus, FileStatus fileStatus, boolean isArchive, + boolean isPublic) throws IOException { if (cache.equals(firstCacheFile.toUri())) { throw new IOException("fake fail"); } return super.localizeCache(conf, cache, confFileStamp, cacheStatus, - fileStatus, isArchive); + fileStatus, isArchive, isPublic); } } public void testReferenceCount() throws IOException, LoginException, - URISyntaxException { + URISyntaxException, InterruptedException { if (!canRun()) { return; } - Configuration conf = new Configuration(); - conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"); TrackerDistributedCacheManager manager = new FakeTrackerDistributedCacheManager(conf); Cluster cluster = new Cluster(conf); @@ -206,25 +223,29 @@ // Configures a job with a regular file Job job1 = Job.getInstance(cluster, conf); + job1.setUser(userName); job1.addCacheFile(secondCacheFile.toUri()); Configuration conf1 = job1.getConfiguration(); TrackerDistributedCacheManager.determineTimestamps(conf1); + TrackerDistributedCacheManager.determineCacheVisibilities(conf1); // Task localizing for first job TaskDistributedCacheManager handle = manager .newTaskDistributedCacheManager(conf1); handle.setup(localDirAllocator, workDir, TaskTracker - .getDistributedCacheDir(userName)); + .getPrivateDistributedCacheDir(userName), + TaskTracker.getPublicDistributedCacheDir()); handle.release(); for (TaskDistributedCacheManager.CacheFile c : handle.getCacheFiles()) { assertEquals(0, manager.getReferenceCount(c.uri, conf1, c.timestamp)); } Path thirdCacheFile = new Path(TEST_ROOT_DIR, "thirdcachefile"); - createTempFile(thirdCacheFile); + createPrivateTempFile(thirdCacheFile); // Configures another job with three regular files. Job job2 = Job.getInstance(cluster, conf); + job2.setUser(userName); // add a file that would get failed to localize job2.addCacheFile(firstCacheFile.toUri()); // add a file that is already localized by different job @@ -233,6 +254,7 @@ job2.addCacheFile(thirdCacheFile.toUri()); Configuration conf2 = job2.getConfiguration(); TrackerDistributedCacheManager.determineTimestamps(conf2); + TrackerDistributedCacheManager.determineCacheVisibilities(conf2); // Task localizing for second job // localization for the "firstCacheFile" will fail. 
@@ -240,7 +262,8 @@ Throwable th = null; try { handle.setup(localDirAllocator, workDir, TaskTracker - .getDistributedCacheDir(userName)); + .getPrivateDistributedCacheDir(userName), + TaskTracker.getPublicDistributedCacheDir()); } catch (IOException e) { th = e; Log.info("Exception during setup", e); @@ -261,7 +284,73 @@ assertTrue(th.getMessage().contains(thirdCacheFile.getName())); fs.delete(thirdCacheFile, false); } + + /** + * Tests that localization of distributed cache file happens in the desired + * directory + * @throws IOException + * @throws LoginException + */ + public void testPublicPrivateCache() + throws IOException, LoginException, InterruptedException { + if (!canRun()) { + return; + } + checkLocalizedPath("true"); + checkLocalizedPath("false"); + } + + private void checkLocalizedPath(String visibility) + throws IOException, LoginException, InterruptedException { + TrackerDistributedCacheManager manager = + new TrackerDistributedCacheManager(conf, taskController); + Cluster cluster = new Cluster(conf); + String userName = getJobOwnerName(); + File workDir = new File(TEST_ROOT_DIR, "workdir"); + Path cacheFile = new Path(TEST_ROOT_DIR, "fourthcachefile"); + if ("true".equals(visibility)) { + createPublicTempFile(cacheFile); + } else { + createPrivateTempFile(cacheFile); + } + + Job job1 = Job.getInstance(cluster, conf); + job1.setUser(userName); + job1.addCacheFile(cacheFile.toUri()); + Configuration conf1 = job1.getConfiguration(); + TrackerDistributedCacheManager.determineTimestamps(conf1); + TrackerDistributedCacheManager.determineCacheVisibilities(conf1); + // Task localizing for job + TaskDistributedCacheManager handle = manager + .newTaskDistributedCacheManager(conf1); + handle.setup(localDirAllocator, workDir, TaskTracker + .getPrivateDistributedCacheDir(userName), + TaskTracker.getPublicDistributedCacheDir()); + TaskDistributedCacheManager.CacheFile c = handle.getCacheFiles().get(0); + String distCacheDir; + if ("true".equals(visibility)) { + distCacheDir = TaskTracker.getPublicDistributedCacheDir(); + } else { + distCacheDir = TaskTracker.getPrivateDistributedCacheDir(userName); + } + Path localizedPath = + manager.getLocalCache(cacheFile.toUri(), conf1, distCacheDir, + fs.getFileStatus(cacheFile), false, + c.timestamp, new Path(TEST_ROOT_DIR), false, + Boolean.parseBoolean(visibility)); + assertTrue("Cache file didn't get localized in the expected directory. " + + "Expected localization to happen within " + + ROOT_MAPRED_LOCAL_DIR + "/" + distCacheDir + + ", but was localized at " + + localizedPath, localizedPath.toString().contains(distCacheDir)); + if ("true".equals(visibility)) { + checkPublicFilePermissions(new Path[]{localizedPath}); + } else { + checkFilePermissions(new Path[]{localizedPath}); + } + } + /** * Check proper permissions on the cache files * @@ -270,17 +359,29 @@ */ protected void checkFilePermissions(Path[] localCacheFiles) throws IOException { - Path cachedFirstFile = localCacheFiles[0]; - Path cachedSecondFile = localCacheFiles[1]; - // Both the files should have executable permissions on them. - assertTrue("First cache file is not executable!", new File(cachedFirstFile - .toUri().getPath()).canExecute()); - assertTrue("Second cache file is not executable!", new File( - cachedSecondFile.toUri().getPath()).canExecute()); + // All the files should have executable permissions on them. 
+ for (Path p : localCacheFiles) { + assertTrue("Cache file is not executable!", new File(p + .toUri().getPath()).canExecute()); + } } - protected TaskController getTaskController() { - return new DefaultTaskController(); + /** + * Check permissions on the public cache files + * + * @param localCacheFiles + * @throws IOException + */ + private void checkPublicFilePermissions(Path[] localCacheFiles) + throws IOException { + // All the files should have read and executable permissions for others + for (Path p : localCacheFiles) { + FsPermission perm = fs.getFileStatus(p).getPermission(); + assertTrue("cache file is not readable by others", perm.getOtherAction() + .implies(FsAction.READ)); + assertTrue("cache file is not executable by others", perm + .getOtherAction().implies(FsAction.EXECUTE)); + } } protected String getJobOwnerName() throws LoginException { @@ -293,27 +394,39 @@ if (!canRun()) { return; } + // This test needs MRConfig.LOCAL_DIR to be single directory + // instead of four, because it assumes that both + // firstcachefile and secondcachefile will be localized on same directory + // so that second localization triggers deleteCache. + // If MRConfig.LOCAL_DIR is four directories, second localization might not + // trigger deleteCache, if it is localized in different directory. + Configuration conf2 = new Configuration(conf); + conf2.set(MRConfig.LOCAL_DIR, ROOT_MAPRED_LOCAL_DIR.toString()); + conf2.setLong(TTConfig.TT_LOCAL_CACHE_SIZE, LOCAL_CACHE_LIMIT); + refreshConf(conf2); TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); - FileSystem localfs = FileSystem.getLocal(conf); + new TrackerDistributedCacheManager(conf2, taskController); + FileSystem localfs = FileSystem.getLocal(conf2); long now = System.currentTimeMillis(); + String userName = getJobOwnerName(); + conf2.set(JobContext.USER_NAME, userName); - manager.getLocalCache(firstCacheFile.toUri(), conf, - TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false, - now, new Path(TEST_ROOT_DIR), false); - manager.releaseCache(firstCacheFile.toUri(), conf, now); + Path localCache = manager.getLocalCache(firstCacheFile.toUri(), conf2, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(firstCacheFile), false, + now, new Path(TEST_ROOT_DIR), false, false); + manager.releaseCache(firstCacheFile.toUri(), conf2, now); //in above code,localized a file of size 4K and then release the cache // which will cause the cache be deleted when the limit goes out. // The below code localize another cache which's designed to //sweep away the first cache. 
- manager.getLocalCache(secondCacheFile.toUri(), conf, - TEST_CACHE_BASE_DIR, fs.getFileStatus(secondCacheFile), false, - System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false); - FileStatus[] dirStatuses = localfs.listStatus( - new Path(ROOT_MAPRED_LOCAL_DIR.toString())); - assertTrue("DistributedCache failed deleting old" + + manager.getLocalCache(secondCacheFile.toUri(), conf2, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(secondCacheFile), false, + System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false); + assertFalse("DistributedCache failed deleting old" + " cache when the cache store is full.", - dirStatuses.length == 1); + localfs.exists(localCache)); } public void testFileSystemOtherThanDefault() throws Exception { @@ -321,14 +434,17 @@ return; } TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(conf); + new TrackerDistributedCacheManager(conf, taskController); conf.set("fs.fakefile.impl", conf.get("fs.file.impl")); + String userName = getJobOwnerName(); + conf.set(JobContext.USER_NAME, userName); Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath()); Path result = manager.getLocalCache(fileToCache.toUri(), conf, - TEST_CACHE_BASE_DIR, fs.getFileStatus(firstCacheFile), false, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(firstCacheFile), false, System.currentTimeMillis(), - new Path(TEST_ROOT_DIR), false); + new Path(TEST_ROOT_DIR), false, false); assertNotNull("DistributedCache cached file on non-default filesystem.", result); } @@ -342,6 +458,18 @@ os.close(); FileSystem.LOG.info("created: " + p + ", size=" + TEST_FILE_SIZE); } + + static void createPublicTempFile(Path p) + throws IOException, InterruptedException { + createTempFile(p); + FileUtil.chmod(p.toString(), "0777",true); + } + + static void createPrivateTempFile(Path p) + throws IOException, InterruptedException { + createTempFile(p); + FileUtil.chmod(p.toString(), "0770",true); + } @Override protected void tearDown() throws IOException { @@ -382,26 +510,29 @@ return; } Configuration myConf = new Configuration(conf); - myConf.set("fs.default.name", "refresh:///"); + myConf.set(FileSystem.FS_DEFAULT_NAME_KEY, "refresh:///"); myConf.setClass("fs.refresh.impl", FakeFileSystem.class, FileSystem.class); + String userName = getJobOwnerName(); + TrackerDistributedCacheManager manager = - new TrackerDistributedCacheManager(myConf); + new TrackerDistributedCacheManager(myConf, taskController); // ****** Imitate JobClient code // Configures a task/job with both a regular file and a "classpath" file. Configuration subConf = new Configuration(myConf); + subConf.set(JobContext.USER_NAME, userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf); TrackerDistributedCacheManager.determineTimestamps(subConf); + TrackerDistributedCacheManager.determineCacheVisibilities(subConf); // ****** End of imitating JobClient code - String userName = getJobOwnerName(); - // ****** Imitate TaskRunner code. 
TaskDistributedCacheManager handle = manager.newTaskDistributedCacheManager(subConf); assertNull(null, DistributedCache.getLocalCacheFiles(subConf)); File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString()); handle.setup(localDirAllocator, workDir, TaskTracker - .getDistributedCacheDir(userName)); + .getPrivateDistributedCacheDir(userName), + TaskTracker.getPublicDistributedCacheDir()); // ****** End of imitating TaskRunner code Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf); @@ -422,7 +553,7 @@ Throwable th = null; try { handle.setup(localDirAllocator, workDir, TaskTracker - .getDistributedCacheDir(userName)); + .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir()); } catch (IOException ie) { th = ie; } @@ -434,13 +565,15 @@ // submit another job Configuration subConf2 = new Configuration(myConf); + subConf2.set(JobContext.USER_NAME, userName); DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf2); TrackerDistributedCacheManager.determineTimestamps(subConf2); + TrackerDistributedCacheManager.determineCacheVisibilities(subConf2); handle = manager.newTaskDistributedCacheManager(subConf2); handle.setup(localDirAllocator, workDir, TaskTracker - .getDistributedCacheDir(userName)); + .getPrivateDistributedCacheDir(userName), TaskTracker.getPublicDistributedCacheDir()); Path[] localCacheFiles2 = DistributedCache.getLocalCacheFiles(subConf2); assertNotNull(null, localCacheFiles2); assertEquals(1, localCacheFiles2.length); @@ -456,4 +589,46 @@ handle.release(); } + /** + * Localize a file. After localization is complete, create a file, "myFile", + * under the directory where the file is localized and ensure that it has + * permissions different from what is set by default. Then, localize another + * file. Verify that "myFile" has the right permissions. + * @throws Exception + */ + public void testCustomPermissions() throws Exception { + if (!canRun()) { + return; + } + String userName = getJobOwnerName(); + conf.set(JobContext.USER_NAME, userName); + TrackerDistributedCacheManager manager = + new TrackerDistributedCacheManager(conf, taskController); + FileSystem localfs = FileSystem.getLocal(conf); + long now = System.currentTimeMillis(); + + Path[] localCache = new Path[2]; + localCache[0] = manager.getLocalCache(firstCacheFile.toUri(), conf, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(firstCacheFile), false, + now, new Path(TEST_ROOT_DIR), false, false); + FsPermission myPermission = new FsPermission((short)0600); + Path myFile = new Path(localCache[0].getParent(), "myfile.txt"); + if (FileSystem.create(localfs, myFile, myPermission) == null) { + throw new IOException("Could not create " + myFile); + } + try { + localCache[1] = manager.getLocalCache(secondCacheFile.toUri(), conf, + TaskTracker.getPrivateDistributedCacheDir(userName), + fs.getFileStatus(secondCacheFile), false, + System.currentTimeMillis(), new Path(TEST_ROOT_DIR), false, false); + FileStatus stat = localfs.getFileStatus(myFile); + assertTrue(stat.getPermission().equals(myPermission)); + // validate permissions of localized files. 
+ checkFilePermissions(localCache); + } finally { + localfs.delete(myFile, false); + } + } + } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/mapreduce/util/TestProcfsBasedProcessTree.java Tue Jan 26 14:02:53 2010 @@ -25,6 +25,7 @@ import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; +import java.util.Arrays; import java.util.Random; import java.util.Vector; import java.util.regex.Matcher; @@ -181,12 +182,12 @@ LOG.info("Process-tree dump follows: \n" + processTreeDump); assertTrue("Process-tree dump doesn't start with a proper header", - processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " - + "VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n")); + processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " + + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " + + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n")); for (int i = N; i >= 0; i--) { - String cmdLineDump = - "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\) [0-9]+ [0-9]+ sh " + shellScript - + " " + i; + String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" + + " [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i; Pattern pat = Pattern.compile(cmdLineDump); Matcher mat = pat.matcher(processTreeDump); assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i @@ -267,6 +268,8 @@ String session; String vmem = "0"; String rssmemPage = "0"; + String utime = "0"; + String stime = "0"; public ProcessStatInfo(String[] statEntries) { pid = statEntries[0]; @@ -278,27 +281,32 @@ if (statEntries.length > 6) { rssmemPage = statEntries[6]; } + if (statEntries.length > 7) { + utime = statEntries[7]; + stime = statEntries[8]; + } } // construct a line that mimics the procfs stat file. // all unused numerical entries are set to 0. public String getStatLine() { return String.format("%s (%s) S %s %s %s 0 0 0" + - " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s %s 0 0" + + " 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" + " 0 0 0 0 0 0 0 0" + " 0 0 0 0 0", - pid, name, ppid, pgrpId, session, vmem, rssmemPage); + pid, name, ppid, pgrpId, session, + utime, stime, vmem, rssmemPage); } } /** * A basic test that creates a few process directories and writes - * stat files. Verifies that the virtual and rss memory is correctly + * stat files. Verifies that the cpu time and memory is correctly * computed. * @throws IOException if there was a problem setting up the * fake procfs directories or files. */ - public void testMemoryForProcessTree() throws IOException { + public void testCpuAndMemoryForProcessTree() throws IOException { // test processes String[] pids = { "100", "200", "300", "400" }; @@ -313,13 +321,13 @@ // assuming processes 100, 200, 300 are in tree and 400 is not. 
ProcessStatInfo[] procInfos = new ProcessStatInfo[4]; procInfos[0] = new ProcessStatInfo(new String[] - {"100", "proc1", "1", "100", "100", "100000", "100"}); + {"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"}); procInfos[1] = new ProcessStatInfo(new String[] - {"200", "proc2", "100", "100", "100", "200000", "200"}); + {"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"}); procInfos[2] = new ProcessStatInfo(new String[] - {"300", "proc3", "200", "100", "100", "300000", "300"}); + {"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"}); procInfos[3] = new ProcessStatInfo(new String[] - {"400", "proc4", "1", "400", "400", "400000", "400"}); + {"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"}); writeStatFiles(procfsRootDir, pids, procInfos); @@ -339,6 +347,28 @@ 600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L; assertEquals("Cumulative rss memory does not match", cumuRssMem, processTree.getCumulativeRssmem()); + + // verify cumulative cpu time + long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ? + 7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L; + assertEquals("Cumulative cpu time does not match", + cumuCpuTime, processTree.getCumulativeCpuTime()); + + // test the cpu time again to see if it cumulates + procInfos[0] = new ProcessStatInfo(new String[] + {"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"}); + procInfos[1] = new ProcessStatInfo(new String[] + {"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"}); + writeStatFiles(procfsRootDir, pids, procInfos); + + // build the process tree. + processTree.getProcessTree(); + + // verify cumulative cpu time again + cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ? + 9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L; + assertEquals("Cumulative cpu time does not match", + cumuCpuTime, processTree.getCumulativeCpuTime()); } finally { FileUtil.fullyDelete(procfsRootDir); } @@ -498,17 +528,17 @@ // Processes 200, 300, 400 and 500 are descendants of 100. 600 is not. 
ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses]; procInfos[0] = new ProcessStatInfo(new String[] { - "100", "proc1", "1", "100", "100", "100000", "100"}); + "100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"}); procInfos[1] = new ProcessStatInfo(new String[] { - "200", "proc2", "100", "100", "100", "200000", "200"}); + "200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"}); procInfos[2] = new ProcessStatInfo(new String[] { - "300", "proc3", "200", "100", "100", "300000", "300"}); + "300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"}); procInfos[3] = new ProcessStatInfo(new String[] { - "400", "proc4", "200", "100", "100", "400000", "400"}); + "400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"}); procInfos[4] = new ProcessStatInfo(new String[] { - "500", "proc5", "400", "100", "100", "400000", "400"}); + "500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"}); procInfos[5] = new ProcessStatInfo(new String[] { - "600", "proc6", "1", "1", "1", "400000", "400"}); + "600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"}); String[] cmdLines = new String[numProcesses]; cmdLines[0] = "proc1 arg1 arg2"; @@ -532,15 +562,17 @@ LOG.info("Process-tree dump follows: \n" + processTreeDump); assertTrue("Process-tree dump doesn't start with a proper header", - processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " - + "VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n")); + processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " + + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " + + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n")); for (int i = 0; i < 5; i++) { ProcessStatInfo p = procInfos[i]; assertTrue( "Process-tree dump doesn't contain the cmdLineDump of process " + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name - + ") " + p.vmem + " " + p.rssmemPage + " " + cmdLines[i])); + + ") " + p.utime + " " + p.stime + " " + p.vmem + " " + + p.rssmemPage + " " + cmdLines[i])); } // 600 should not be in the dump @@ -549,7 +581,7 @@ "Process-tree dump shouldn't contain the cmdLineDump of process " + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name + ") " - + p.vmem + " " + p.rssmemPage + " " + cmdLines[5])); + + p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5])); } finally { FileUtil.fullyDelete(procfsRootDir); } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java Tue Jan 26 14:02:53 2010 @@ -58,7 +58,10 @@ JobConf mrConf = new JobConf(conf); mr = new MiniMRCluster(slaves, fileSys.getUri().toString(), 1, null, null, mrConf); - + // make cleanup inline sothat validation of existence of these directories + // can be done + mr.setInlineCleanupThreads(); + // Run examples TestMiniMRWithDFS.runPI(mr, 
mr.createJobConf(mrConf)); TestMiniMRWithDFS.runWordCount(mr, mr.createJobConf(mrConf)); Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java ------------------------------------------------------------------------------ --- svn:mergeinfo (original) +++ svn:mergeinfo Tue Jan 26 14:02:53 2010 @@ -1,3 +1,3 @@ /hadoop/core/branches/branch-0.19/mapred/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:713112 /hadoop/core/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:776175-785643 -/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:817878-835934,884917-885774 +/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/security/authorize/TestServiceLevelAuthorization.java:817878-835934,884917-903221 Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/tools/TestCopyFiles.java Tue Jan 26 14:02:53 2010 @@ -995,6 +995,41 @@ } } + /** + * verify that -delete option works for other {@link FileSystem} + * implementations. See MAPREDUCE-1285 */ + public void testDeleteLocal() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + final FileSystem localfs = FileSystem.get(LOCAL_FS, conf); + cluster = new MiniDFSCluster(conf, 1, true, null); + final FileSystem hdfs = cluster.getFileSystem(); + final String namenode = FileSystem.getDefaultUri(conf).toString(); + if (namenode.startsWith("hdfs://")) { + MyFile[] files = createFiles(URI.create(namenode), "/srcdat"); + String destdir = TEST_ROOT_DIR + "/destdat"; + MyFile[] localFiles = createFiles(localfs, destdir); + ToolRunner.run(new DistCp(conf), new String[] { + "-delete", + "-update", + "-log", + "/logs", + namenode+"/srcdat", + "file:///"+TEST_ROOT_DIR+"/destdat"}); + assertTrue("Source and destination directories do not match.", + checkFiles(localfs, destdir, files)); + assertTrue("Log directory does not exist.", + hdfs.exists(new Path("/logs"))); + deldir(localfs, destdir); + deldir(hdfs, "/logs"); + deldir(hdfs, "/srcdat"); + } + } finally { + if (cluster != null) { cluster.shutdown(); } + } + } + /** test globbing */ public void testGlobbing() throws Exception { String namenode = null; @@ -1057,4 +1092,4 @@ } return results.toString(); } -} \ No newline at end of file +} Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/test/mapred/org/apache/hadoop/util/TestReflectionUtils.java Tue Jan 26 14:02:53 
2010 @@ -16,109 +16,33 @@ * limitations under the License. */ -package org.apache.hadoop.util; - -import java.net.URL; -import java.net.URLClassLoader; -import java.util.HashMap; +package org.apache.hadoop.util; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobConfigurable; - -import junit.framework.TestCase; - -public class TestReflectionUtils extends TestCase { - - private static Class toConstruct[] = { String.class, TestReflectionUtils.class, HashMap.class }; - private Throwable failure = null; - - public void setUp() { - ReflectionUtils.clearCache(); - } - - public void testCache() throws Exception { - assertEquals(0, cacheSize()); - doTestCache(); - assertEquals(toConstruct.length, cacheSize()); - ReflectionUtils.clearCache(); - assertEquals(0, cacheSize()); - } - - - @SuppressWarnings("unchecked") - private void doTestCache() { - for (int i=0; i ops, Path log) throws IOException { + private boolean setup(List ops, Path log) + throws IOException { final String randomId = getRandomId(); JobClient jClient = new JobClient(jobconf); - Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId); + Path stagingArea; + try { + stagingArea = JobSubmissionFiles.getStagingDir( + jClient.getClusterHandle(), jobconf); + } catch (InterruptedException ie){ + throw new IOException(ie); + } + Path jobdir = new Path(stagingArea + NAME + "_" + randomId); + FsPermission mapredSysPerms = + new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); + FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms); LOG.info(JOB_DIR_LABEL + "=" + jobdir); if (log == null) { Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp.java Tue Jan 26 14:02:53 2010 @@ -34,6 +34,8 @@ import java.util.Stack; import java.util.StringTokenizer; +import javax.security.auth.login.LoginException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -66,7 +68,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.SequenceFileRecordReader; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -1196,9 +1201,22 @@ final String randomId = getRandomId(); JobClient jClient = new JobClient(jobConf); - Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId); + Path stagingArea; + try { + stagingArea = + JobSubmissionFiles.getStagingDir(jClient.getClusterHandle(), conf); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + + Path jobDirectory = new Path(stagingArea + NAME + "_" + randomId); + FsPermission mapredSysPerms = + new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); + FileSystem.mkdirs(jClient.getFs(), jobDirectory, 
mapredSysPerms); jobConf.set(JOB_DIR_LABEL, jobDirectory.toString()); + long maxBytesPerMap = conf.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP); + FileSystem dstfs = args.dst.getFileSystem(conf); boolean dstExists = dstfs.exists(args.dst); boolean dstIsDir = false; @@ -1365,7 +1383,7 @@ ++cnsyncf; cbsyncs += child.getLen(); - if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) { + if (cnsyncf > SYNC_FILE_MAX || cbsyncs > maxBytesPerMap) { src_writer.sync(); dst_writer.sync(); cnsyncf = 0; @@ -1540,7 +1558,7 @@ //write dst lsr results final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr"); final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf, - dstlsr, Text.class, FileStatus.class, + dstlsr, Text.class, dstroot.getClass(), SequenceFile.CompressionType.NONE); try { //do lsr to get all file statuses in dstroot Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/DistCp_Counter.properties Tue Jan 26 14:02:53 2010 @@ -1,3 +1,15 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ # ResourceBundle properties file for distcp counters CounterGroupName= distcp Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/HadoopArchives.java Tue Jan 26 14:02:53 2010 @@ -29,6 +29,8 @@ import java.util.Set; import java.util.TreeMap; +import javax.security.auth.login.LoginException; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HarFileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.SequenceFile; @@ -56,7 +59,12 @@ import org.apache.hadoop.mapred.SequenceFileRecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.lib.NullOutputFormat; +import org.apache.hadoop.mapreduce.Cluster; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobSubmissionFiles; +import org.apache.hadoop.security.UnixUserGroupInformation; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -359,12 +367,22 @@ } conf.set(DST_DIR_LABEL, outputPath.toString()); final String randomId = DistCp.getRandomId(); - Path jobDirectory = new Path(new JobClient(conf).getSystemDir(), + Path stagingArea; + try { + stagingArea = JobSubmissionFiles.getStagingDir(new Cluster(conf), + conf); + } catch (InterruptedException ie) { + throw new IOException(ie); + } + Path jobDirectory = new Path(stagingArea, NAME + "_" + randomId); + FsPermission mapredSysPerms = + new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION); + FileSystem.mkdirs(jobDirectory.getFileSystem(conf), jobDirectory, + mapredSysPerms); conf.set(JOB_DIR_LABEL, jobDirectory.toString()); //get a tmp directory for input splits FileSystem jobfs = jobDirectory.getFileSystem(conf); - jobfs.mkdirs(jobDirectory); Path srcFiles = new Path(jobDirectory, "_har_src_files"); conf.set(SRC_LIST_LABEL, srcFiles.toString()); SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf, Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/HadoopLogsAnalyzer.java Tue Jan 26 14:02:53 2010 @@ -1285,7 +1285,7 @@ attempt.setLocation(host.makeLoggedLocation()); } - ArrayList<LoggedLocation> locs = task.getPreferredLocations(); + List<LoggedLocation> locs = task.getPreferredLocations(); if (host != null && locs
!= null) { for (LoggedLocation loc : locs) { Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/JsonObjectMapperParser.java Tue Jan 26 14:02:53 2010 @@ -36,7 +36,9 @@ /** * A simple wrapper for parsing JSON-encoded data using ObjectMapper. - * @param <T> The (base) type of the object(s) to be parsed by this parser. + * + * @param <T> + * The (base) type of the object(s) to be parsed by this parser. */ class JsonObjectMapperParser<T> implements Closeable { private final ObjectMapper mapper; @@ -47,7 +49,7 @@ /** * Constructor. * - * @param path + * @param path * Path to the JSON data file, possibly compressed. * @param conf * @throws IOException Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedJob.java Tue Jan 26 14:02:53 2010 @@ -102,6 +102,24 @@ setJobID(jobID); } + void adjustTimes(long adjustment) { + submitTime += adjustment; + launchTime += adjustment; + finishTime += adjustment; + + for (LoggedTask task : mapTasks) { + task.adjustTimes(adjustment); + } + + for (LoggedTask task : reduceTasks) { + task.adjustTimes(adjustment); + } + + for (LoggedTask task : otherTasks) { + task.adjustTimes(adjustment); + } + } + @SuppressWarnings("unused") // for input parameter ignored. @JsonAnySetter Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedLocation.java Tue Jan 26 14:02:53 2010 @@ -18,7 +18,10 @@ package org.apache.hadoop.tools.rumen; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.TreeSet; @@ -41,12 +44,15 @@ * */ public class LoggedLocation implements DeepCompare { + static final Map<List<String>, List<String>> layersCache = + new HashMap<List<String>, List<String>>(); + /** * The full path from the root of the network to the host. * * NOTE that this assumes that the network topology is a tree.
*/ - List<String> layers = new ArrayList<String>(); + List<String> layers = Collections.emptyList(); static private Set<String> alreadySeenAnySetterAttributes = new TreeSet<String>(); @@ -56,7 +62,26 @@ } void setLayers(List<String> layers) { - this.layers = layers; + if (layers == null || layers.isEmpty()) { + this.layers = Collections.emptyList(); + } else { + synchronized (layersCache) { + List<String> found = layersCache.get(layers); + if (found == null) { + // make a copy with interned string. + List<String> clone = new ArrayList<String>(layers.size()); + for (String s : layers) { + clone.add(s.intern()); + } + // making it read-only as we are sharing them. + List<String> readonlyLayers = Collections.unmodifiableList(clone); + layersCache.put(readonlyLayers, readonlyLayers); + this.layers = readonlyLayers; + } else { + this.layers = found; + } + } + } } @SuppressWarnings("unused") Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTask.java Tue Jan 26 14:02:53 2010 @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.rumen; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Set; import java.util.TreeSet; @@ -44,9 +45,7 @@ Pre21JobHistoryConstants.Values taskType; Pre21JobHistoryConstants.Values taskStatus; List<LoggedTaskAttempt> attempts = new ArrayList<LoggedTaskAttempt>(); - - ArrayList<LoggedLocation> preferredLocations = - new ArrayList<LoggedLocation>(); + List<LoggedLocation> preferredLocations = Collections.emptyList(); int numberMaps = -1; int numberReduces = -1; @@ -69,6 +68,15 @@ super(); } + void adjustTimes(long adjustment) { + startTime += adjustment; + finishTime += adjustment; + + for (LoggedTaskAttempt attempt : attempts) { + attempt.adjustTimes(adjustment); + } + } + public long getInputBytes() { return inputBytes; } @@ -130,15 +138,23 @@ } void setAttempts(List<LoggedTaskAttempt> attempts) { - this.attempts = attempts; + if (attempts == null) { + this.attempts = new ArrayList<LoggedTaskAttempt>(); + } else { + this.attempts = attempts; + } } - public ArrayList<LoggedLocation> getPreferredLocations() { + public List<LoggedLocation> getPreferredLocations() { return preferredLocations; } - void setPreferredLocations(ArrayList<LoggedLocation> preferredLocations) { - this.preferredLocations = preferredLocations; + void setPreferredLocations(List<LoggedLocation> preferredLocations) { + if (preferredLocations == null || preferredLocations.isEmpty()) { + this.preferredLocations = Collections.emptyList(); + } else { + this.preferredLocations = preferredLocations; + } } public int getNumberMaps() { @@ -204,8 +220,8 @@ } } - private void compareLoggedLocations(ArrayList<LoggedLocation> c1, - ArrayList<LoggedLocation> c2, TreePath loc, String eltname) + private void compareLoggedLocations(List<LoggedLocation> c1, + List<LoggedLocation> c2, TreePath loc, String eltname) throws DeepInequalityException { if (c1 == null && c2 == null) { return; Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== ---
hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/tools/org/apache/hadoop/tools/rumen/LoggedTaskAttempt.java Tue Jan 26 14:02:53 2010 @@ -82,6 +82,11 @@ } } + void adjustTimes(long adjustment) { + startTime += adjustment; + finishTime += adjustment; + } + public long getShuffleFinished() { return shuffleFinished; } @@ -135,7 +140,7 @@ } void setHostName(String hostName) { - this.hostName = hostName; + this.hostName = hostName.intern(); } public long getHdfsBytesRead() {