hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject svn commit: r1040840 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Date Wed, 01 Dec 2010 00:39:53 GMT
Author: schen
Date: Wed Dec  1 00:39:53 2010
New Revision: 1040840

URL: http://svn.apache.org/viewvc?rev=1040840&view=rev
Log:
MAPREDUCE-1783. FairScheduler initializes tasks only when the job can be run.
(Ramkumar Vadali via schen)

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1040840&r1=1040839&r2=1040840&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Dec  1 00:39:53 2010
@@ -432,6 +432,9 @@ Release 0.22.0 - Unreleased
     MAPREDUCE-2195. New property for local conf directory in
     system-test-mapreduce.xml file. (cos)
 
+    MAPREDUCE-1783. FairScheduler initializes tasks only when the job can be
+    run. (Ramkumar Vadali via schen)
+
 Release 0.21.1 - Unreleased
 
   NEW FEATURES

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1040840&r1=1040839&r2=1040840&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Dec  1 00:39:53 2010
@@ -28,6 +28,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -82,8 +85,8 @@ public class FairScheduler extends TaskS
   protected boolean preemptionEnabled;
   protected boolean onlyLogPreemption; // Only log when tasks should be killed
   private Clock clock;
-  private EagerTaskInitializationListener eagerInitListener;
   private JobListener jobListener;
+  private JobInitializer jobInitializer;
   private boolean mockMode; // Used for unit tests; disables background updates
                             // and scheduler event log
   private FairSchedulerEventLog eventLog;
@@ -98,6 +101,8 @@ public class FairScheduler extends TaskS
    */
   static class JobInfo {
     boolean runnable = false;   // Can the job run given user/pool limits?
+    // Does this job need to be initialized?
+    volatile boolean needsInitializing = true;
     public JobSchedulable mapSchedulable;
     public JobSchedulable reduceSchedulable;
     // Variables used for delay scheduling
@@ -141,13 +146,8 @@ public class FairScheduler extends TaskS
         eventLog.init(conf, hostname);
       }
       // Initialize other pieces of the scheduler
+      jobInitializer = new JobInitializer(conf, taskTrackerManager);
       taskTrackerManager.addJobInProgressListener(jobListener);
-      if (!mockMode) {
-        eagerInitListener = new EagerTaskInitializationListener(conf);
-        eagerInitListener.setTaskTrackerManager(taskTrackerManager);
-        eagerInitListener.start();
-        taskTrackerManager.addJobInProgressListener(eagerInitListener);
-      }
       poolMgr = new PoolManager(this);
       poolMgr.initialize();
       loadMgr = (LoadManager) ReflectionUtils.newInstance(
@@ -231,15 +231,54 @@ public class FairScheduler extends TaskS
     if (eventLog != null)
       eventLog.log("SHUTDOWN");
     running = false;
+    jobInitializer.terminate();
     if (jobListener != null)
       taskTrackerManager.removeJobInProgressListener(jobListener);
-    if (eagerInitListener != null)
-      taskTrackerManager.removeJobInProgressListener(eagerInitListener);
     if (eventLog != null)
       eventLog.shutdown();
   }
