Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 65147 invoked from network); 1 Dec 2010 00:41:31 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 1 Dec 2010 00:41:31 -0000 Received: (qmail 92148 invoked by uid 500); 1 Dec 2010 00:41:31 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 92043 invoked by uid 500); 1 Dec 2010 00:41:30 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 92035 invoked by uid 99); 1 Dec 2010 00:41:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Dec 2010 00:41:30 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 01 Dec 2010 00:41:26 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7EDBC23888E4; Wed, 1 Dec 2010 00:39:53 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1040840 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Date: Wed, 01 Dec 2010 00:39:53 -0000 To: mapreduce-commits@hadoop.apache.org From: schen@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101201003953.7EDBC23888E4@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: schen Date: Wed Dec 1 00:39:53 2010 New Revision: 1040840 URL: http://svn.apache.org/viewvc?rev=1040840&view=rev Log: MAPREDUCE-1783. 
FairScheduler initializes tasks only when the job can be run. (Ramkumar Vadali via schen) Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1040840&r1=1040839&r2=1040840&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Wed Dec 1 00:39:53 2010 @@ -432,6 +432,9 @@ Release 0.22.0 - Unreleased MAPREDUCE-2195. New property for local conf directory in system-test-mapreduce.xml file. (cos) + MAPREDUCE-1783. FairScheduler initializes tasks only when the job can be + run. (Ramkumar Vadali via schen) + Release 0.21.1 - Unreleased NEW FEATURES Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1040840&r1=1040839&r2=1040840&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Dec 1 00:39:53 2010 @@ -28,6 +28,9 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -82,8 +85,8 @@ public class FairScheduler extends TaskS protected 
boolean preemptionEnabled; protected boolean onlyLogPreemption; // Only log when tasks should be killed private Clock clock; - private EagerTaskInitializationListener eagerInitListener; private JobListener jobListener; + private JobInitializer jobInitializer; private boolean mockMode; // Used for unit tests; disables background updates // and scheduler event log private FairSchedulerEventLog eventLog; @@ -98,6 +101,8 @@ public class FairScheduler extends TaskS */ static class JobInfo { boolean runnable = false; // Can the job run given user/pool limits? + // Does this job need to be initialized? + volatile boolean needsInitializing = true; public JobSchedulable mapSchedulable; public JobSchedulable reduceSchedulable; // Variables used for delay scheduling @@ -141,13 +146,8 @@ public class FairScheduler extends TaskS eventLog.init(conf, hostname); } // Initialize other pieces of the scheduler + jobInitializer = new JobInitializer(conf, taskTrackerManager); taskTrackerManager.addJobInProgressListener(jobListener); - if (!mockMode) { - eagerInitListener = new EagerTaskInitializationListener(conf); - eagerInitListener.setTaskTrackerManager(taskTrackerManager); - eagerInitListener.start(); - taskTrackerManager.addJobInProgressListener(eagerInitListener); - } poolMgr = new PoolManager(this); poolMgr.initialize(); loadMgr = (LoadManager) ReflectionUtils.newInstance( @@ -231,15 +231,54 @@ public class FairScheduler extends TaskS if (eventLog != null) eventLog.log("SHUTDOWN"); running = false; + jobInitializer.terminate(); if (jobListener != null) taskTrackerManager.removeJobInProgressListener(jobListener); - if (eagerInitListener != null) - taskTrackerManager.removeJobInProgressListener(eagerInitListener); if (eventLog != null) eventLog.shutdown(); } - - /** + + + private class JobInitializer { + private final int DEFAULT_NUM_THREADS = 1; + private ExecutorService threadPool; + private TaskTrackerManager ttm; + public JobInitializer(Configuration conf, TaskTrackerManager 
ttm) { + int numThreads = conf.getInt("mapred.jobinit.threads", + DEFAULT_NUM_THREADS); + threadPool = Executors.newFixedThreadPool(numThreads); + this.ttm = ttm; + } + public void initJob(JobInfo jobInfo, JobInProgress job) { + if (!mockMode) { + threadPool.execute(new InitJob(jobInfo, job)); + } else { + new InitJob(jobInfo, job).run(); + } + } + class InitJob implements Runnable { + private JobInfo jobInfo; + private JobInProgress job; + public InitJob(JobInfo jobInfo, JobInProgress job) { + this.jobInfo = jobInfo; + this.job = job; + } + public void run() { + ttm.initJob(job); + } + } + void terminate() { + LOG.info("Shutting down thread pool"); + threadPool.shutdownNow(); + try { + threadPool.awaitTermination(1, TimeUnit.MINUTES); + } catch (InterruptedException e) { + // Ignore, we are in shutdown anyway. + } + } + } + +/** * Used to listen for jobs added/removed by our {@link TaskTrackerManager}. */ private class JobListener extends JobInProgressListener { @@ -630,16 +669,27 @@ public class FairScheduler extends TaskS Map<String, Integer> userJobs = new HashMap<String, Integer>(); Map<String, Integer> poolJobs = new HashMap<String, Integer>(); for (JobInProgress job: jobs) { - if (job.getStatus().getRunState() == JobStatus.RUNNING) { - String user = job.getJobConf().getUser(); - String pool = poolMgr.getPoolName(job); - int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0; - int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0; - if (userCount < poolMgr.getUserMaxJobs(user) && - poolCount < poolMgr.getPoolMaxJobs(pool)) { - infos.get(job).runnable = true; + String user = job.getJobConf().getUser(); + String pool = poolMgr.getPoolName(job); + int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0; + int poolCount = poolJobs.containsKey(pool) ? 
poolJobs.get(pool) : 0; + if (userCount < poolMgr.getUserMaxJobs(user) && + poolCount < poolMgr.getPoolMaxJobs(pool)) { + if (job.getStatus().getRunState() == JobStatus.RUNNING || + job.getStatus().getRunState() == JobStatus.PREP) { userJobs.put(user, userCount + 1); poolJobs.put(pool, poolCount + 1); + JobInfo jobInfo = infos.get(job); + if (job.getStatus().getRunState() == JobStatus.RUNNING) { + jobInfo.runnable = true; + } else { + // The job is in the PREP state. Give it to the job initializer + // for initialization if we have not already done it. + if (jobInfo.needsInitializing) { + jobInfo.needsInitializing = false; + jobInitializer.initJob(jobInfo, job); + } + } } } } Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1040840&r1=1040839&r2=1040840&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed Dec 1 00:39:53 2010 @@ -40,6 +40,7 @@ import org.apache.hadoop.io.BytesWritabl import org.apache.hadoop.mapred.FairScheduler.JobInfo; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory; +import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException; import org.apache.hadoop.mapred.UtilsForTests.FakeClock; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker; @@ -63,6 +64,7 @@ public class TestFairScheduler extends T private int mapCounter = 0; private int reduceCounter = 0; private final String[][] mapInputLocations; // Array of hosts for each map + private boolean 
initialized; public FakeJobInProgress(JobConf jobConf, FakeTaskTrackerManager taskTrackerManager, @@ -79,7 +81,7 @@ public class TestFairScheduler extends T this.nonRunningReduces = new LinkedList<TaskInProgress>(); this.runningReduces = new LinkedHashSet<TaskInProgress>(); this.jobHistory = new FakeJobHistory(); - initTasks(); + this.initialized = false; } @Override @@ -130,6 +132,12 @@ public class TestFairScheduler extends T reduces[i] = new FakeTaskInProgress(getJobID(), i, getJobConf(), this); } + + initialized = true; + } + + public boolean isInitialized() { + return initialized; } @Override @@ -412,7 +420,11 @@ public class TestFairScheduler extends T } public void initJob (JobInProgress job) { - // do nothing + try { + job.initTasks(); + } catch (KillInterruptedException e) { + } catch (IOException e) { + } } public void failJob (JobInProgress job) { @@ -525,18 +537,23 @@ public class TestFairScheduler extends T } } + private JobInProgress submitJobNotInitialized(int state, int maps, int reduces) + throws IOException { + return submitJob(state, maps, reduces, null, null, false); + } + private JobInProgress submitJob(int state, int maps, int reduces) throws IOException { - return submitJob(state, maps, reduces, null, null); + return submitJob(state, maps, reduces, null, null, true); } private JobInProgress submitJob(int state, int maps, int reduces, String pool) throws IOException { - return submitJob(state, maps, reduces, pool, null); + return submitJob(state, maps, reduces, pool, null, true); } private JobInProgress submitJob(int state, int maps, int reduces, String pool, - String[][] mapInputLocations) throws IOException { + String[][] mapInputLocations, boolean initializeJob) throws IOException { JobConf jobConf = new JobConf(conf); jobConf.setNumMapTasks(maps); jobConf.setNumReduceTasks(reduces); @@ -544,6 +561,9 @@ public class TestFairScheduler extends T jobConf.set(POOL_PROPERTY, pool); JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager, mapInputLocations, 
UtilsForTests.getJobTracker()); + if (initializeJob) { + taskTrackerManager.initJob(job); + } job.getStatus().setRunState(state); taskTrackerManager.submitJob(job); job.startTime = clock.time; @@ -641,7 +661,6 @@ public class TestFairScheduler extends T } public void testNonRunningJobsAreIgnored() throws IOException { - submitJobs(1, JobStatus.PREP, 10, 10); submitJobs(1, JobStatus.SUCCEEDED, 10, 10); submitJobs(1, JobStatus.FAILED, 10, 10); submitJobs(1, JobStatus.KILLED, 10, 10); @@ -1345,18 +1364,28 @@ public class TestFairScheduler extends T // Submit jobs, advancing time in-between to make sure that they are // all submitted at distinct times. - JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); + JobInProgress job1 = submitJobNotInitialized(JobStatus.PREP, 10, 10); + assertTrue(((FakeJobInProgress)job1).isInitialized()); + job1.getStatus().setRunState(JobStatus.RUNNING); JobInfo info1 = scheduler.infos.get(job1); advanceTime(10); - JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10); + JobInProgress job2 = submitJobNotInitialized(JobStatus.PREP, 10, 10); + assertTrue(((FakeJobInProgress)job2).isInitialized()); + job2.getStatus().setRunState(JobStatus.RUNNING); JobInfo info2 = scheduler.infos.get(job2); advanceTime(10); - JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10); + JobInProgress job3 = submitJobNotInitialized(JobStatus.PREP, 10, 10); JobInfo info3 = scheduler.infos.get(job3); advanceTime(10); - JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10); + JobInProgress job4 = submitJobNotInitialized(JobStatus.PREP, 10, 10); JobInfo info4 = scheduler.infos.get(job4); + // Only two of the jobs should be initialized. 
+ assertTrue(((FakeJobInProgress)job1).isInitialized()); + assertTrue(((FakeJobInProgress)job2).isInitialized()); + assertFalse(((FakeJobInProgress)job3).isInitialized()); + assertFalse(((FakeJobInProgress)job4).isInitialized()); + // Check scheduler variables assertEquals(2.0, info1.mapSchedulable.getFairShare()); assertEquals(2.0, info1.reduceSchedulable.getFairShare()); @@ -2158,7 +2187,7 @@ public class TestFairScheduler extends T JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1", new String[][] { {"rack2.node2"} - }); + }, true); JobInfo info1 = scheduler.infos.get(job1); // Advance time before submitting another job j2, to make j1 be ahead @@ -2206,7 +2235,7 @@ public class TestFairScheduler extends T JobInProgress job1 = submitJob(JobStatus.RUNNING, 4, 0, "pool1", new String[][] { {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"} - }); + }, true); JobInfo info1 = scheduler.infos.get(job1); // Advance time before submitting another job j2, to make j1 be ahead @@ -2289,7 +2318,7 @@ public class TestFairScheduler extends T new String[][] { {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, - }); + }, true); JobInfo info1 = scheduler.infos.get(job1); advanceTime(100); @@ -2577,6 +2606,7 @@ public class TestFairScheduler extends T jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA"); JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager, null, UtilsForTests.getJobTracker()); + job3.initTasks(); job3.getStatus().setRunState(JobStatus.RUNNING); taskTrackerManager.submitJob(job3); @@ -2592,6 +2622,7 @@ public class TestFairScheduler extends T jobConf2.set(POOL_PROPERTY, "poolA"); JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager, null, UtilsForTests.getJobTracker()); + job4.initTasks(); job4.getStatus().setRunState(JobStatus.RUNNING); taskTrackerManager.submitJob(job4); @@ -2613,10 +2644,10 @@ public class 
TestFairScheduler extends T protected void checkAssignment(String taskTrackerName, String... expectedTasks) throws IOException { List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName)); + assertNotNull(tasks); System.out.println("Assigned tasks:"); for (int i = 0; i < tasks.size(); i++) System.out.println("- " + tasks.get(i)); - assertNotNull(tasks); assertEquals(expectedTasks.length, tasks.size()); for (int i = 0; i < tasks.size(); i++) assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());