From: vinodkv@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r1240414 [1/2] - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-cli...
Date: Sat, 04 Feb 2012 00:06:25 -0000
Message-Id: <20120204000626.3C93A23888D2@eris.apache.org>

Author: vinodkv
Date: Sat Feb 4 00:06:24 2012
New Revision: 1240414

URL: http://svn.apache.org/viewvc?rev=1240414&view=rev
Log:
MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task
output is recovered and thus reduce the unnecessarily bloated recovery
time. Contributed by Robert Joseph Evans.
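The substance of the fix is a guard added in RecoveryService (full hunk below): when a restarted AM replays a prior attempt, OutputCommitter.recoverTask() is now invoked only for tasks whose committed output is final job output. Condensed into a self-contained sketch -- the class and helper names here are illustrative, while the calls and the config key are taken verbatim from the diff:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.TaskType;

    class RecoverySelectionSketch {
      /**
       * Recover committed output only for reduces, or for maps in a map-only
       * job. Map output in a job with reduces is intermediate data that the
       * reduces re-fetch through shuffle, so recovering it is wasted work.
       */
      static void maybeRecover(OutputCommitter committer, TaskAttemptContext ctx)
          throws IOException {
        TaskType type = ctx.getTaskAttemptID().getTaskID().getTaskType();
        int numReducers = ctx.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
        if (type == TaskType.REDUCE
            || (type == TaskType.MAP && numReducers <= 0)) {
          committer.recoverTask(ctx);
        }
      }
    }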
svn merge --ignore-ancestry -c 1240413 ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Sat Feb 4 00:06:24 2012
@@ -628,6 +628,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3727. jobtoken location property in jobconf refers to wrong
     jobtoken file (tucu)
 
+    MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task
+    output is recovered and thus reduce the unnecessarily bloated recovery
+    time. (Robert Joseph Evans via vinodkv)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Feb 4 00:06:24 2012
@@ -559,6 +559,7 @@ public abstract class TaskImpl implement
   }
 
   private void internalError(TaskEventType type) {
+    LOG.error("Invalid event " + type + " on Task " + this.taskId);
     eventHandler.handle(new JobDiagnosticsUpdateEvent(
         this.taskId.getJobId(), "Invalid event " + type + 
         " on Task " + this.taskId));

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/local/LocalContainerAllocator.java Sat Feb 4 00:06:24 2012
@@ -103,6 +103,7 @@ public class LocalContainerAllocator ext
         // This can happen when the connection to the RM has gone down. Keep
         // re-trying until the retryInterval has expired.
         if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+          LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
           eventHandler.handle(new JobEvent(this.getJob().getID(),
                                            JobEventType.INTERNAL_ERROR));
           throw new YarnException("Could not contact RM after " +

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Sat Feb 4 00:06:24 2012
@@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -358,16 +360,24 @@ public class RecoveryService extends Com
       //recover the task output
       TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
           attInfo.getAttemptId());
-      try {
-        committer.recoverTask(taskContext);
+      try {
+        TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+        int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
+        if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+          committer.recoverTask(taskContext);
+          LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+        } else {
+          LOG.info("Will not try to recover output for "
+              + taskContext.getTaskAttemptID());
+        }
       } catch (IOException e) {
+        LOG.error("Caught an exception while trying to recover task "+aId, e);
         actualHandler.handle(new JobDiagnosticsUpdateEvent(
             aId.getTaskId().getJobId(), "Error in recovering task output " + 
             e.getMessage()));
         actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
             JobEventType.INTERNAL_ERROR));
       }
-      LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
 
       // send the done event
       LOG.info("Sending done event to " + aId);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Feb 4 00:06:24 2012
@@ -543,6 +543,7 @@ public class RMContainerAllocator extend
       // This can happen when the connection to the RM has gone down. Keep
       // re-trying until the retryInterval has expired.
       if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
+        LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
         eventHandler.handle(new JobEvent(this.getJob().getID(),
                                          JobEventType.INTERNAL_ERROR));
         throw new YarnException("Could not contact RM after " +

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Sat Feb 4 00:06:24 2012
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Disp
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestRecovery {
 
   private static final Log LOG = LogFactory.getLog(TestRecovery.class);
@@ -112,7 +113,7 @@ public class TestRecovery {
     Assert.assertEquals("Reduce Task state not correct",
         TaskState.RUNNING, reduceTask.getReport().getTaskState());
 
-    //send the fail signal to the 1st map task attempt
+    //send the fail signal to the 1st map task attempt
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             task1Attempt1.getID(),
@@ -193,7 +194,7 @@ public class TestRecovery {
     //RUNNING state
     app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
 
-    //send the done signal to the 2nd map task
+    //send the done signal to the 2nd map task
     app.getContext().getEventHandler().handle(
         new TaskAttemptEvent(
             mapTask2.getAttempts().values().iterator().next().getID(),
@@ -349,6 +350,151 @@ public class TestRecovery {
     validateOutput();
   }
 
+  @Test
+  public void testOutputRecoveryMapsOnly() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    Iterator it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask1 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+
+    // write output corresponding to map1 (This is just to validate that it is
+    //no included in the output)
+    writeBadOutput(task1Attempt1, conf);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    //stop the app before the job completes.
+    app.stop();
+
+    //rerun
+    //in rerun the map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask1 = it.next();
+
+    // map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port after recovery
+    task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
+    Assert.assertEquals(5467, task1Attempt1.getShufflePort());
+
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
+        .next();
+
+    //before sending the TA_DONE, event make sure attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task2Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for map task to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // Verify the shuffle-port
+    Assert.assertEquals(5467, task2Attempt1.getShufflePort());
+
+    app.waitForState(reduceTask1, TaskState.RUNNING);
+    TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
+
+    // write output corresponding to reduce1
+    writeOutput(reduce1Attempt1, conf);
+
+    //send the done signal to the 1st reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduce1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for first reduce task to complete
+    app.waitForState(reduceTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    validateOutput();
+  }
+
+  private void writeBadOutput(TaskAttempt attempt, Configuration conf)
+  throws Exception {
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        TypeConverter.fromYarn(attempt.getID()));
+
+    TextOutputFormat theOutputFormat = new TextOutputFormat();
+    RecordWriter theRecordWriter = theOutputFormat
+        .getRecordWriter(tContext);
+
+    NullWritable nullWritable = NullWritable.get();
+    try {
+      theRecordWriter.write(key2, val2);
+      theRecordWriter.write(null, nullWritable);
+      theRecordWriter.write(null, val2);
+      theRecordWriter.write(nullWritable, val1);
+      theRecordWriter.write(key1, nullWritable);
+      theRecordWriter.write(key2, null);
+      theRecordWriter.write(null, null);
+      theRecordWriter.write(key1, val1);
+    } finally {
+      theRecordWriter.close(tContext);
+    }
+
+    OutputFormat outputFormat = ReflectionUtils.newInstance(
+        tContext.getOutputFormatClass(), conf);
+    OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
+    committer.commitTask(tContext);
+}
+
+
 private void writeOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
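The testOutputRecoveryMapsOnly test above drives a full AM restart and asserts that the recovered run reuses only the committed output of the selected tasks (the deliberately bad map output must not land in the final output). For reference, a hedged sketch of just the recovery-related settings the test toggles -- the class is hypothetical, the keys appear verbatim in the test:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    class RecoveryConfSketch {
      static Configuration recoveryConf() {
        Configuration conf = new Configuration();
        // Let a second AM attempt replay the job history of the first.
        conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
        // The test disables uber (single-container) execution.
        conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
        // The test exercises the new-API (org.apache.hadoop.mapreduce) committers.
        conf.setBoolean("mapred.mapper.new-api", true);
        conf.setBoolean("mapred.reducer.new-api", true);
        return conf;
      }
    }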
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -19,14 +19,12 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /** An {@link OutputCommitter} that commits files specified
@@ -42,280 +40,140 @@ public class FileOutputCommitter extends
   /**
    * Temporary directory name
    */
-  public static final String TEMP_DIR_NAME = "_temporary";
-  public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-    "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-
-  public void setupJob(JobContext context) throws IOException {
+  public static final String TEMP_DIR_NAME =
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
+  public static final String SUCCEEDED_FILE_NAME =
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME;
+  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+      org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
+
+  private static Path getOutputPath(JobContext context) {
+    JobConf conf = context.getJobConf();
+    return FileOutputFormat.getOutputPath(conf);
+  }
+
+  private static Path getOutputPath(TaskAttemptContext context) {
     JobConf conf = context.getJobConf();
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path tmpDir =
-          new Path(outputPath, getJobAttemptBaseDirName(context) +
-              Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(conf);
-      if (!fileSys.mkdirs(tmpDir)) {
-        LOG.error("Mkdirs failed to create " + tmpDir.toString());
-      }
+    return FileOutputFormat.getOutputPath(conf);
+  }
+
+  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null;
+
+  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+      getWrapped(JobContext context) throws IOException {
+    if(wrapped == null) {
+      wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
+          getOutputPath(context), context);
     }
+    return wrapped;
   }
-
-  // True if the job requires output.dir marked on successful job.
-  // Note that by default it is set to true.
-  private boolean shouldMarkOutputDir(JobConf conf) {
-    return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
+
+  private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+      getWrapped(TaskAttemptContext context) throws IOException {
+    if(wrapped == null) {
+      wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
+          getOutputPath(context), context);
+    }
+    return wrapped;
   }
 
-  public void commitJob(JobContext context) throws IOException {
-    //delete the task temp directory from the current jobtempdir
-    JobConf conf = context.getJobConf();
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      FileSystem outputFileSystem = outputPath.getFileSystem(conf);
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Task temp dir could not be deleted " + tmpDir);
-      }
+  /**
+   * Compute the path where the output of a given job attempt will be placed.
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @return the path to store job attempt data.
+   */
+  @Private
+  Path getJobAttemptPath(JobContext context) {
+    return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getJobAttemptPath(context, getOutputPath(context));
+  }
 
-      //move the job output to final place
-      Path jobOutputPath =
-          new Path(outputPath, getJobAttemptBaseDirName(context));
-      moveJobOutputs(outputFileSystem,
-          jobOutputPath, outputPath, jobOutputPath);
+  @Private
+  Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
+    return getTaskAttemptPath(context, getOutputPath(context));
+  }
 
-      // delete the _temporary folder in the output folder
-      cleanupJob(context);
-      // check if the output-dir marking is required
-      if (shouldMarkOutputDir(context.getJobConf())) {
-        // create a _success file in the output folder
-        markOutputDirSuccessful(context);
-      }
+  private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
+    Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
+    if(workPath == null) {
+      return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+          .getTaskAttemptPath(context, out);
     }
+    return workPath;
   }
 
-  // Create a _success file in the job's output folder
-  private void markOutputDirSuccessful(JobContext context) throws IOException {
-    JobConf conf = context.getJobConf();
-    // get the o/p path
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      // get the filesys
-      FileSystem fileSys = outputPath.getFileSystem(conf);
-      // create a file in the output folder to mark the job completion
-      Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-      fileSys.create(filePath).close();
-    }
+  /**
+   * Compute the path where the output of a committed task is stored until
+   * the entire job is committed.
+   * @param context the context of the task attempt
+   * @return the path where the output of a committed task is stored until
+   * the entire job is committed.
+   */
+  Path getCommittedTaskPath(TaskAttemptContext context) {
+    return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
        .getCommittedTaskPath(context, getOutputPath(context));
   }
 
-  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
-      Path finalOutputDir, Path jobOutput) throws IOException {
-    LOG.debug("Told to move job output from " + jobOutput
-        + " to " + finalOutputDir +
-        " and orig job output path is " + origJobOutputPath);
-    if (fs.isFile(jobOutput)) {
-      Path finalOutputPath =
-          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
-      if (!fs.rename(jobOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of job");
-        }
-        if (!fs.rename(jobOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of job");
-        }
-      }
-      LOG.debug("Moved job output file from " + jobOutput + " to " +
-          finalOutputPath);
-    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
-      LOG.debug("Job output file " + jobOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath =
-          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along job output path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
-        }
-      }
-    }
+  public Path getWorkPath(TaskAttemptContext context, Path outputPath)
+      throws IOException {
+    return getTaskAttemptPath(context, outputPath);
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    getWrapped(context).setupJob(context);
+  }
+
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    getWrapped(context).commitJob(context);
   }
 
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
-    JobConf conf = context.getJobConf();
-    // do the clean up of temporary directory
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(conf);
-      context.getProgressible().progress();
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Output Path is Null in cleanup");
-      }
-    }
+    getWrapped(context).cleanupJob(context);
   }
 
   @Override
   public void abortJob(JobContext context, int runState) throws IOException {
-    // simply delete the _temporary dir from the o/p folder of the job
-    cleanupJob(context);
+    JobStatus.State state;
+    if(runState == JobStatus.State.RUNNING.getValue()) {
+      state = JobStatus.State.RUNNING;
+    } else if(runState == JobStatus.State.SUCCEEDED.getValue()) {
+      state = JobStatus.State.SUCCEEDED;
+    } else if(runState == JobStatus.State.FAILED.getValue()) {
+      state = JobStatus.State.FAILED;
+    } else if(runState == JobStatus.State.PREP.getValue()) {
+      state = JobStatus.State.PREP;
+    } else if(runState == JobStatus.State.KILLED.getValue()) {
+      state = JobStatus.State.KILLED;
    } else {
+      throw new IllegalArgumentException(runState+" is not a valid runState.");
+    }
+    getWrapped(context).abortJob(context, state);
   }
 
   public void setupTask(TaskAttemptContext context) throws IOException {
-    // FileOutputCommitter's setupTask doesn't do anything. Because the
-    // temporary task directory is created on demand when the
-    // task is writing.
+    getWrapped(context).setupTask(context);
   }
-
-  public void commitTask(TaskAttemptContext context)
-      throws IOException {
-    Path taskOutputPath = getTempTaskOutputPath(context);
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    JobConf job = context.getJobConf();
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(job);
-      context.getProgressible().progress();
-      if (fs.exists(taskOutputPath)) {
-        // Move the task outputs to the current job attempt output dir
-        JobConf conf = context.getJobConf();
-        Path outputPath = FileOutputFormat.getOutputPath(conf);
-        FileSystem outputFileSystem = outputPath.getFileSystem(conf);
-        Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
-        moveTaskOutputs(context, outputFileSystem, jobOutputPath,
-            taskOutputPath);
-
-        // Delete the temporary task-specific output directory
-        if (!fs.delete(taskOutputPath, true)) {
-          LOG.info("Failed to delete the temporary output" +
-              " directory of task: " + attemptId + " - " + taskOutputPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " +
-            jobOutputPath);
-      }
-    }
-  }
-
-  private void moveTaskOutputs(TaskAttemptContext context,
-      FileSystem fs,
-      Path jobOutputDir,
-      Path taskOutput)
-      throws IOException {
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    context.getProgressible().progress();
-    LOG.debug("Told to move taskoutput from " + taskOutput
-        + " to " + jobOutputDir);
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
-          getTempTaskOutputPath(context));
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " +
-              attemptId);
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " +
-              attemptId);
-        }
-      }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.getFileStatus(taskOutput).isDirectory()) {
-      LOG.debug("Taskoutput " + taskOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
-          getTempTaskOutputPath(context));
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
-        }
-      }
-    }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    getWrapped(context).commitTask(context, getTaskAttemptPath(context));
   }
 
+  @Override
   public void abortTask(TaskAttemptContext context) throws IOException {
-    Path taskOutputPath = getTempTaskOutputPath(context);
-    if (taskOutputPath != null) {
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      context.getProgressible().progress();
-      fs.delete(taskOutputPath, true);
-    }
-  }
-
-  @SuppressWarnings("deprecation")
-  private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput,
-      Path taskOutputPath) throws IOException {
-    URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
-    URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
-    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {
-      //taskOutputPath is not a parent of taskOutput
-      throw new IOException("Can not get the relative path: base = " +
-          taskOutputPathUri + " child = " + taskOutputUri);
-    }
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
-    } else {
-      return jobOutputDir;
-    }
+    getWrapped(context).abortTask(context, getTaskAttemptPath(context));
  }
 
+  @Override
   public boolean needsTaskCommit(TaskAttemptContext context)
      throws IOException {
-    Path taskOutputPath = getTempTaskOutputPath(context);
-    if (taskOutputPath != null) {
-      context.getProgressible().progress();
-      // Get the file-system for the task output directory
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      // since task output path is created on demand,
-      // if it exists, task needs a commit
-      if (fs.exists(taskOutputPath)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  Path getTempTaskOutputPath(TaskAttemptContext taskContext)
-      throws IOException {
-    JobConf conf = taskContext.getJobConf();
-    Path outputPath = FileOutputFormat.getOutputPath(conf);
-    if (outputPath != null) {
-      Path p = new Path(outputPath,
-          (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-          "_" + taskContext.getTaskAttemptID().toString()));
-      FileSystem fs = p.getFileSystem(conf);
-      return p.makeQualified(fs);
-    }
-    return null;
-  }
-
-  Path getWorkPath(TaskAttemptContext taskContext, Path basePath)
-      throws IOException {
-    // ${mapred.out.dir}/_temporary
-    Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
-    FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
-    if (!fs.exists(jobTmpDir)) {
-      throw new IOException("The temporary job-output directory " +
-          jobTmpDir.toString() + " doesn't exist!");
-    }
-    // ${mapred.out.dir}/_temporary/_${taskid}
-    String taskid = taskContext.getTaskAttemptID().toString();
-    Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
-    if (!fs.mkdirs(taskTmpDir)) {
-      throw new IOException("Mkdirs failed to create "
-          + taskTmpDir.toString());
-    }
-    return taskTmpDir;
+    return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
   }
 
@@ -326,54 +184,6 @@ public class FileOutputCommitter extends
 
   @Override
   public void recoverTask(TaskAttemptContext context)
      throws IOException {
-    Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
-    context.progress();
-    Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
-    int previousAttempt =
-        context.getConfiguration().getInt(
-            MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
-    if (previousAttempt < 0) {
-      LOG.warn("Cannot recover task output for first attempt...");
-      return;
-    }
-
-    FileSystem outputFileSystem =
-        outputPath.getFileSystem(context.getJobConf());
-    Path pathToRecover =
-        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
-    if (outputFileSystem.exists(pathToRecover)) {
-      // Move the task outputs to their final place
-      LOG.debug("Trying to recover task from " + pathToRecover
-          + " into " + jobOutputPath);
-      moveJobOutputs(outputFileSystem,
-          pathToRecover, jobOutputPath, pathToRecover);
-      LOG.info("Saved output of job to " + jobOutputPath);
-    }
-  }
-
-  protected static String getJobAttemptBaseDirName(JobContext context) {
-    int appAttemptId =
-        context.getJobConf().getInt(
-            MRConstants.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobTempDirName(TaskAttemptContext context) {
-    int appAttemptId =
-        context.getJobConf().getInt(
-            MRConstants.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobAttemptBaseDirName(int appAttemptId) {
-    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        + appAttemptId;
-  }
-
-  protected static String getTaskAttemptBaseDirName(
-      TaskAttemptContext context) {
-    return getJobTempDirName(context) + Path.SEPARATOR + 
-        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        "_" + context.getTaskAttemptID().toString();
+    getWrapped(context).recoverTask(context);
   }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Sat Feb 4 00:06:24 2012
@@ -525,7 +525,7 @@ abstract public class Task implements Wr
     if (outputPath != null) {
       if ((committer instanceof FileOutputCommitter)) {
         FileOutputFormat.setWorkOutputPath(conf, 
-          ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
+          ((FileOutputCommitter)committer).getTaskAttemptPath(taskContext));
       } else {
         FileOutputFormat.setWorkOutputPath(conf, outputPath);
       }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -51,17 +51,21 @@ import org.apache.hadoop.classification.
  *   Discard the task commit.
  *
  *
+ * The methods in this class can be called from several different processes and
+ * from several different contexts.  It is important to know which process and
+ * which context each is called from.  Each method should be marked accordingly
+ * in its documentation.
  *
  * @see org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter 
  * @see JobContext
  * @see TaskAttemptContext 
- *
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public abstract class OutputCommitter {
   /**
-   * For the framework to setup the job output during initialization
+   * For the framework to setup the job output during initialization.  This is
+   * called from the application master process for the entire job.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException if temporary output could not be created
@@ -69,11 +73,12 @@ public abstract class OutputCommitter {
   public abstract void setupJob(JobContext jobContext) throws IOException;
 
   /**
-   * For cleaning up the job's output after job completion
+   * For cleaning up the job's output after job completion.  This is called
+   * from the application master process for the entire job.
    * 
   * @param jobContext Context of the job whose output is being written.
    * @throws IOException
-   * @deprecated Use {@link #commitJob(JobContext)} or
+   * @deprecated Use {@link #commitJob(JobContext)} and
   *                 {@link #abortJob(JobContext, JobStatus.State)} instead.
    */
   @Deprecated
@@ -81,7 +86,8 @@ public abstract class OutputCommitter {
 
   /**
    * For committing job's output after successful job completion. Note that this
-   * is invoked for jobs with final runstate as SUCCESSFUL.	
+   * is invoked for jobs with final runstate as SUCCESSFUL.  This is called
+   * from the application master process for the entire job.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @throws IOException
@@ -94,7 +100,8 @@ public abstract class OutputCommitter {
   /**
    * For aborting an unsuccessful job's output. Note that this is invoked for 
    * jobs with final runstate as {@link JobStatus.State#FAILED} or 
-   * {@link JobStatus.State#KILLED}.
+   * {@link JobStatus.State#KILLED}.  This is called from the application
+   * master process for the entire job.
    * 
    * @param jobContext Context of the job whose output is being written.
    * @param state final runstate of the job
@@ -106,7 +113,8 @@ public abstract class OutputCommitter {
   }
   
   /**
-   * Sets up output for the task.
+   * Sets up output for the task.  This is called from each individual task's
+   * process that will output to HDFS, and it is called just for that task.
    * 
   * @param taskContext Context of the task whose output is being written.
    * @throws IOException
@@ -115,7 +123,9 @@ public abstract class OutputCommitter {
   throws IOException;
   
   /**
-   * Check whether task needs a commit
+   * Check whether task needs a commit.  This is called from each individual
+   * task's process that will output to HDFS, and it is called just for that
+   * task.
    * 
    * @param taskContext
    * @return true/false
@@ -125,18 +135,23 @@ public abstract class OutputCommitter {
   throws IOException;
 
   /**
-   * To promote the task's temporary output to final output location
-   * 
-   * The task's output is moved to the job's output directory.
+   * To promote the task's temporary output to final output location.
+   * If {@link #needsTaskCommit(TaskAttemptContext)} returns true and this
+   * task is the task that the AM determines finished first, this method
+   * is called to commit an individual task's output.  This is to mark
+   * that tasks output as complete, as {@link #commitJob(JobContext)} will 
+   * also be called later on if the entire job finished successfully. This
+   * is called from a task's process.
    * 
    * @param taskContext Context of the task whose output is being written.
-   * @throws IOException if commit is not 
+   * @throws IOException if commit is not successful. 
   */
   public abstract void commitTask(TaskAttemptContext taskContext)
   throws IOException;
   
   /**
-   * Discard the task output
+   * Discard the task output.  This is called from a task's process to clean 
+   * up a single task's output that can not yet been committed.
    * 
   * @param taskContext
    * @throws IOException
@@ -164,7 +179,8 @@ public abstract class OutputCommitter {
   * The retry-count for the job will be passed via the 
   * {@link MRJobConfig#APPLICATION_ATTEMPT_ID} key in  
   * {@link TaskAttemptContext#getConfiguration()} for the 
-   * <code>OutputCommitter</code>.
+   * OutputCommitter.  This is called from the application master
+   * process, but it is called individually for each task.
   * 
   * If an exception is thrown the task will be attempted again. 
   *
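To make the new process annotations concrete, here is a skeletal committer with each override tagged by where it runs, per the javadoc added above. Purely illustrative (a real committer must actually manage output data), and the class name is ours:

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.JobStatus;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    class NoOpCommitterSketch extends OutputCommitter {
      @Override public void setupJob(JobContext job) { }      // AM process, once per job
      @Override public void commitJob(JobContext job) { }     // AM process, on job success
      @Override public void abortJob(JobContext job,
          JobStatus.State state) { }                          // AM process, on failure/kill
      @Override public void setupTask(TaskAttemptContext t) { }    // task process
      @Override public boolean needsTaskCommit(TaskAttemptContext t) {
        return false;                                              // task process
      }
      @Override public void commitTask(TaskAttemptContext t) { }   // task process, first finisher only
      @Override public void abortTask(TaskAttemptContext t) { }    // task process
      @Override public void recoverTask(TaskAttemptContext t) { }  // AM process, per task, on restart
    }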
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -19,16 +19,16 @@
 package org.apache.hadoop.mapreduce.lib.output;
 
 import java.io.IOException;
-import java.net.URI;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -37,41 +37,239 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 
 /** An {@link OutputCommitter} that commits files specified
- * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}.
+ * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. 
  **/
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FileOutputCommitter extends OutputCommitter {
-
   private static final Log LOG = LogFactory.getLog(FileOutputCommitter.class);
-  /**
-   * Temporary directory name
+
+  /** 
+   * Name of directory where pending data is placed.  Data that has not been
+   * committed yet.
    */
-  protected static final String TEMP_DIR_NAME = "_temporary";
+  public static final String PENDING_DIR_NAME = "_temporary";
   public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
-  static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
+  public static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
     "mapreduce.fileoutputcommitter.marksuccessfuljobs";
-  private FileSystem outputFileSystem = null;
   private Path outputPath = null;
   private Path workPath = null;
 
   /**
    * Create a file output committer
-   * @param outputPath the job's output path
+   * @param outputPath the job's output path, or null if you want the output
+   * committer to act as a noop.
    * @param context the task's context
    * @throws IOException
    */
   public FileOutputCommitter(Path outputPath, 
                              TaskAttemptContext context) throws IOException {
+    this(outputPath, (JobContext)context);
+    if (outputPath != null) {
+      workPath = getTaskAttemptPath(context, outputPath);
+    }
+  }
+
+  /**
+   * Create a file output committer
+   * @param outputPath the job's output path, or null if you want the output
+   * committer to act as a noop.
+   * @param context the task's context
+   * @throws IOException
+   */
+  @Private
+  public FileOutputCommitter(Path outputPath, 
+                             JobContext context) throws IOException {
     if (outputPath != null) {
-      this.outputPath = outputPath;
-      outputFileSystem = outputPath.getFileSystem(context.getConfiguration());
-      workPath = new Path(outputPath,
-          getTaskAttemptBaseDirName(context))
-          .makeQualified(outputFileSystem);
+      FileSystem fs = outputPath.getFileSystem(context.getConfiguration());
+      this.outputPath = fs.makeQualified(outputPath);
     }
   }
+
+  /**
+   * @return the path where final output of the job should be placed.  This
+   * could also be considered the committed application attempt path.
+   */
+  private Path getOutputPath() {
+    return this.outputPath;
+  }
+
+  /**
+   * @return true if we have an output path set, else false.
+   */
+  private boolean hasOutputPath() {
+    return this.outputPath != null;
+  }
+
+  /**
+   * @return the path where the output of pending job attempts are
+   * stored.
+   */
+  private Path getPendingJobAttemptsPath() {
+    return getPendingJobAttemptsPath(getOutputPath());
+  }
+
+  /**
+   * Get the location of pending job attempts.
+   * @param out the base output directory.
+   * @return the location of pending job attempts.
+   */
+  private static Path getPendingJobAttemptsPath(Path out) {
+    return new Path(out, PENDING_DIR_NAME);
+  }
+
+  /**
+   * Get the Application Attempt Id for this job
+   * @param context the context to look in
+   * @return the Application Attempt Id for a given job.
+   */
+  private static int getAppAttemptId(JobContext context) {
+    return context.getConfiguration().getInt(
+        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @return the path to store job attempt data.
+   */
+  public Path getJobAttemptPath(JobContext context) {
+    return getJobAttemptPath(context, getOutputPath());
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param context the context of the job.  This is used to get the
+   * application attempt id.
+   * @param out the output path to place these in.
+   * @return the path to store job attempt data.
+   */
+  public static Path getJobAttemptPath(JobContext context, Path out) {
+    return getJobAttemptPath(getAppAttemptId(context), out);
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private Path getJobAttemptPath(int appAttemptId) {
+    return getJobAttemptPath(appAttemptId, getOutputPath());
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed. 
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private static Path getJobAttemptPath(int appAttemptId, Path out) {
+    return new Path(getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
+  }
+
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks. 
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private Path getPendingTaskAttemptsPath(JobContext context) {
+    return getPendingTaskAttemptsPath(context, getOutputPath());
+  }
+
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks. 
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
+    return new Path(getJobAttemptPath(context, out), PENDING_DIR_NAME);
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * 
+   * @param context the context of the task attempt.
+   * @return the path where a task attempt should be stored.
+   */
+  public Path getTaskAttemptPath(TaskAttemptContext context) {
+    return new Path(getPendingTaskAttemptsPath(context), 
+        String.valueOf(context.getTaskAttemptID()));
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * 
+   * @param context the context of the task attempt.
+   * @param out The output path to put things in.
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
+    return new Path(getPendingTaskAttemptsPath(context, out), 
+        String.valueOf(context.getTaskAttemptID()));
+  }
+
+  /**
+   * Compute the path where the output of a committed task is stored until
+   * the entire job is committed.
+   * @param context the context of the task attempt
+   * @return the path where the output of a committed task is stored until
+   * the entire job is committed.
+   */
+  public Path getCommittedTaskPath(TaskAttemptContext context) {
+    return getCommittedTaskPath(getAppAttemptId(context), context);
+  }
+
+  public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) {
+    return getCommittedTaskPath(getAppAttemptId(context), context, out);
+  }
+
+  /**
+   * Compute the path where the output of a committed task is stored until the
+   * entire job is committed for a specific application attempt.
+   * @param appAttemptId the id of the application attempt to use
+   * @param context the context of any task.
+   * @return the path where the output of a committed task is stored.
+   */
+  private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
+    return new Path(getJobAttemptPath(appAttemptId),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+
+  private static Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context, Path out) {
+    return new Path(getJobAttemptPath(appAttemptId, out),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+
+  private static class CommittedTaskFilter implements PathFilter {
+    @Override
+    public boolean accept(Path path) {
+      return !PENDING_DIR_NAME.equals(path.getName());
+    }
+  }
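(Taken together, the path helpers above define the committer's on-disk layout; schematically, with names substituted for the accessor results:

    ${outputDir}/_temporary/${appAttemptId}/                              pending job attempt dir
    ${outputDir}/_temporary/${appAttemptId}/${taskId}/                    committed task output
    ${outputDir}/_temporary/${appAttemptId}/_temporary/${taskAttemptId}/  in-flight task attempt output

CommittedTaskFilter relies on this nesting: at job-commit time everything directly under the job attempt dir except the nested _temporary is the output of a committed task, which is what getAllCommittedTaskPaths(), next, lists and commitJob() merges into ${outputDir}.)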
+ * @throws IOException + */ + private FileStatus[] getAllCommittedTaskPaths(JobContext context) + throws IOException { + Path jobAttemptPath = getJobAttemptPath(context); + FileSystem fs = jobAttemptPath.getFileSystem(context.getConfiguration()); + return fs.listStatus(jobAttemptPath, new CommittedTaskFilter()); + } + + /** + * Get the directory that the task should write results into. + * @return the work directory + * @throws IOException + */ + public Path getWorkPath() throws IOException { + return workPath; + } /** * Create the temporary directory that is the root of all of the task @@ -79,116 +277,103 @@ public class FileOutputCommitter extends * @param context the job's context */ public void setupJob(JobContext context) throws IOException { - if (outputPath != null) { - Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + - Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); - if (!fileSys.mkdirs(tmpDir)) { - LOG.error("Mkdirs failed to create " + tmpDir.toString()); + if (hasOutputPath()) { + Path pendingJobAttemptsPath = getPendingJobAttemptsPath(); + FileSystem fs = pendingJobAttemptsPath.getFileSystem( + context.getConfiguration()); + if (!fs.mkdirs(pendingJobAttemptsPath)) { + LOG.error("Mkdirs failed to create " + pendingJobAttemptsPath); } - } - } - - // True if the job requires output.dir marked on successful job. - // Note that by default it is set to true. - private boolean shouldMarkOutputDir(Configuration conf) { - return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true); - } - - // Create a _success file in the job's output dir - private void markOutputDirSuccessful(MRJobConfig context) throws IOException { - if (outputPath != null) { - // create a file in the output folder to mark the job completion - Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); - outputFileSystem.create(filePath).close(); + } else { + LOG.warn("Output Path is null in setupJob()"); } } /** - * Move all job output to the final place. + * The job has completed so move all committed tasks to the final output dir. * Delete the temporary directory, including all of the work directories. * Create a _SUCCESS file to make it as successful. * @param context the job's context */ public void commitJob(JobContext context) throws IOException { - if (outputPath != null) { - //delete the task temp directory from the current jobtempdir - Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) + - Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration()); - if (fileSys.exists(tmpDir)) { - fileSys.delete(tmpDir, true); - } else { - LOG.warn("Task temp dir could not be deleted " + tmpDir); + if (hasOutputPath()) { + Path finalOutput = getOutputPath(); + FileSystem fs = finalOutput.getFileSystem(context.getConfiguration()); + for(FileStatus stat: getAllCommittedTaskPaths(context)) { + mergePaths(fs, stat, finalOutput); } - //move the job output to final place - Path jobOutputPath = - new Path(outputPath, getJobAttemptBaseDirName(context)); - moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath); - // delete the _temporary folder and create a _done file in the o/p folder cleanupJob(context); - if (shouldMarkOutputDir(context.getConfiguration())) { - markOutputDirSuccessful(context); + // True if the job requires output.dir marked on successful job. + // Note that by default it is set to true. 
   /**
-   * Move all job output to the final place.
+   * The job has completed, so move all committed tasks to the final output dir.
    * Delete the temporary directory, including all of the work directories.
    * Create a _SUCCESS file to mark it as successful.
    * @param context the job's context
    */
   public void commitJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      //delete the task temp directory from the current jobtempdir
-      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
-          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      } else {
-        LOG.warn("Task temp dir could not be deleted " + tmpDir);
+    if (hasOutputPath()) {
+      Path finalOutput = getOutputPath();
+      FileSystem fs = finalOutput.getFileSystem(context.getConfiguration());
+      for(FileStatus stat: getAllCommittedTaskPaths(context)) {
+        mergePaths(fs, stat, finalOutput);
       }
-      //move the job output to final place
-      Path jobOutputPath =
-          new Path(outputPath, getJobAttemptBaseDirName(context));
-      moveJobOutputs(outputFileSystem, jobOutputPath, outputPath, jobOutputPath);
-      // delete the _temporary folder and create a _done file in the o/p folder
       cleanupJob(context);
-      if (shouldMarkOutputDir(context.getConfiguration())) {
-        markOutputDirSuccessful(context);
+      // True if the job requires the output directory to be marked on
+      // successful completion.  Note that it defaults to true.
+      if (context.getConfiguration().getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)) {
+        Path markerPath = new Path(outputPath, SUCCEEDED_FILE_NAME);
+        fs.create(markerPath).close();
       }
+    } else {
+      LOG.warn("Output Path is null in commitJob()");
     }
   }
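[Editor's note: the _SUCCESS marker remains guarded by SUCCESSFUL_JOB_OUTPUT_DIR_MARKER. To the best of my reading that constant resolves to the key used below, with a default of true, but verify against the source before relying on it. Not part of the patch:]

    import org.apache.hadoop.conf.Configuration;

    public class MarkerConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Key assumed from SUCCESSFUL_JOB_OUTPUT_DIR_MARKER; disables the
        // _SUCCESS file that commitJob() would otherwise create.
        conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false);
      }
    }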
-  /**
-   * Move job output to final location
-   * @param fs Filesystem handle
-   * @param origJobOutputPath The original location of the job output
-   * Required to generate the relative path for correct moving of data.
-   * @param finalOutputDir The final output directory to which the job output
-   * needs to be moved
-   * @param jobOutput The current job output directory being moved
-   * @throws IOException
-   */
-  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
-      Path finalOutputDir, Path jobOutput) throws IOException {
-    LOG.debug("Told to move job output from " + jobOutput
-        + " to " + finalOutputDir +
-        " and orig job output path is " + origJobOutputPath);
-    if (fs.isFile(jobOutput)) {
-      Path finalOutputPath =
-          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
-      if (!fs.rename(jobOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of job");
-        }
-        if (!fs.rename(jobOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of job");
-        }
-      }
-      LOG.debug("Moved job output file from " + jobOutput + " to " +
-          finalOutputPath);
-    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
-      LOG.debug("Job output file " + jobOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(jobOutput);
-      Path finalOutputPath =
-          getFinalPath(finalOutputDir, jobOutput, origJobOutputPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along job output path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
-        }
-      }
-    }
-  }

+  /**
+   * Merge two paths together.  Anything in from will be moved into to; if
+   * there are any name conflicts while merging, the files or directories
+   * in from win.
+   * @param fs the File System to use
+   * @param from the path data is coming from.
+   * @param to the path data is going to.
+   * @throws IOException on any error
+   */
+  private static void mergePaths(FileSystem fs, final FileStatus from,
+      final Path to)
+      throws IOException {
+    LOG.debug("Merging data from "+from+" to "+to);
+    if(from.isFile()) {
+      if(fs.exists(to)) {
+        if(!fs.delete(to, true)) {
+          throw new IOException("Failed to delete "+to);
+        }
+      }
+
+      if(!fs.rename(from.getPath(), to)) {
+        throw new IOException("Failed to rename "+from+" to "+to);
+      }
+    } else if(from.isDirectory()) {
+      if(fs.exists(to)) {
+        FileStatus toStat = fs.getFileStatus(to);
+        if(!toStat.isDirectory()) {
+          if(!fs.delete(to, true)) {
+            throw new IOException("Failed to delete "+to);
+          }
+          if(!fs.rename(from.getPath(), to)) {
+            throw new IOException("Failed to rename "+from+" to "+to);
+          }
+        } else {
+          // it is a directory, so merge everything in the directories
+          for(FileStatus subFrom: fs.listStatus(from.getPath())) {
+            Path subTo = new Path(to, subFrom.getPath().getName());
+            mergePaths(fs, subFrom, subTo);
+          }
+        }
+      } else {
+        // it does not exist, just rename
+        if(!fs.rename(from.getPath(), to)) {
+          throw new IOException("Failed to rename "+from+" to "+to);
+        }
+      }
+    }
+  }
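[Editor's note: the recursion above gives mergePaths last-writer-wins semantics: files replace whatever sits at the destination, directories merge into existing directories, and anything else is a plain rename. A standalone illustration of the same rules using java.nio.file rather than the Hadoop FileSystem API, so only the semantics carry over; not part of the patch:]

    import java.io.IOException;
    import java.nio.file.*;

    public class MergePathsSketch {
      // Mirrors the rules of mergePaths above for local files, illustration only.
      static void merge(Path from, Path to) throws IOException {
        if (Files.isRegularFile(from)) {
          if (to.getParent() != null) {
            Files.createDirectories(to.getParent());
          }
          Files.move(from, to, StandardCopyOption.REPLACE_EXISTING); // "from" wins
        } else if (Files.isDirectory(from)) {
          if (Files.isDirectory(to)) {
            try (DirectoryStream<Path> children = Files.newDirectoryStream(from)) {
              for (Path child : children) {
                merge(child, to.resolve(child.getFileName()));       // merge recursively
              }
            }
          } else {
            Files.deleteIfExists(to);  // a plain file at "to" loses to the directory
            Files.move(from, to);
          }
        }
      }
    }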
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
-    if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
-      if (fileSys.exists(tmpDir)) {
-        fileSys.delete(tmpDir, true);
-      }
+    if (hasOutputPath()) {
+      Path pendingJobAttemptsPath = getPendingJobAttemptsPath();
+      FileSystem fs = pendingJobAttemptsPath
+          .getFileSystem(context.getConfiguration());
+      fs.delete(pendingJobAttemptsPath, true);
     } else {
-      LOG.warn("Output Path is null in cleanup");
+      LOG.warn("Output Path is null in cleanupJob()");
     }
   }

@@ -217,69 +402,40 @@ public class FileOutputCommitter extends
    * Move the files from the work directory to the job output directory
    * @param context the task context
    */
+  @Override
   public void commitTask(TaskAttemptContext context) throws IOException {
-    TaskAttemptID attemptId = context.getTaskAttemptID();
-    if (workPath != null) {
-      context.progress();
-      if (outputFileSystem.exists(workPath)) {
-        // Move the task outputs to the current job attempt output dir
-        Path jobOutputPath =
-            new Path(outputPath, getJobAttemptBaseDirName(context));
-        moveTaskOutputs(context, outputFileSystem, jobOutputPath, workPath);
-        // Delete the temporary task-specific output directory
-        if (!outputFileSystem.delete(workPath, true)) {
-          LOG.warn("Failed to delete the temporary output" +
-              " directory of task: " + attemptId + " - " + workPath);
-        }
-        LOG.info("Saved output of task '" + attemptId + "' to " +
-            jobOutputPath);
-      }
-    }
+    commitTask(context, null);
   }

-  /**
-   * Move all of the files from the work directory to the final output
-   * @param context the task context
-   * @param fs the output file system
-   * @param jobOutputDir the final output direcotry
-   * @param taskOutput the work path
-   * @throws IOException
-   */
-  private void moveTaskOutputs(TaskAttemptContext context,
-      FileSystem fs,
-      Path jobOutputDir,
-      Path taskOutput)
+  @Private
+  public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
       throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
-    context.progress();
-    LOG.debug("Told to move taskoutput from " + taskOutput
-        + " to " + jobOutputDir);
-    if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-          workPath);
-      if (!fs.rename(taskOutput, finalOutputPath)) {
-        if (!fs.delete(finalOutputPath, true)) {
-          throw new IOException("Failed to delete earlier output of task: " +
-              attemptId);
-        }
-        if (!fs.rename(taskOutput, finalOutputPath)) {
-          throw new IOException("Failed to save output of task: " +
-              attemptId);
-        }
-      }
-      LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-    } else if(fs.getFileStatus(taskOutput).isDirectory()) {
-      LOG.debug("Taskoutput " + taskOutput + " is a dir");
-      FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, workPath);
-      fs.mkdirs(finalOutputPath);
-      LOG.debug("Creating dirs along path " + finalOutputPath);
-      if (paths != null) {
-        for (FileStatus path : paths) {
-          moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
-        }
-      }
-    }
+    if (hasOutputPath()) {
+      context.progress();
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      Path committedTaskPath = getCommittedTaskPath(context);
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      if (fs.exists(taskAttemptPath)) {
+        if(fs.exists(committedTaskPath)) {
+          if(!fs.delete(committedTaskPath, true)) {
+            throw new IOException("Could not delete " + committedTaskPath);
+          }
+        }
+        if(!fs.rename(taskAttemptPath, committedTaskPath)) {
+          throw new IOException("Could not rename " + taskAttemptPath + " to "
+              + committedTaskPath);
+        }
+        LOG.info("Saved output of task '" + attemptId + "' to " +
+            committedTaskPath);
+      } else {
+        LOG.warn("No Output found for " + attemptId);
+      }
+    } else {
+      LOG.warn("Output Path is null in commitTask()");
    }
  }
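[Editor's note: the @Private overload taking an explicit taskAttemptPath is what lets a caller, such as the AM recovery path this JIRA targets, commit output from somewhere other than the current attempt's default location, with a single rename instead of the old per-file moves. A hypothetical caller, not part of the patch:]

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

    public class ExplicitCommitSketch {
      // Commit a specific attempt directory rather than the one derived
      // from the context; outDir is the job's output path.
      static void commitAt(FileOutputCommitter committer,
          TaskAttemptContext tContext, Path outDir) throws IOException {
        Path attemptDir = FileOutputCommitter.getTaskAttemptPath(tContext, outDir);
        committer.commitTask(tContext, attemptDir);
      }
    }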
@@ -289,38 +445,22 @@ public class FileOutputCommitter extends
    */
   @Override
   public void abortTask(TaskAttemptContext context) throws IOException {
-    if (workPath != null) {
-      context.progress();
-      outputFileSystem.delete(workPath, true);
-    }
+    abortTask(context, null);
   }

-  /**
-   * Find the final name of a given output file, given the job output directory
-   * and the work directory.
-   * @param jobOutputDir the job's output directory
-   * @param taskOutput the specific task output file
-   * @param taskOutputPath the job's work directory
-   * @return the final path for the specific output file
-   * @throws IOException
-   */
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput,
-      Path taskOutputPath) throws IOException {
-    URI taskOutputUri = taskOutput.makeQualified(outputFileSystem.getUri(),
-        outputFileSystem.getWorkingDirectory()).toUri();
-    URI taskOutputPathUri =
-        taskOutputPath.makeQualified(
-            outputFileSystem.getUri(),
-            outputFileSystem.getWorkingDirectory()).toUri();
-    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {
-      throw new IOException("Can not get the relative path: base = " +
-          taskOutputPathUri + " child = " + taskOutputUri);
-    }
-    if (relativePath.getPath().length() > 0) {
-      return new Path(jobOutputDir, relativePath.getPath());
-    } else {
-      return jobOutputDir;
-    }
-  }
+  @Private
+  public void abortTask(TaskAttemptContext context, Path taskAttemptPath) throws IOException {
+    if (hasOutputPath()) {
+      context.progress();
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      if(!fs.delete(taskAttemptPath, true)) {
+        LOG.warn("Could not delete "+taskAttemptPath);
+      }
+    } else {
+      LOG.warn("Output Path is null in abortTask()");
+    }
  }

@@ -331,16 +471,20 @@ public class FileOutputCommitter extends
   @Override
   public boolean needsTaskCommit(TaskAttemptContext context
   ) throws IOException {
-    return workPath != null && outputFileSystem.exists(workPath);
+    return needsTaskCommit(context, null);
   }

-  /**
-   * Get the directory that the task should write results into
-   * @return the work directory
-   * @throws IOException
-   */
-  public Path getWorkPath() throws IOException {
-    return workPath;
+  @Private
+  public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
+  ) throws IOException {
+    if(hasOutputPath()) {
+      if(taskAttemptPath == null) {
+        taskAttemptPath = getTaskAttemptPath(context);
+      }
+      FileSystem fs = taskAttemptPath.getFileSystem(context.getConfiguration());
+      return fs.exists(taskAttemptPath);
+    }
+    return false;
   }

   @Override
@@ -352,43 +496,35 @@ public class FileOutputCommitter extends
   public void recoverTask(TaskAttemptContext context)
       throws IOException {
     context.progress();
-    Path jobOutputPath =
-        new Path(outputPath, getJobAttemptBaseDirName(context));
-    int previousAttempt =
-        context.getConfiguration().getInt(
-            MRJobConfig.APPLICATION_ATTEMPT_ID, 0) - 1;
+    TaskAttemptID attemptId = context.getTaskAttemptID();
+    int previousAttempt = getAppAttemptId(context) - 1;
     if (previousAttempt < 0) {
       throw new IOException ("Cannot recover task output for first attempt...");
     }
-
-    Path pathToRecover =
-        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
-    LOG.debug("Trying to recover task from " + pathToRecover
-        + " into " + jobOutputPath);
-    if (outputFileSystem.exists(pathToRecover)) {
-      // Move the task outputs to their final place
-      moveJobOutputs(outputFileSystem,
-          pathToRecover, jobOutputPath, pathToRecover);
-      LOG.info("Saved output of job to " + jobOutputPath);
+
+    Path committedTaskPath = getCommittedTaskPath(context);
+    Path previousCommittedTaskPath = getCommittedTaskPath(
+        previousAttempt, context);
+    FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration());
+
+    LOG.debug("Trying to recover task from " + previousCommittedTaskPath
+        + " into " + committedTaskPath);
+    if (fs.exists(previousCommittedTaskPath)) {
+      if(fs.exists(committedTaskPath)) {
+        if(!fs.delete(committedTaskPath, true)) {
+          throw new IOException("Could not delete "+committedTaskPath);
+        }
+      }
+      // Rename can fail if the parent directory does not yet exist.
+      Path committedParent = committedTaskPath.getParent();
+      fs.mkdirs(committedParent);
+      if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) {
+        throw new IOException("Could not rename " + previousCommittedTaskPath
+            + " to " + committedTaskPath);
+      }
+      LOG.info("Saved output of " + attemptId + " to " + committedTaskPath);
+    } else {
+      LOG.warn(attemptId + " had no output to recover.");
    }
  }
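[Editor's note: recovery is now one directory rename per selected task. Attempt N looks for the task's committed directory under attempt N-1 and pulls it forward, which is the cheap path MAPREDUCE-3711 is after, instead of re-walking every file. Sketched with plain strings, all values hypothetical; not part of the patch:]

    public class RecoverySketch {
      public static void main(String[] args) {
        String out = "hdfs://nn/user/alice/out";   // hypothetical output path
        String taskId = "task_201202040006_0001_m_000000";
        int appAttemptId = 2;                      // the recovering AM attempt
        int previousAttempt = appAttemptId - 1;    // recoverTask() refuses if negative

        String previousCommitted = out + "/_temporary/" + previousAttempt + "/" + taskId;
        String committed = out + "/_temporary/" + appAttemptId + "/" + taskId;
        // recoverTask() renames previousCommitted -> committed: one rename per task.
        System.out.println(previousCommitted + " -> " + committed);
      }
    }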
-
-  protected static String getJobAttemptBaseDirName(JobContext context) {
-    int appAttemptId =
-        context.getConfiguration().getInt(
-            MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
-    return getJobAttemptBaseDirName(appAttemptId);
-  }
-
-  protected static String getJobAttemptBaseDirName(int appAttemptId) {
-    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        + appAttemptId;
-  }
-
-  protected static String getTaskAttemptBaseDirName(
-      TaskAttemptContext context) {
-    return getJobAttemptBaseDirName(context) + Path.SEPARATOR +
-        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        "_" + context.getTaskAttemptID().toString();
-  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -105,10 +105,9 @@ public class TestFileOutputCommitter ext
     // do commit
     committer.commitTask(tContext);
-    Path jobTempDir1 = new Path(outDir,
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir1.toString()).exists()));
+    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+    File jtd1 = new File(jobTempDir1.toUri().getPath());
+    assertTrue(jtd1.exists());
     validateContent(jobTempDir1);

     //now while running the second app attempt,
@@ -119,14 +118,12 @@ public class TestFileOutputCommitter ext
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter();
-    committer.setupJob(jContext2);
-    Path jobTempDir2 = new Path(outDir,
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf2.getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir2.toString()).exists()));
+    committer2.setupJob(jContext2);
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);

-    tContext2.getConfiguration().setInt(MRConstants.APPLICATION_ATTEMPT_ID, 2);
     committer2.recoverTask(tContext2);
+    File jtd2 = new File(jobTempDir2.toUri().getPath());
+    assertTrue(jtd2.exists());
     validateContent(jobTempDir2);

     committer2.commitJob(jContext2);
@@ -135,7 +132,8 @@ public class TestFileOutputCommitter ext
   }

   private void validateContent(Path dir) throws IOException {
-    File expectedFile = new File(new Path(dir, partFile).toString());
+    File fdir = new File(dir.toUri().getPath());
+    File expectedFile = new File(fdir, partFile);
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -244,21 +242,17 @@ public class TestFileOutputCommitter ext
     // do abort
     committer.abortTask(tContext);
-    FileSystem outputFileSystem = outDir.getFileSystem(conf);
-    Path workPath = new Path(outDir,
-        committer.getTaskAttemptBaseDirName(tContext))
-        .makeQualified(outputFileSystem);
-    File expectedFile = new File(new Path(workPath, partFile)
-        .toString());
+    File out = new File(outDir.toUri().getPath());
+    Path workPath = committer.getWorkPath(tContext, outDir);
+    File wp = new File(workPath.toUri().getPath());
+    File expectedFile = new File(wp, partFile);
     assertFalse("task temp dir still exists", expectedFile.exists());

     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
-        .toString());
+    expectedFile = new File(out, FileOutputCommitter.TEMP_DIR_NAME);
     assertFalse("job temp dir still exists", expectedFile.exists());
-    assertEquals("Output directory not empty", 0, new File(outDir.toString())
-        .listFiles().length);
-    FileUtil.fullyDelete(new File(outDir.toString()));
+    assertEquals("Output directory not empty", 0, out.listFiles().length);
+    FileUtil.fullyDelete(out);
   }

   public static class FakeFileSystem extends RawLocalFileSystem {
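[Editor's note: a pattern worth calling out in these test changes is that new File(path.toString()) is consistently replaced with new File(path.toUri().getPath()). Presumably because a qualified Hadoop Path stringifies with its scheme, which java.io.File then treats as part of the file name; toUri().getPath() strips the scheme. A quick illustration, not part of the patch:]

    import java.io.File;
    import org.apache.hadoop.fs.Path;

    public class PathToFileSketch {
      public static void main(String[] args) {
        Path p = new Path("file:/tmp/out/part-00000");      // qualified Hadoop Path
        System.out.println(new File(p.toString()));         // file:/tmp/out/part-00000  (wrong)
        System.out.println(new File(p.toUri().getPath()));  // /tmp/out/part-00000  (what the tests want)
      }
    }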
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -60,6 +60,22 @@ public class TestFileOutputCommitter ext
   private Text val2 = new Text("val2");

+  private static void cleanup() throws IOException {
+    Configuration conf = new Configuration();
+    FileSystem fs = outDir.getFileSystem(conf);
+    fs.delete(outDir, true);
+  }
+
+  @Override
+  public void setUp() throws IOException {
+    cleanup();
+  }
+
+  @Override
+  public void tearDown() throws IOException {
+    cleanup();
+  }
+
   private void writeOutput(RecordWriter theRecordWriter,
       TaskAttemptContext context) throws IOException, InterruptedException {
     NullWritable nullWritable = NullWritable.get();
@@ -114,11 +130,10 @@ public class TestFileOutputCommitter ext
     // do commit
     committer.commitTask(tContext);
-    Path jobTempDir1 = new Path(outDir,
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir1.toString()).exists()));
-    validateContent(jobTempDir1);
+    Path jobTempDir1 = committer.getCommittedTaskPath(tContext);
+    File jtd = new File(jobTempDir1.toUri().getPath());
+    assertTrue(jtd.exists());
+    validateContent(jtd);

     //now while running the second app attempt,
     //recover the task output from first attempt
@@ -128,15 +143,13 @@ public class TestFileOutputCommitter ext
     JobContext jContext2 = new JobContextImpl(conf2, taskID.getJobID());
     TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2, taskID);
     FileOutputCommitter committer2 = new FileOutputCommitter(outDir, tContext2);
-    committer.setupJob(tContext2);
-    Path jobTempDir2 = new Path(outDir,
-        FileOutputCommitter.getJobAttemptBaseDirName(
-            conf2.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0)));
-    assertTrue((new File(jobTempDir2.toString()).exists()));
+    committer2.setupJob(tContext2);
+    Path jobTempDir2 = committer2.getCommittedTaskPath(tContext2);
+    File jtd2 = new File(jobTempDir2.toUri().getPath());

-    tContext2.getConfiguration().setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
     committer2.recoverTask(tContext2);
-    validateContent(jobTempDir2);
+    assertTrue(jtd2.exists());
+    validateContent(jtd2);

     committer2.commitJob(jContext2);
     validateContent(outDir);
@@ -144,7 +157,12 @@ public class TestFileOutputCommitter ext
   }

   private void validateContent(Path dir) throws IOException {
-    File expectedFile = new File(new Path(dir, partFile).toString());
+    validateContent(new File(dir.toUri().getPath()));
+  }
+
+  private void validateContent(File dir) throws IOException {
+    File expectedFile = new File(dir, partFile);
+    assertTrue("Could not find "+expectedFile, expectedFile.exists());
     StringBuffer expectedOutput = new StringBuffer();
     expectedOutput.append(key1).append('\t').append(val1).append("\n");
     expectedOutput.append(val1).append("\n");
@@ -259,7 +277,7 @@ public class TestFileOutputCommitter ext
     assertFalse("task temp dir still exists", expectedFile.exists());

     committer.abortJob(jContext, JobStatus.State.FAILED);
-    expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
+    expectedFile = new File(new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME)
         .toString());
     assertFalse("job temp dir still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
@@ -315,12 +333,10 @@ public class TestFileOutputCommitter ext
     assertNotNull(th);
     assertTrue(th instanceof IOException);
     assertTrue(th.getMessage().contains("fake delete failed"));
-    File jobTmpDir = new File(new Path(outDir,
-        FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-        conf.getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0) +
-        Path.SEPARATOR +
-        FileOutputCommitter.TEMP_DIR_NAME).toString());
-    File taskTmpDir = new File(jobTmpDir, "_" + taskID);
+    Path jtd = committer.getJobAttemptPath(jContext);
+    File jobTmpDir = new File(jtd.toUri().getPath());
+    Path ttd = committer.getTaskAttemptPath(tContext);
+    File taskTmpDir = new File(ttd.toUri().getPath());
     File expectedFile = new File(taskTmpDir, partFile);
     assertTrue(expectedFile + " does not exists", expectedFile.exists());
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java Sat Feb 4 00:06:24 2012
@@ -74,7 +74,7 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job,
-        committer.getTempTaskOutputPath(tContext));
+        committer.getTaskAttemptPath(tContext));

     committer.setupJob(jContext);
     committer.setupTask(tContext);
@@ -115,7 +115,7 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, committer
-        .getTempTaskOutputPath(tContext));
+        .getTaskAttemptPath(tContext));

     // do setup
     committer.setupJob(jContext);
@@ -134,13 +134,13 @@ public class TestFileOutputCommitter ext
     // do abort
     committer.abortTask(tContext);
     File expectedFile = new File(new Path(committer
-        .getTempTaskOutputPath(tContext), file).toString());
+        .getTaskAttemptPath(tContext), file).toString());

     assertFalse("task temp dir still exists", expectedFile.exists());

     committer.abortJob(jContext, JobStatus.State.FAILED);
     expectedFile = new File(new Path(outDir, FileOutputCommitter.TEMP_DIR_NAME)
         .toString());
-    assertFalse("job temp dir still exists", expectedFile.exists());
+    assertFalse("job temp dir "+expectedFile+" still exists", expectedFile.exists());
     assertEquals("Output directory not empty", 0, new File(outDir.toString())
         .listFiles().length);
     FileUtil.fullyDelete(new File(outDir.toString()));
@@ -170,16 +170,15 @@ public class TestFileOutputCommitter ext
     TaskAttemptContext tContext = new TaskAttemptContextImpl(job, taskID);
     FileOutputCommitter committer = new FileOutputCommitter();
     FileOutputFormat.setWorkOutputPath(job, committer
-        .getTempTaskOutputPath(tContext));
+        .getTaskAttemptPath(tContext));

     // do setup
     committer.setupJob(jContext);
     committer.setupTask(tContext);

     String file = "test.txt";
-    String taskBaseDirName = committer.getTaskAttemptBaseDirName(tContext);
-    File jobTmpDir = new File(outDir.toString(), committer.getJobAttemptBaseDirName(jContext));
-    File taskTmpDir = new File(outDir.toString(), taskBaseDirName);
+    File jobTmpDir = new File(committer.getJobAttemptPath(jContext).toUri().getPath());
+    File taskTmpDir = new File(committer.getTaskAttemptPath(tContext).toUri().getPath());
     File expectedFile = new File(taskTmpDir, file);

     // A reporter that does nothing
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java?rev=1240414&r1=1240413&r2=1240414&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestTaskCommit.java Sat Feb 4 00:06:24 2012
@@ -34,7 +34,7 @@ public class TestTaskCommit extends Hado
   static class CommitterWithCommitFail extends FileOutputCommitter {
     public void commitTask(TaskAttemptContext context) throws IOException {
-      Path taskOutputPath = getTempTaskOutputPath(context);
+      Path taskOutputPath = getTaskAttemptPath(context);
       TaskAttemptID attemptId = context.getTaskAttemptID();
       JobConf job = context.getJobConf();
       if (taskOutputPath != null) {