-  
-  /**
+ 
+
+  private class JobInitializer {
+    private final int DEFAULT_NUM_THREADS = 1;
+    private ExecutorService threadPool;
+    private TaskTrackerManager ttm;
+    public JobInitializer(Configuration conf, TaskTrackerManager ttm) {
+      int numThreads = conf.getInt("mapred.jobinit.threads",
+          DEFAULT_NUM_THREADS);
+      threadPool = Executors.newFixedThreadPool(numThreads);
+      this.ttm = ttm;
+    }
+    public void initJob(JobInfo jobInfo, JobInProgress job) {
+      if (!mockMode) {
+        threadPool.execute(new InitJob(jobInfo, job));
+      } else {
+        new InitJob(jobInfo, job).run();
+      }
+    }
+    class InitJob implements Runnable {
+      private JobInfo jobInfo;
+      private JobInProgress job;
+      public InitJob(JobInfo jobInfo, JobInProgress job) {
+        this.jobInfo = jobInfo;
+        this.job = job;
+      }
+      public void run() {
+        ttm.initJob(job);
+      }
+    }
+    void terminate() {
+      LOG.info("Shutting down thread pool");
+      threadPool.shutdownNow();
+      try {
+        threadPool.awaitTermination(1, TimeUnit.MINUTES);
+      } catch (InterruptedException e) {
+        // Ignore, we are in shutdown anyway.
+      }
+    }
+  }
+
+/**
    * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
    */
   private class JobListener extends JobInProgressListener {
@@ -630,16 +669,27 @@ public class FairScheduler extends TaskS
     Map<String, Integer> userJobs = new HashMap<String, Integer>();
     Map<String, Integer> poolJobs = new HashMap<String, Integer>();
     for (JobInProgress job: jobs) {
-      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-        String user = job.getJobConf().getUser();
-        String pool = poolMgr.getPoolName(job);
-        int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
-        int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
-        if (userCount < poolMgr.getUserMaxJobs(user) && 
-            poolCount < poolMgr.getPoolMaxJobs(pool)) {
-          infos.get(job).runnable = true;
+      String user = job.getJobConf().getUser();
+      String pool = poolMgr.getPoolName(job);
+      int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+      int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+      if (userCount < poolMgr.getUserMaxJobs(user) &&
+          poolCount < poolMgr.getPoolMaxJobs(pool)) {
+        if (job.getStatus().getRunState() == JobStatus.RUNNING ||
+            job.getStatus().getRunState() == JobStatus.PREP) {
           userJobs.put(user, userCount + 1);
           poolJobs.put(pool, poolCount + 1);
+          JobInfo jobInfo = infos.get(job);
+          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+            jobInfo.runnable = true;
+          } else {
+            // The job is in the PREP state. Give it to the job initializer
+            // for initialization if we have not already done it.
+            if (jobInfo.needsInitializing) {
+              jobInfo.needsInitializing = false;
+              jobInitializer.initJob(jobInfo, job);
+            }
+          }
         }
       }
     }

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1040840&r1=1040839&r2=1040840&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed Dec  1 00:39:53 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.io.BytesWritabl
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -63,6 +64,7 @@ public class TestFairScheduler extends T
     private int mapCounter = 0;
     private int reduceCounter = 0;
     private final String[][] mapInputLocations; // Array of hosts for each map
+    private boolean initialized;
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, 
@@ -79,7 +81,7 @@ public class TestFairScheduler extends T
       this.nonRunningReduces = new LinkedList<TaskInProgress>();   
       this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.jobHistory = new FakeJobHistory();
-      initTasks();
+      this.initialized = false;
     }
     
     @Override
@@ -130,6 +132,12 @@ public class TestFairScheduler extends T
         reduces[i] = new FakeTaskInProgress(getJobID(), i,
             getJobConf(), this);
       }
+      
+      initialized = true;
+    }
+    
+    public boolean isInitialized() {
+      return initialized;
     }
 
     @Override
@@ -412,7 +420,11 @@ public class TestFairScheduler extends T
     }
 
     public void initJob (JobInProgress job) {
-      // do nothing
+      try {
+        job.initTasks();
+      } catch (KillInterruptedException e) {
+      } catch (IOException e) {
+      }
     }
     
     public void failJob (JobInProgress job) {
@@ -525,18 +537,23 @@ public class TestFairScheduler extends T
     }
   }
   
