From: sharad@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r790797 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java
Date: Fri, 03 Jul 2009 06:10:44 -0000

Author: sharad
Date: Fri Jul 3 06:10:43 2009
New Revision: 790797

URL: http://svn.apache.org/viewvc?rev=790797&view=rev
Log:
MAPREDUCE-625. Modify TestTaskLimits to improve execution time. Contributed by Jothi Padmanabhan.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=790797&r1=790796&r2=790797&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jul 3 06:10:43 2009
@@ -58,6 +58,9 @@
     MAPREDUCE-686. Move TestSpeculativeExecution.Fake* into a separate class
     so that it can be used by other tests. (Jothi Padmanabhan via sharad)
 
+    MAPREDUCE-625. Modify TestTaskLimits to improve execution time.
+    (Jothi Padmanabhan via sharad)
+
   BUG FIXES
 
     HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under
    Hadoop (Owen O'Malley)
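Everything in this commit hinges on one configuration knob, mapred.jobtracker.maxtasks.per.job: the JobTracker reads it via getMaxTasksPerJob(), and a job whose total map plus reduce task count exceeds a positive limit fails during initialization. A non-positive value (the rewritten test below passes -1) disables the check. For illustration only, not part of this patch, a cluster operator would cap jobs with the same two calls the new test uses:

    JobConf conf = new JobConf();
    // Cap each job at 100 total (map + reduce) tasks; any value <= 0 disables the check.
    conf.setInt("mapred.jobtracker.maxtasks.per.job", 100);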
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=790797&r1=790796&r2=790797&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jul 3 06:10:43 2009
@@ -536,9 +536,8 @@
 
     LOG.info("Initializing " + jobId);
 
-    // log job info
-    JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(),
-                                    this.startTime, hasRestarted());
+    logToJobHistory();
+    
     // log the job priority
     setPriority(this.priority);
 
@@ -547,39 +546,16 @@
     //
 
     String jobFile = profile.getJobFile();
-    DataInputStream splitFile =
-      fs.open(new Path(conf.get("mapred.job.split.file")));
-    JobClient.RawSplit[] splits;
-    try {
-      splits = JobClient.readSplitFile(splitFile);
-    } finally {
-      splitFile.close();
-    }
+    JobClient.RawSplit[] splits = createSplits();
     numMapTasks = splits.length;
-
-    // if the number of splits is larger than a configured value
-    // then fail the job.
-    int maxTasks = jobtracker.getMaxTasksPerJob();
-    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
-      throw new IOException(
-                "The number of tasks for this job " +
-                (numMapTasks + numReduceTasks) +
-                " exceeds the configured limit " + maxTasks);
-    }
+    checkTaskLimits();
 
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
+
+    createMapTasks(jobFile, splits);
 
-    maps = new TaskInProgress[numMapTasks];
-    for(int i=0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getDataLength();
-      maps[i] = new TaskInProgress(jobId, jobFile,
-                                   splits[i],
-                                   jobtracker, conf, this, i, numSlotsPerMap);
-    }
-    LOG.info("Input size for job " + jobId + " = " + inputLength
Number of splits = " + splits.length); if (numMapTasks > 0) { nonRunningMapCache = createCache(splits, maxLevel); } @@ -587,17 +563,8 @@ // set the launch time this.launchTime = JobTracker.getClock().getTime(); - // - // Create reduce tasks - // - this.reduces = new TaskInProgress[numReduceTasks]; - for (int i = 0; i < numReduceTasks; i++) { - reduces[i] = new TaskInProgress(jobId, jobFile, - numMapTasks, i, - jobtracker, conf, this, numSlotsPerReduce); - nonRunningReduces.add(reduces[i]); - } - + createReduceTasks(jobFile); + // Calculate the minimum number of maps to be complete before // we should start scheduling reduces completedMapsForReduceSlowstart = @@ -627,7 +594,64 @@ } } - private void initSetupCleanupTasks(String jobFile) { + void logToJobHistory() throws IOException { + // log job info + JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), + this.startTime, hasRestarted()); + } + + JobClient.RawSplit[] createSplits() throws IOException { + DataInputStream splitFile = + fs.open(new Path(conf.get("mapred.job.split.file"))); + JobClient.RawSplit[] splits; + try { + splits = JobClient.readSplitFile(splitFile); + } finally { + splitFile.close(); + } + return splits; + } + + /** + * If the number of taks is greater than the configured value + * throw an exception that will fail job initialization + */ + void checkTaskLimits() throws IOException { + int maxTasks = jobtracker.getMaxTasksPerJob(); + if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) { + throw new IOException( + "The number of tasks for this job " + + (numMapTasks + numReduceTasks) + + " exceeds the configured limit " + maxTasks); + } + } + + synchronized void createMapTasks(String jobFile, JobClient.RawSplit[] splits) { + maps = new TaskInProgress[numMapTasks]; + for(int i=0; i < numMapTasks; ++i) { + inputLength += splits[i].getDataLength(); + maps[i] = new TaskInProgress(jobId, jobFile, + splits[i], + jobtracker, conf, this, + i, numSlotsPerMap); + } + LOG.info("Input size for job " + jobId + " = " + inputLength + + ". 
Number of splits = " + splits.length); + + } + + synchronized void createReduceTasks(String jobFile) { + this.reduces = new TaskInProgress[numReduceTasks]; + for (int i = 0; i < numReduceTasks; i++) { + reduces[i] = new TaskInProgress(jobId, jobFile, + numMapTasks, i, + jobtracker, conf, + this, numSlotsPerReduce); + nonRunningReduces.add(reduces[i]); + } + } + + synchronized void initSetupCleanupTasks(String jobFile) { if (!jobSetupCleanupNeeded) { // nothing to initialize return; Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java?rev=790797&r1=790796&r2=790797&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java (original) +++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java Fri Jul 3 06:10:43 2009 @@ -21,101 +21,46 @@ import junit.framework.TestCase; import java.io.IOException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.examples.PiEstimator; -import org.apache.hadoop.fs.FileSystem; - -import org.apache.commons.logging.impl.Log4JLogger; -import org.apache.log4j.Level; - /** - * A JUnit test to test configired task limits. + * A JUnit test to test configured task limits. */ public class TestTaskLimits extends TestCase { - { - ((Log4JLogger)JobInProgress.LOG).getLogger().setLevel(Level.ALL); - } - - private static final Log LOG = - LogFactory.getLog(TestMiniMRWithDFS.class.getName()); - - static final int NUM_MAPS = 5; - static final int NUM_SAMPLES = 100; - - public static class TestResult { - public String output; - public RunningJob job; - TestResult(RunningJob job, String output) { - this.job = job; - this.output = output; + static void runTest(int maxTasks, int numMaps, int numReds, + boolean shouldFail) throws Exception { + JobConf conf = new JobConf(); + conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks); + conf.set("mapred.job.tracker.handler.count", "1"); + MiniMRCluster mr = new MiniMRCluster(0, "file:///", 1, null, null, conf); + JobTracker jt = mr.getJobTrackerRunner().getJobTracker(); + JobConf jc = mr.createJobConf(); + jc.setNumMapTasks(numMaps); + jc.setNumReduceTasks(numReds); + JobInProgress jip = new JobInProgress(new JobID(), jc, jt); + boolean failed = false; + try { + jip.checkTaskLimits(); + } catch (IOException e) { + failed = true; } + assertEquals(shouldFail, failed); + mr.shutdown(); } - static void runPI(MiniMRCluster mr, JobConf jobconf) - throws IOException, InterruptedException, ClassNotFoundException { - LOG.info("runPI"); - double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue(); - double error = Math.abs(Math.PI - estimate); - System.out.println("PI estimation " + error); + public void testBeyondLimits() throws Exception { + // Max tasks is 4, Requested is 8, shouldFail = true + runTest(4, 8, 0, true); } - - /** - * Run the pi test with a specifix value of - * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded. 
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java?rev=790797&r1=790796&r2=790797&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestTaskLimits.java Fri Jul 3 06:10:43 2009
@@ -21,101 +21,46 @@
 import junit.framework.TestCase;
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.examples.PiEstimator;
-import org.apache.hadoop.fs.FileSystem;
-
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.log4j.Level;
-
 /**
- * A JUnit test to test configired task limits.
+ * A JUnit test to test configured task limits.
  */
 public class TestTaskLimits extends TestCase {
 
-  {
-    ((Log4JLogger)JobInProgress.LOG).getLogger().setLevel(Level.ALL);
-  }
-
-  private static final Log LOG =
-    LogFactory.getLog(TestMiniMRWithDFS.class.getName());
-
-  static final int NUM_MAPS = 5;
-  static final int NUM_SAMPLES = 100;
-
-  public static class TestResult {
-    public String output;
-    public RunningJob job;
-    TestResult(RunningJob job, String output) {
-      this.job = job;
-      this.output = output;
+  static void runTest(int maxTasks, int numMaps, int numReds,
+                      boolean shouldFail) throws Exception {
+    JobConf conf = new JobConf();
+    conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
+    conf.set("mapred.job.tracker.handler.count", "1");
+    MiniMRCluster mr = new MiniMRCluster(0, "file:///", 1, null, null, conf);
+    JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+    JobConf jc = mr.createJobConf();
+    jc.setNumMapTasks(numMaps);
+    jc.setNumReduceTasks(numReds);
+    JobInProgress jip = new JobInProgress(new JobID(), jc, jt);
+    boolean failed = false;
+    try {
+      jip.checkTaskLimits();
+    } catch (IOException e) {
+      failed = true;
     }
+    assertEquals(shouldFail, failed);
+    mr.shutdown();
   }
 
-  static void runPI(MiniMRCluster mr, JobConf jobconf)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    LOG.info("runPI");
-    double estimate = PiEstimator.estimate(NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
-    double error = Math.abs(Math.PI - estimate);
-    System.out.println("PI estimation " + error);
+  public void testBeyondLimits() throws Exception {
+    // Max tasks is 4, Requested is 8, shouldFail = true
+    runTest(4, 8, 0, true);
   }
 
-  /**
-   * Run the pi test with a specifix value of
-   * mapred.jobtracker.maxtasks.per.job. Returns true if the job succeeded.
-   */
-  private boolean runOneTest(int maxTasks)
-      throws IOException, InterruptedException, ClassNotFoundException {
-    MiniDFSCluster dfs = null;
-    MiniMRCluster mr = null;
-    FileSystem fileSys = null;
-    boolean success = false;
-    try {
-      final int taskTrackers = 2;
-
-      Configuration conf = new Configuration();
-      conf.setInt("mapred.jobtracker.maxtasks.per.job", maxTasks);
-      dfs = new MiniDFSCluster(conf, 4, true, null);
-      fileSys = dfs.getFileSystem();
-      JobConf jconf = new JobConf(conf);
-      mr = new MiniMRCluster(0, 0, taskTrackers, fileSys.getUri().toString(), 1,
-                             null, null, null, jconf);
-
-      JobConf jc = mr.createJobConf();
-      try {
-        runPI(mr, jc);
-        success = true;
-      } catch (IOException e) {
-        success = false;
-      }
-    } finally {
-      if (dfs != null) { dfs.shutdown(); }
-      if (mr != null) { mr.shutdown(); }
-    }
-    return success;
+
+  public void testTaskWithinLimits() throws Exception {
+    // Max tasks is 4, requested is 4, shouldFail = false
+    runTest(4, 4, 0, false);
   }
 
-  public void testTaskLimits()
-      throws IOException, InterruptedException, ClassNotFoundException {
-    System.out.println("Job 1 running with max set to 2");
-    boolean status = runOneTest(2);
-    assertTrue(status == false);
-    System.out.println("Job 1 failed as expected.");
-
-    // verify that checking this limit works well. The job
-    // needs 5 mappers and we set the limit to 7.
-    System.out.println("Job 2 running with max set to 7.");
-    status = runOneTest(7);
-    assertTrue(status == true);
-    System.out.println("Job 2 succeeded as expected.");
-
-    System.out.println("Job 3 running with max disabled.");
-    status = runOneTest(-1);
-    assertTrue(status == true);
-    System.out.println("Job 3 succeeded as expected.");
+  public void testTaskWithoutLimits() throws Exception {
+    // No task limit, requested is 16, shouldFail = false
+    runTest(-1, 8, 8, false);
   }
+
 }
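The rewritten test no longer needs MiniDFSCluster, PiEstimator, or an end-to-end job run: it builds a JobInProgress against a single-node MiniMRCluster and drives checkTaskLimits() directly, which is where the execution-time win comes from. New boundary cases are cheap to add in the same style; for example, a hypothetical extra case (not part of this patch) pinning down the exact-boundary behavior when both map and reduce tasks are requested:

    public void testExactLimit() throws Exception {
      // Max tasks is 2, requested is 1 map + 1 reduce = 2: at the cap, shouldFail = false
      runTest(2, 1, 1, false);
    }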