hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sc...@apache.org
Subject svn commit: r1066273 - in /hadoop/mapreduce/branches/branch-0.22: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Date Tue, 01 Feb 2011 23:36:05 GMT
Author: schen
Date: Tue Feb  1 23:36:05 2011
New Revision: 1066273

URL: http://svn.apache.org/viewvc?rev=1066273&view=rev
Log:
MAPREDUCE-1783. Delay task Initialization till a job can be run


Modified:
    hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1066273&r1=1066272&r2=1066273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Tue Feb  1 23:36:05 2011
@@ -182,6 +182,8 @@ Release 0.22.0 - Unreleased
 
     MAPREDUCE-2260. Remove auto-generated native build files. (rvs via eli)
 
+    MAPREDUCE-1783. Delay task Initialization till a job can be run
+
   OPTIMIZATIONS
 
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and

Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1066273&r1=1066272&r2=1066273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Feb  1 23:36:05 2011
@@ -28,12 +28,16 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -49,18 +53,18 @@ public class FairScheduler extends TaskS
 
   // How often to dump scheduler state to the event log
   protected long dumpInterval = 10000;
-  
+
   // How often tasks are preempted (must be longer than a couple
   // of heartbeats to give task-kill commands a chance to act).
   protected long preemptionInterval = 15000;
-  
+
   // Used to iterate through map and reduce task types
-  private static final TaskType[] MAP_AND_REDUCE = 
+  private static final TaskType[] MAP_AND_REDUCE =
     new TaskType[] {TaskType.MAP, TaskType.REDUCE};
-  
+
   // Maximum locality delay when auto-computing locality delays
   private static final long MAX_AUTOCOMPUTED_LOCALITY_DELAY = 15000;
-  
+
   protected PoolManager poolMgr;
   protected LoadManager loadMgr;
   protected TaskSelector taskSelector;
@@ -82,15 +86,15 @@ public class FairScheduler extends TaskS
   protected boolean preemptionEnabled;
   protected boolean onlyLogPreemption; // Only log when tasks should be killed
   private Clock clock;
-  private EagerTaskInitializationListener eagerInitListener;
   private JobListener jobListener;
+  private JobInitializer jobInitializer;
   private boolean mockMode; // Used for unit tests; disables background updates
                             // and scheduler event log
   private FairSchedulerEventLog eventLog;
   protected long lastDumpTime;       // Time when we last dumped state to log
-  protected long lastHeartbeatTime;  // Time we last ran assignTasks 
+  protected long lastHeartbeatTime;  // Time we last ran assignTasks
   private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
-  
+
   /**
    * A class for holding per-job scheduler variables. These always contain the
    * values of the variables at the last update(), and are used along with a
@@ -98,6 +102,8 @@ public class FairScheduler extends TaskS
    */
   static class JobInfo {
     boolean runnable = false;   // Can the job run given user/pool limits?
+    // Does this job need to be initialized?
+    volatile boolean needsInitializing = true;
     public JobSchedulable mapSchedulable;
     public JobSchedulable reduceSchedulable;
     // Variables used for delay scheduling
@@ -111,11 +117,11 @@ public class FairScheduler extends TaskS
       this.lastMapLocalityLevel = LocalityLevel.NODE;
     }
   }
-  
+
   public FairScheduler() {
     this(new Clock(), false);
   }
-  
+
   /**
    * Constructor used for tests, which can change the clock and disable updates.
    */
@@ -141,23 +147,18 @@ public class FairScheduler extends TaskS
         eventLog.init(conf, hostname);
       }
       // Initialize other pieces of the scheduler
+      jobInitializer = new JobInitializer(conf, taskTrackerManager);
       taskTrackerManager.addJobInProgressListener(jobListener);