+  private JobInProgress submitJobNotInitialized(int state, int maps, int reduces)
+	    throws IOException {
+    return submitJob(state, maps, reduces, null, null, false);
+  }
+
   private JobInProgress submitJob(int state, int maps, int reduces)
       throws IOException {
-    return submitJob(state, maps, reduces, null, null);
+    return submitJob(state, maps, reduces, null, null, true);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool)
       throws IOException {
-    return submitJob(state, maps, reduces, pool, null);
+    return submitJob(state, maps, reduces, pool, null, true);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool,
-      String[][] mapInputLocations) throws IOException {
+      String[][] mapInputLocations, boolean initializeJob) throws IOException {
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
@@ -544,6 +561,9 @@ public class TestFairScheduler extends T
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
         mapInputLocations, UtilsForTests.getJobTracker());
+    if (initializeJob) {
+      taskTrackerManager.initJob(job);
+    }
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;
@@ -641,7 +661,6 @@ public class TestFairScheduler extends T
   }
 
   public void testNonRunningJobsAreIgnored() throws IOException {
-    submitJobs(1, JobStatus.PREP, 10, 10);
     submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
     submitJobs(1, JobStatus.FAILED, 10, 10);
     submitJobs(1, JobStatus.KILLED, 10, 10);
@@ -1345,18 +1364,28 @@ public class TestFairScheduler extends T
     
     // Submit jobs, advancing time in-between to make sure that they are
     // all submitted at distinct times.
-    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job1 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
+    assertTrue(((FakeJobInProgress)job1).isInitialized());
+    job1.getStatus().setRunState(JobStatus.RUNNING);
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(10);
-    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job2 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
+    assertTrue(((FakeJobInProgress)job2).isInitialized());
+    job2.getStatus().setRunState(JobStatus.RUNNING);
     JobInfo info2 = scheduler.infos.get(job2);
     advanceTime(10);
-    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job3 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
-    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job4 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info4 = scheduler.infos.get(job4);
     
+    // Only two of the jobs should be initialized.
+    assertTrue(((FakeJobInProgress)job1).isInitialized());
+    assertTrue(((FakeJobInProgress)job2).isInitialized());
+    assertFalse(((FakeJobInProgress)job3).isInitialized());
+    assertFalse(((FakeJobInProgress)job4).isInitialized());
+    
     // Check scheduler variables
     assertEquals(2.0,  info1.mapSchedulable.getFairShare());
     assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
@@ -2158,7 +2187,7 @@ public class TestFairScheduler extends T
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
         new String[][] {
           {"rack2.node2"}
-        });
+        }, true);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Advance time before submitting another job j2, to make j1 be ahead
@@ -2206,7 +2235,7 @@ public class TestFairScheduler extends T
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 4, 0, "pool1",
         new String[][] {
           {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}
-        });
+        }, true);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Advance time before submitting another job j2, to make j1 be ahead
@@ -2289,7 +2318,7 @@ public class TestFairScheduler extends T
         new String[][] {
           {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"},
           {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"},
-        });
+        }, true);
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(100);
     
@@ -2577,6 +2606,7 @@ public class TestFairScheduler extends T
     jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA");
     JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager,
         null, UtilsForTests.getJobTracker());
+    job3.initTasks();
     job3.getStatus().setRunState(JobStatus.RUNNING);
     taskTrackerManager.submitJob(job3);
 
@@ -2592,6 +2622,7 @@ public class TestFairScheduler extends T
     jobConf2.set(POOL_PROPERTY, "poolA");
     JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager,
         null, UtilsForTests.getJobTracker());
+    job4.initTasks();
     job4.getStatus().setRunState(JobStatus.RUNNING);
     taskTrackerManager.submitJob(job4);
 
@@ -2613,10 +2644,10 @@ public class TestFairScheduler extends T
   protected void checkAssignment(String taskTrackerName,
       String... expectedTasks) throws IOException {
     List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(tasks);
     System.out.println("Assigned tasks:");
     for (int i = 0; i < tasks.size(); i++)
       System.out.println("- " + tasks.get(i));
-    assertNotNull(tasks);
     assertEquals(expectedTasks.length, tasks.size());
     for (int i = 0; i < tasks.size(); i++)
       assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());



Mime
View raw message