Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E38BFDD6E for ; Mon, 13 Aug 2012 18:35:22 +0000 (UTC) Received: (qmail 28581 invoked by uid 500); 13 Aug 2012 18:35:22 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 28540 invoked by uid 500); 13 Aug 2012 18:35:22 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 28533 invoked by uid 99); 13 Aug 2012 18:35:22 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Aug 2012 18:35:22 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 13 Aug 2012 18:35:18 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 587D223888E4 for ; Mon, 13 Aug 2012 18:34:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1372541 - in /hadoop/common/branches/branch-1: ./ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapreduce/ Date: Mon, 13 Aug 2012 18:34:33 -0000 To: common-commits@hadoop.apache.org From: tomwhite@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120813183434.587D223888E4@eris.apache.org> Author: tomwhite Date: Mon Aug 13 18:34:33 2012 New Revision: 1372541 URL: http://svn.apache.org/viewvc?rev=1372541&view=rev Log: MAPREDUCE-4488. 
Port MAPREDUCE-463 (The job setup and cleanup tasks should be optional) to branch-1. Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (with props) Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/mapred/mapred-default.xml hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1372541&r1=1372540&r2=1372541&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Mon Aug 13 18:34:33 2012 @@ -27,6 +27,9 @@ Release 1.2.0 - unreleased module external to HDFS to specify how HDFS blocks should be placed. (Sumadhur Reddy Bolli via szetszwo) + MAPREDUCE-4488. Port MAPREDUCE-463 (The job setup and cleanup tasks + should be optional) to branch-1. (tomwhite) + IMPROVEMENTS HDFS-3515. Port HDFS-1457 to branch-1. (eli) Modified: hadoop/common/branches/branch-1/src/mapred/mapred-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/mapred-default.xml?rev=1372541&r1=1372540&r2=1372541&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/mapred-default.xml (original) +++ hadoop/common/branches/branch-1/src/mapred/mapred-default.xml Mon Aug 13 18:34:33 2012 @@ -35,6 +35,14 @@ +<property> + <name>mapred.committer.job.setup.cleanup.needed</name> + <value>true</value> + <description> true, if job needs job-setup and job-cleanup.
+ false, otherwise + </description> +</property> + Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1372541&r1=1372540&r2=1372541&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Aug 13 18:34:33 2012 @@ -131,6 +131,7 @@ public class JobInProgress { private volatile boolean launchedSetup = false; private volatile boolean jobKilled = false; private volatile boolean jobFailed = false; + private boolean jobSetupCleanupNeeded = true; JobPriority priority = JobPriority.NORMAL; final JobTracker jobtracker; @@ -361,6 +362,8 @@ public class JobInProgress { this.taskCompletionEvents = new ArrayList (numMapTasks + numReduceTasks + 10); + JobContext jobContext = new JobContext(conf, jobId); + this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded(); try { this.userUGI = UserGroupInformation.getCurrentUser(); } catch (IOException ie){ @@ -449,6 +452,9 @@ public class JobInProgress { this.taskCompletionEvents = new ArrayList (numMapTasks + numReduceTasks + 10); + + JobContext jobContext = new JobContext(conf, jobId); + this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded(); // Construct the jobACLs status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf)); @@ -757,7 +763,35 @@ public class JobInProgress { // ... 
use the same for estimating the total output of all maps resourceEstimator.setThreshhold(completedMapsForReduceSlowstart); + + initSetupCleanupTasks(jobFile); + + synchronized(jobInitKillStatus){ + jobInitKillStatus.initDone = true; + + // set this before the throw to make sure cleanup works properly + tasksInited = true; + + if(jobInitKillStatus.killed) { + throw new KillInterruptedException("Job " + jobId + " killed in init"); + } + } + + JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, + numMapTasks, numReduceTasks); + // if setup is not needed, mark it complete + if (!jobSetupCleanupNeeded) { + setupComplete(); + } + } + + private void initSetupCleanupTasks(String jobFile) { + if (!jobSetupCleanupNeeded) { + // nothing to initialize + return; + } + // create cleanup two cleanup tips, one map and one reduce. cleanup = new TaskInProgress[2]; @@ -787,25 +821,23 @@ public class JobInProgress { numReduceTasks + 1, jobtracker, conf, this, 1); setup[1].setJobSetupTask(); - synchronized(jobInitKillStatus){ - jobInitKillStatus.initDone = true; - - // set this before the throw to make sure cleanup works properly - tasksInited = true; - - if(jobInitKillStatus.killed) { - throw new KillInterruptedException("Job " + jobId + " killed in init"); - } - } - - JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, - numMapTasks, numReduceTasks); - // Log the number of map and reduce tasks LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks + " map tasks and " + numReduceTasks + " reduce tasks."); } + private void setupComplete() { + status.setSetupProgress(1.0f); + if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded) { + jobComplete(); + return; + } + if (this.status.getRunState() == JobStatus.PREP) { + this.status.setRunState(JobStatus.RUNNING); + JobHistory.JobInfo.logStarted(profile.getJobID()); + } + } + TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) throws IOException { 
TaskSplitMetaInfo[] allTaskSplitMetaInfo = @@ -2622,13 +2654,7 @@ public class JobInProgress { if (tip.isJobSetupTask()) { // setup task has finished. kill the extra setup tip killSetupTip(!tip.isMapTask()); - // Job can start running now. - this.status.setSetupProgress(1.0f); - // move the job to running state if the job is in prep state - if (this.status.getRunState() == JobStatus.PREP) { - changeStateTo(JobStatus.RUNNING); - JobHistory.JobInfo.logStarted(profile.getJobID()); - } + setupComplete(); } else if (tip.isJobCleanupTask()) { // cleanup task has finished. Kill the extra cleanup tip if (tip.isMapTask()) { @@ -2687,6 +2713,9 @@ public class JobInProgress { } } } + if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) { + jobComplete(); + } return true; } @@ -2747,7 +2776,8 @@ public class JobInProgress { // // All tasks are complete, then the job is done! // - if (this.status.getRunState() == JobStatus.RUNNING ) { + if (this.status.getRunState() == JobStatus.RUNNING || + this.status.getRunState() == JobStatus.PREP) { changeStateTo(JobStatus.SUCCEEDED); this.status.setCleanupProgress(1.0f); if (maps.length == 0) { @@ -2881,6 +2911,9 @@ public class JobInProgress { for (int i = 0; i < reduces.length; i++) { reduces[i].kill(); } + if (!jobSetupCleanupNeeded) { + terminateJob(jobTerminationState); + } } } @@ -3169,7 +3202,9 @@ public class JobInProgress { } boolean isSetupFinished() { - if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete() + // if there is no setup to be launched, consider setup is finished. 
+ if ((tasksInited && setup.length == 0) || + setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete() || setup[1].isFailed()) { return true; } @@ -3283,10 +3318,12 @@ public class JobInProgress { */ public synchronized TaskInProgress getTaskInProgress(TaskID tipid) { if (tipid.isMap()) { - if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip + // cleanup map tip + if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) { return cleanup[0]; } - if (tipid.equals(setup[0].getTIPId())) { //setup map tip + // setup map tip + if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) { return setup[0]; } for (int i = 0; i < maps.length; i++) { @@ -3295,10 +3332,12 @@ public class JobInProgress { } } } else { - if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip + // cleanup reduce tip + if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) { return cleanup[1]; } - if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip + // setup reduce tip + if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) { return setup[1]; } for (int i = 0; i < reduces.length; i++) { Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java?rev=1372541&r1=1372540&r2=1372541&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/Job.java Mon Aug 13 18:34:33 2012 @@ -349,6 +349,18 @@ public class Job extends JobContext { } /** + * Specify whether job-setup and job-cleanup is needed for the job + * + * @param needed If true, job-setup and job-cleanup will be + * considered from {@link OutputCommitter} + * else ignored. 
+ */ + public void setJobSetupCleanupNeeded(boolean needed) { + ensureState(JobState.DEFINE); + conf.setBoolean("mapred.committer.job.setup.cleanup.needed", needed); + } + + /** * Get the URL where some job progress information will be displayed. * * @return the URL where some job progress information will be displayed. Modified: hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java?rev=1372541&r1=1372540&r2=1372541&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java (original) +++ hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapreduce/JobContext.java Mon Aug 13 18:34:33 2012 @@ -275,4 +275,13 @@ public class JobContext { public RawComparator getGroupingComparator() { return conf.getOutputValueGroupingComparator(); } + + /** + * Get whether job-setup and job-cleanup is needed for the job + * + * @return boolean + */ + public boolean getJobSetupCleanupNeeded() { + return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true); + } } Added: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java?rev=1372541&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (added) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java Mon Aug 13 18:34:33 2012 @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.hadoop.mapred.HadoopTestCase; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +public class TestNoJobSetupCleanup extends HadoopTestCase { + private static String TEST_ROOT_DIR = + new File(System.getProperty("test.build.data","/tmp")) + .toURI().toString().replace(' ', '+'); + private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input"); + private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output"); + + public TestNoJobSetupCleanup() throws IOException { + super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2); + } + + private Job submitAndValidateJob(Configuration conf, int numMaps, int numReds) + throws IOException, InterruptedException, ClassNotFoundException { + Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, + numMaps, 
numReds); + job.setJobSetupCleanupNeeded(false); + job.setOutputFormatClass(MyOutputFormat.class); + job.waitForCompletion(true); + assertTrue(job.isSuccessful()); + JobID jobid = (org.apache.hadoop.mapred.JobID) job.getJobID(); + JobClient jc = new JobClient(new JobConf(conf)); + assertTrue(jc.getSetupTaskReports(jobid).length == 0); + assertTrue(jc.getCleanupTaskReports(jobid).length == 0); + assertTrue(jc.getMapTaskReports(jobid).length == numMaps); + assertTrue(jc.getReduceTaskReports(jobid).length == numReds); + FileSystem fs = FileSystem.get(conf); + assertTrue("Job output directory doesn't exit!", fs.exists(outDir)); + FileStatus[] list = fs.listStatus(outDir, new OutputFilter()); + int numPartFiles = numReds == 0 ? numMaps : numReds; + assertTrue("Number of part-files is " + list.length + " and not " + + numPartFiles, list.length == numPartFiles); + return job; + } + + public void testNoJobSetupCleanup() throws Exception { + try { + Configuration conf = createJobConf(); + + // run a job without job-setup and cleanup + submitAndValidateJob(conf, 1, 1); + + // run a map only job. + submitAndValidateJob(conf, 1, 0); + + // run empty job without job setup and cleanup + submitAndValidateJob(conf, 0, 0); + + // run empty job without job setup and cleanup, with non-zero reduces + submitAndValidateJob(conf, 0, 1); + } finally { + tearDown(); + } + } + + static class MyOutputFormat extends TextOutputFormat { + public void checkOutputSpecs(JobContext job) + throws FileAlreadyExistsException, IOException{ + super.checkOutputSpecs(job); + // creating dummy TaskAttemptID + TaskAttemptID tid = new TaskAttemptID("jt", 1, false, 0, 0); + getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), tid)). 
+ setupJob(job); + } + } + + private static class OutputFilter implements PathFilter { + public boolean accept(Path path) { + return !(path.getName().startsWith("_")); + } + } +} \ No newline at end of file Propchange: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java ------------------------------------------------------------------------------ svn:eol-style = native