-      if (!mockMode) {
-        eagerInitListener = new EagerTaskInitializationListener(conf);
-        eagerInitListener.setTaskTrackerManager(taskTrackerManager);
-        eagerInitListener.start();
-        taskTrackerManager.addJobInProgressListener(eagerInitListener);
-      }
       poolMgr = new PoolManager(this);
       poolMgr.initialize();
       loadMgr = (LoadManager) ReflectionUtils.newInstance(
-          conf.getClass("mapred.fairscheduler.loadmanager", 
+          conf.getClass("mapred.fairscheduler.loadmanager",
               CapBasedLoadManager.class, LoadManager.class), conf);
       loadMgr.setTaskTrackerManager(taskTrackerManager);
       loadMgr.setEventLog(eventLog);
       loadMgr.start();
       taskSelector = (TaskSelector) ReflectionUtils.newInstance(
-          conf.getClass("mapred.fairscheduler.taskselector", 
+          conf.getClass("mapred.fairscheduler.taskselector",
               DefaultTaskSelector.class, TaskSelector.class), conf);
       taskSelector.setTaskTrackerManager(taskTrackerManager);
       taskSelector.start();
@@ -191,7 +192,7 @@ public class FairScheduler extends TaskS
           "mapred.fairscheduler.locality.delay.node", defaultDelay);
       rackLocalityDelay = conf.getLong(
           "mapred.fairscheduler.locality.delay.rack", defaultDelay);
-      if (defaultDelay == -1 && 
+      if (defaultDelay == -1 &&
           (nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
         autoComputeLocalityDelay = true; // Compute from heartbeat interval
       }
@@ -231,14 +232,59 @@ public class FairScheduler extends TaskS
     if (eventLog != null)
       eventLog.log("SHUTDOWN");
     running = false;
+    jobInitializer.terminate();
     if (jobListener != null)
       taskTrackerManager.removeJobInProgressListener(jobListener);
-    if (eagerInitListener != null)
-      taskTrackerManager.removeJobInProgressListener(eagerInitListener);
     if (eventLog != null)
       eventLog.shutdown();
   }
-  
+
+  private class JobInitializer {
+    private static final int DEFAULT_NUM_THREADS = 1;
+    private final ExecutorService threadPool;
+    private final TaskTrackerManager ttm;
+
+    /** Pool size comes from mapred.jobinit.threads (default 1). */
+    public JobInitializer(Configuration conf, TaskTrackerManager ttm) {
+      int numThreads = conf.getInt("mapred.jobinit.threads",
+                                   DEFAULT_NUM_THREADS);
+      threadPool = Executors.newFixedThreadPool(numThreads);
+      this.ttm = ttm;
+    }
+
+    public void initJob(JobInfo jobInfo, JobInProgress job) {
+      if (!mockMode) {
+        threadPool.execute(new InitJob(jobInfo, job));
+      } else {
+        new InitJob(jobInfo, job).run(); // inline in mockMode (unit tests)
+      }
+    }
+
+    class InitJob implements Runnable {
+      private final JobInfo jobInfo;
+      private final JobInProgress job;
+
+      public InitJob(JobInfo jobInfo, JobInProgress job) {
+        this.jobInfo = jobInfo;
+        this.job = job;
+      }
+
+      public void run() {
+        ttm.initJob(job);
+      }
+    }
+
+    void terminate() {
+      LOG.info("Shutting down thread pool");
+      threadPool.shutdownNow();
+      try {
+        threadPool.awaitTermination(1, TimeUnit.MINUTES);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep interrupt status for caller
+      }
+    }
+  }
+
   /**
    * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
    */
@@ -254,7 +300,7 @@ public class FairScheduler extends TaskS
         update();
       }
     }
-    
+
     @Override
     public void jobRemoved(JobInProgress job) {
       synchronized (FairScheduler.this) {
@@ -263,7 +309,7 @@ public class FairScheduler extends TaskS
         infos.remove(job);
       }
     }
-  
+
     @Override
     public void jobUpdated(JobChangeEvent event) {
       eventLog.log("JOB_UPDATED", event.getJobInProgress().getJobID());
@@ -292,7 +338,7 @@ public class FairScheduler extends TaskS
       }
     }
   }
-  
+
   @Override
   public synchronized List<Task> assignTasks(TaskTracker tracker)
       throws IOException {
@@ -301,7 +347,7 @@ public class FairScheduler extends TaskS
     String trackerName = tracker.getTrackerName();
     eventLog.log("HEARTBEAT", trackerName);
     long currentTime = clock.getTime();
-    
+
     // Compute total runnable maps and reduces, and currently running ones
     int runnableMaps = 0;
     int runningMaps = 0;
@@ -316,17 +362,17 @@ public class FairScheduler extends TaskS
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     // Compute total map/reduce slots
-    // In the future we can precompute this if the Scheduler becomes a 
+    // In the future we can precompute this if the Scheduler becomes a
     // listener of tracker join/leave events.
     int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
     int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
-    
-    eventLog.log("RUNNABLE_TASKS", 
+
+    eventLog.log("RUNNABLE_TASKS",
         runnableMaps, runningMaps, runnableReduces, runningReduces);
 
     // Update time waited for local maps for jobs skipped on last heartbeat
     updateLocalityWaitTimes(currentTime);
-    
+
     TaskTrackerStatus tts = tracker.getStatus();
 
     int mapsAssigned = 0; // loop counter for map in the below while loop
@@ -398,8 +444,8 @@ public class FairScheduler extends TaskS
       for (Schedulable sched: scheds) { // This loop will assign only one task
         eventLog.log("INFO", "Checking for " + taskType +
             " task in " + sched.getName());
-        Task task = taskType == TaskType.MAP ? 
-                    sched.assignTask(tts, currentTime, visitedForMap) : 
+        Task task = taskType == TaskType.MAP ?
+                    sched.assignTask(tts, currentTime, visitedForMap) :
                     sched.assignTask(tts, currentTime, visitedForReduce);
         if (task != null) {
           foundTask = true;
@@ -439,7 +485,7 @@ public class FairScheduler extends TaskS
         infos.get(job).skippedAtLastHeartbeat = true;
       }
     }
-    
+
     // If no tasks were found, return null
     return tasks.isEmpty() ? null : tasks;
   }
@@ -464,7 +510,7 @@ public class FairScheduler extends TaskS
    * Update locality wait times for jobs that were skipped at last heartbeat.
    */
   private void updateLocalityWaitTimes(long currentTime) {
-    long timeSinceLastHeartbeat = 
+    long timeSinceLastHeartbeat =
       (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
     lastHeartbeatTime = currentTime;
     for (JobInfo info: infos.values()) {
@@ -476,7 +522,7 @@ public class FairScheduler extends TaskS
   }
 
   /**
-   * Update a job's locality level and locality wait variables given that that 
+   * Update a job's locality level and locality wait variables given that that
    * it has just launched a map task on a given task tracker.
    */
   private void updateLastMapLocalityLevel(JobInProgress job,
@@ -494,7 +540,7 @@ public class FairScheduler extends TaskS
    * launch tasks, based on how long it has been waiting for local tasks.
    * This is used to implement the "delay scheduling" feature of the Fair
    * Scheduler for optimizing data locality.
-   * If the job has no locality information (e.g. it does not use HDFS), this 
+   * If the job has no locality information (e.g. it does not use HDFS), this
    * method returns LocalityLevel.ANY, allowing tasks at any level.
    * Otherwise, the job can only launch tasks at its current locality level
    * or lower, unless it has waited at least nodeLocalityDelay or
@@ -543,17 +589,17 @@ public class FairScheduler extends TaskS
       return LocalityLevel.ANY;
     }
   }
-  
+
   /**
    * Recompute the internal variables used by the scheduler - per-job weights,
    * fair shares, deficits, minimum slot allocations, and numbers of running
-   * and needed tasks of each type. 
+   * and needed tasks of each type.
    */
   protected void update() {
-    // Making more granular locking so that clusterStatus can be fetched 
+    // Making more granular locking so that clusterStatus can be fetched
     // from Jobtracker without locking the scheduler.
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-    
+
     // Recompute locality delay from JobTracker heartbeat interval if enabled.
     // This will also lock the JT, so do it outside of a fair scheduler lock.
     if (autoComputeLocalityDelay) {
@@ -562,15 +608,15 @@ public class FairScheduler extends TaskS
           (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
       rackLocalityDelay = nodeLocalityDelay;
     }
-    
+
     // Got clusterStatus hence acquiring scheduler lock now.
     synchronized (this) {
       // Reload allocations file if it hasn't been loaded in a while
       poolMgr.reloadAllocsIfNecessary();
-      
+
       // Remove any jobs that have stopped running
       List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
-      for (JobInProgress job: infos.keySet()) { 
+      for (JobInProgress job: infos.keySet()) {
         int runState = job.getStatus().getRunState();
         if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
           || runState == JobStatus.KILLED) {
@@ -581,15 +627,15 @@ public class FairScheduler extends TaskS
         infos.remove(job);
         poolMgr.removeJob(job);
       }
-      
-      updateRunnability(); // Set job runnability based on user/pool limits 
-      
+
+      updateRunnability(); // Set job runnability based on user/pool limits
+
       // Update demands of jobs and pools
       for (Pool pool: poolMgr.getPools()) {
         pool.getMapSchedulable().updateDemand();
         pool.getReduceSchedulable().updateDemand();
       }
-      
+
       // Compute fair shares based on updated demands
       List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
       List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
@@ -597,18 +643,18 @@ public class FairScheduler extends TaskS
           mapScheds, clusterStatus.getMaxMapTasks());
       SchedulingAlgorithms.computeFairShares(
           reduceScheds, clusterStatus.getMaxReduceTasks());
-      
+
       // Use the computed shares to assign shares within each pool
       for (Pool pool: poolMgr.getPools()) {
         pool.getMapSchedulable().redistributeShare();
         pool.getReduceSchedulable().redistributeShare();
       }
-      
+
       if (preemptionEnabled)
         updatePreemptionVariables();
     }
   }
-  
+
   public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
     List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
     for (Pool pool: poolMgr.getPools()) {
@@ -616,7 +662,7 @@ public class FairScheduler extends TaskS
     }
     return scheds;
   }
-  
+
   private void updateRunnability() {
     // Start by marking everything as not runnable
     for (JobInfo info: infos.values()) {
@@ -630,16 +676,27 @@ public class FairScheduler extends TaskS
     Map<String, Integer> userJobs = new HashMap<String, Integer>();
     Map<String, Integer> poolJobs = new HashMap<String, Integer>();
     for (JobInProgress job: jobs) {
-      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
-        String user = job.getJobConf().getUser();
-        String pool = poolMgr.getPoolName(job);
-        int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
-        int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
-        if (userCount < poolMgr.getUserMaxJobs(user) && 
-            poolCount < poolMgr.getPoolMaxJobs(pool)) {
-          infos.get(job).runnable = true;
+      String user = job.getJobConf().getUser();
+      String pool = poolMgr.getPoolName(job);
+      int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+      int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+      if (userCount < poolMgr.getUserMaxJobs(user) &&
+          poolCount < poolMgr.getPoolMaxJobs(pool)) {
+        if (job.getStatus().getRunState() == JobStatus.RUNNING ||
+            job.getStatus().getRunState() == JobStatus.PREP) {
           userJobs.put(user, userCount + 1);
           poolJobs.put(pool, poolCount + 1);
+          JobInfo jobInfo = infos.get(job);
+          if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+            jobInfo.runnable = true;
+          } else {
+            // The job is in the PREP state. Give it to the job initializer
+            // for initialization if we have not already done it.
+            if (jobInfo.needsInitializing) {
+              jobInfo.needsInitializing = false;
+              jobInitializer.initJob(jobInfo, job);
+            }
+          }
         }
       }
     }
@@ -655,7 +712,7 @@ public class FairScheduler extends TaskS
         // Set weight based on runnable tasks
         JobInfo info = infos.get(job);
         int runnableTasks = (taskType == TaskType.MAP) ?
-            info.mapSchedulable.getDemand() : 
+            info.mapSchedulable.getDemand() :
             info.reduceSchedulable.getDemand();
         weight = Math.log1p(runnableTasks) / Math.log(2);
       }
@@ -677,7 +734,7 @@ public class FairScheduler extends TaskS
     default:        return 0.25; // priority = VERY_LOW
     }
   }
-  
+
   public PoolManager getPoolManager() {
     return poolMgr;
   }
@@ -716,7 +773,7 @@ public class FairScheduler extends TaskS
     int desiredShare = Math.min(sched.getMinShare(), sched.getDemand());
     return (sched.getRunningTasks() < desiredShare);
   }
-  
+
   /**
    * Is a pool being starved for fair share for the given task type?
    * This is defined as being below half its fair share.
@@ -733,19 +790,19 @@ public class FairScheduler extends TaskS
    * have been below half their fair share for the fairSharePreemptionTimeout.
    * If such pools exist, compute how many tasks of each type need to be
    * preempted and then select the right ones using preemptTasks.
-   * 
+   *
    * This method computes and logs the number of tasks we want to preempt even
    * if preemption is disabled, for debugging purposes.
    */
   protected void preemptTasksIfNecessary() {
     if (!preemptionEnabled)
       return;
-    
+
     long curTime = clock.getTime();
     if (curTime - lastPreemptCheckTime < preemptionInterval)
       return;
     lastPreemptCheckTime = curTime;
-    
+
     // Acquire locks on both the JobTracker (task tracker manager) and this
     // because we might need to call some JobTracker methods (killTask).
     synchronized (taskTrackerManager) {
@@ -768,9 +825,9 @@ public class FairScheduler extends TaskS
   }
 
   /**
-   * Preempt a given number of tasks from a list of PoolSchedulables. 
-   * The policy for this is to pick tasks from pools that are over their fair 
-   * share, but make sure that no pool is placed below its fair share in the 
+   * Preempt a given number of tasks from a list of PoolSchedulables.
+   * The policy for this is to pick tasks from pools that are over their fair
+   * share, but make sure that no pool is placed below its fair share in the
    * process. Furthermore, we want to minimize the amount of computation
    * wasted by preemption, so out of the tasks in over-scheduled pools, we
    * prefer to preempt tasks that started most recently.
@@ -778,9 +835,9 @@ public class FairScheduler extends TaskS
   private void preemptTasks(List<PoolSchedulable> scheds, int tasksToPreempt) {
     if (scheds.isEmpty() || tasksToPreempt == 0)
       return;
-    
+
     TaskType taskType = scheds.get(0).getTaskType();
-    
+
     // Collect running tasks of our type from over-scheduled pools
     List<TaskStatus> runningTasks = new ArrayList<TaskStatus>();
     for (PoolSchedulable sched: scheds) {
@@ -789,7 +846,7 @@ public class FairScheduler extends TaskS
         runningTasks.addAll(getRunningTasks(js.getJob(), taskType));
       }
     }
-    
+
     // Sort tasks into reverse order of start time
     Collections.sort(runningTasks, new Comparator<TaskStatus>() {
       public int compare(TaskStatus t1, TaskStatus t2) {
@@ -801,15 +858,15 @@ public class FairScheduler extends TaskS
           return -1;
       }
     });
-    
+
     // Maintain a count of tasks left in each pool; this is a bit
     // faster than calling runningTasks() on the pool repeatedly
     // because the latter must scan through jobs in the pool
-    HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>(); 
+    HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>();
     for (Pool p: poolMgr.getPools()) {
       tasksLeft.put(p, p.getSchedulable(taskType).getRunningTasks());
     }
-    
+
     // Scan down the sorted list of task statuses until we've killed enough
     // tasks, making sure we don't kill too many from any pool
     for (TaskStatus status: runningTasks) {
@@ -862,8 +919,8 @@ public class FairScheduler extends TaskS
     }
     int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
     if (tasksToPreempt > 0) {
-      String message = "Should preempt " + tasksToPreempt + " " 
-          + sched.getTaskType() + " tasks for pool " + sched.getName() 
+      String message = "Should preempt " + tasksToPreempt + " "
+          + sched.getTaskType() + " tasks for pool " + sched.getName()
           + ": tasksDueToMinShare = " + tasksDueToMinShare
           + ", tasksDueToFairShare = " + tasksDueToFairShare;
       eventLog.log("INFO", message);
@@ -930,7 +987,7 @@ public class FairScheduler extends TaskS
     synchronized (eventLog) {
       eventLog.log("BEGIN_DUMP");
       // List jobs in order of submit time
-      ArrayList<JobInProgress> jobs = 
+      ArrayList<JobInProgress> jobs =
         new ArrayList<JobInProgress>(infos.keySet());
       Collections.sort(jobs, new Comparator<JobInProgress>() {
         public int compare(JobInProgress j1, JobInProgress j2) {
@@ -986,7 +1043,7 @@ public class FairScheduler extends TaskS
   public Clock getClock() {
     return clock;
   }
-  
+
   public FairSchedulerEventLog getEventLog() {
     return eventLog;
   }

Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1066273&r1=1066272&r2=1066273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Feb  1 23:36:05 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
 import org.apache.hadoop.net.Node;
@@ -63,6 +64,7 @@ public class TestFairScheduler extends T
     private int mapCounter = 0;
     private int reduceCounter = 0;
     private final String[][] mapInputLocations; // Array of hosts for each map
+    private boolean initialized;
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, 
@@ -79,7 +81,7 @@ public class TestFairScheduler extends T
       this.nonRunningReduces = new LinkedList<TaskInProgress>();   
       this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.jobHistory = new FakeJobHistory();
-      initTasks();
+      this.initialized = false;
     }
     
     @Override
@@ -130,6 +132,12 @@ public class TestFairScheduler extends T
         reduces[i] = new FakeTaskInProgress(getJobID(), i,
             getJobConf(), this);
       }
+
+      initialized = true;
+    }
+
+    public boolean isInitialized() {
+      return initialized;
     }
 
     @Override
@@ -412,7 +420,12 @@ public class TestFairScheduler extends T
     }
 
     public void initJob (JobInProgress job) {
-      // do nothing
+      try {
+        job.initTasks();
+      } catch (Exception e) { // IOException or KillInterruptedException
+        throw new RuntimeException("initTasks failed in fake initJob", e);
+      }
+      job.status.setRunState(JobStatus.RUNNING);
     }
     
     public void failJob (JobInProgress job) {
@@ -525,18 +538,23 @@ public class TestFairScheduler extends T
     }
   }
   
+  private JobInProgress submitJobNotInitialized(int state, int maps, int reduces)
+      throws IOException {
+    return submitJob(state, maps, reduces, null, null, false);
+  }
+
   private JobInProgress submitJob(int state, int maps, int reduces)
       throws IOException {
-    return submitJob(state, maps, reduces, null, null);
+    return submitJob(state, maps, reduces, null, null, true);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool)
       throws IOException {
-    return submitJob(state, maps, reduces, pool, null);
+    return submitJob(state, maps, reduces, pool, null, true);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool,
-      String[][] mapInputLocations) throws IOException {
+      String[][] mapInputLocations, boolean initializeJob) throws IOException {
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
@@ -544,6 +562,9 @@ public class TestFairScheduler extends T
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
         mapInputLocations, UtilsForTests.getJobTracker());
+    if (initializeJob) {
+      taskTrackerManager.initJob(job);
+    }
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;
@@ -641,7 +662,6 @@ public class TestFairScheduler extends T
   }
 
   public void testNonRunningJobsAreIgnored() throws IOException {
-    submitJobs(1, JobStatus.PREP, 10, 10);
     submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
     submitJobs(1, JobStatus.FAILED, 10, 10);
     submitJobs(1, JobStatus.KILLED, 10, 10);
@@ -1345,18 +1365,24 @@ public class TestFairScheduler extends T
     
     // Submit jobs, advancing time in-between to make sure that they are
     // all submitted at distinct times.
-    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job1 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(10);
-    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job2 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info2 = scheduler.infos.get(job2);
     advanceTime(10);
-    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job3 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
-    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job4 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info4 = scheduler.infos.get(job4);
-    
+
+    // Only two of the jobs should be initialized.
+    assertTrue(((FakeJobInProgress)job1).isInitialized());
+    assertTrue(((FakeJobInProgress)job2).isInitialized());
+    assertFalse(((FakeJobInProgress)job3).isInitialized());
+    assertFalse(((FakeJobInProgress)job4).isInitialized());
+
     // Check scheduler variables
     assertEquals(2.0,  info1.mapSchedulable.getFairShare());
     assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
@@ -2253,7 +2279,7 @@ public class TestFairScheduler extends T
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
         new String[][] {
           {"rack2.node2"}
-        });
+        }, true);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Advance time before submitting another job j2, to make j1 be ahead
@@ -2301,7 +2327,7 @@ public class TestFairScheduler extends T
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 4, 0, "pool1",
         new String[][] {
           {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}
-        });
+        }, true);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Advance time before submitting another job j2, to make j1 be ahead
@@ -2384,7 +2410,7 @@ public class TestFairScheduler extends T
         new String[][] {
           {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"},
           {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"},
-        });
+        }, true);
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(100);
     
@@ -2672,6 +2698,7 @@ public class TestFairScheduler extends T
     jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA");
     JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager,
         null, UtilsForTests.getJobTracker());
+    job3.initTasks();
     job3.getStatus().setRunState(JobStatus.RUNNING);
     taskTrackerManager.submitJob(job3);
 
@@ -2687,6 +2714,7 @@ public class TestFairScheduler extends T
     jobConf2.set(POOL_PROPERTY, "poolA");
     JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager,
         null, UtilsForTests.getJobTracker());
+    job4.initTasks();
     job4.getStatus().setRunState(JobStatus.RUNNING);
     taskTrackerManager.submitJob(job4);
 
@@ -2708,10 +2736,10 @@ public class TestFairScheduler extends T
   protected void checkAssignment(String taskTrackerName,
       String... expectedTasks) throws IOException {
     List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(tasks);
     System.out.println("Assigned tasks:");
     for (int i = 0; i < tasks.size(); i++)
       System.out.println("- " + tasks.get(i));
-    assertNotNull(tasks);
     assertEquals(expectedTasks.length, tasks.size());
     for (int i = 0; i < tasks.size(); i++)
       assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());



Mime
View raw message