hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r736070 - in /hadoop/core/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Tue, 20 Jan 2009 19:18:58 GMT
Author: yhemanth
Date: Tue Jan 20 11:18:58 2009
New Revision: 736070

URL: http://svn.apache.org/viewvc?rev=736070&view=rev
Log:
HADOOP-5048. Fix capacity scheduler to correctly cleanup jobs that are killed after initialization,
but before running. Contributed by Sreekanth Ramakrishnan.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Jan 20 11:18:58 2009
@@ -617,6 +617,10 @@
     HADOOP-4993. Fix Chukwa agent configuration and startup to make it both
     more modular and testable. (Ari Rabkin via cdouglas)
 
+    HADOOP-5048. Fix capacity scheduler to correctly cleanup jobs that are
+    killed after initialization, but before running. 
+    (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Jan 20 11:18:58 2009
@@ -1276,7 +1276,7 @@
       }
       
       //update stats on waiting jobs
-      for(JobInProgress j: jobQueuesManager.getJobs(qsi.queueName)) {
+      for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) {
         // pending tasks
         if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
             (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
@@ -1498,7 +1498,7 @@
       jobCollection.addAll(runningJobs);
     }
     Collection<JobInProgress> waitingJobs = 
-      jobQueuesManager.getJobs(queueName);
+      jobQueuesManager.getWaitingJobs(queueName);
     Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
     if(waitingJobs != null) {
       tempCollection.addAll(waitingJobs);

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Tue Jan 20 11:18:58 2009
@@ -243,7 +243,7 @@
    * Set of jobs which have been passed to Initialization threads.
    * This is maintained so that we dont call initTasks() for same job twice.
    */
-  private HashSet<JobID> initializedJobs;
+  private HashMap<JobID,JobInProgress> initializedJobs;
 
   private volatile boolean running;
 
@@ -255,7 +255,7 @@
 
   public JobInitializationPoller(JobQueuesManager mgr,
       CapacitySchedulerConf rmConf, Set<String> queue) {
-    initializedJobs = new HashSet<JobID>();
+    initializedJobs = new HashMap<JobID,JobInProgress>();
     jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
     threadsToQueueMap = new HashMap<String, JobInitializationThread>();
@@ -293,9 +293,20 @@
     }
   }
 
+  /**
+   * This is the main thread of the initialization poller. We essentially do
+   * the following in the main thread:
+   * 
+   * <ol>
+   * <li> Clean up the list of initialized jobs which the poller maintains
+   * </li>
+   * <li> Select jobs to initialize in the polling interval.</li>
+   * </ol>
+   */
   public void run() {
     while (running) {
       try {
+        cleanUpInitializedJobsList();
         selectJobsToInitialize();
         if (!this.isInterrupted()) {
           Thread.sleep(sleepInterval);
@@ -307,17 +318,18 @@
     }
   }
 
-  // The key method that picks up jobs to initialize for each queue.
-  // The jobs picked up are added to the worker thread that is handling
-  // initialization for that queue.
-  // The method is package private to allow tests to call it synchronously
-  // in a controlled manner.
+  /**
+   * The key method which selects jobs to be initialized across
+   * queues and assigns those jobs to their appropriate init-worker threads.
+   * <br/>
+   * This method is overridden in the test case which is used to test the job
+   * initialization poller.
+   * 
+   */
   void selectJobsToInitialize() {
     for (String queue : jobQueues.keySet()) {
       ArrayList<JobInProgress> jobsToInitialize = getJobsToInitialize(queue);
-      //if (LOG.isDebugEnabled()) {
-        printJobs(jobsToInitialize);
-      //}
+      printJobs(jobsToInitialize);
       JobInitializationThread t = threadsToQueueMap.get(queue);
       for (JobInProgress job : jobsToInitialize) {
         t.addJobsToQueue(queue, job);
@@ -325,6 +337,13 @@
     }
   }
 
+  /**
+   * Method used to print log statements about which jobs are being
+   * passed to init-threads. 
+   * 
+   * @param jobsToInitialize list of jobs which are passed to the
+   * init-threads.
+   */
   private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
     for (JobInProgress job : jobsToInitialize) {
       LOG.info("Passing to Initializer Job Id :" + job.getJobID()
@@ -333,13 +352,25 @@
     }
   }
 
-  // This method exists to be overridden by test cases that wish to
-  // create a test-friendly worker thread which can be controlled
-  // synchronously.
+  /**
+   * This method exists to be overridden by test cases that wish to
+   * create a test-friendly worker thread which can be controlled
+   * synchronously.
+   * 
+   * @return Instance of worker init-threads.
+   */
   JobInitializationThread createJobInitializationThread() {
     return new JobInitializationThread();
   }
   
+  /**
+   * Method which is used by the poller to assign appropriate worker thread
+   * to a queue. The number of threads would be always less than or equal
+   * to number of queues in a system. If number of threads is configured to 
+   * be more than number of queues then poller does not create threads more
+   * than number of queues. 
+   * 
+   */
   private void assignThreadsToQueues() {
     int countOfQueues = jobQueues.size();
     String[] queues = (String[]) jobQueues.keySet().toArray(
@@ -369,14 +400,38 @@
     }
   }
 
-  /*
-   * Select jobs to be initialized for a given queue.
+  /**
+   * 
+   * Method used to select jobs to be initialized for a given queue. <br/>
+   * 
+   * We want to ensure that enough jobs have been initialized, so that when the
+   * Scheduler wants to consider a new job to run, it's ready. We clearly don't
+   * want to initialize too many jobs as each initialized job has a memory
+   * footprint, sometimes significant.
    * 
-   * The jobs are selected such that they are within the limits
-   * for number of users and number of jobs per user in the queue.
-   * The only exception is if high priority jobs are waiting to be
-   * initialized. In that case, we could exceed the configured limits.
-   * However, we try to restrict the excess to a minimum.
+   * Number of jobs to be initialized is restricted by two values: - Maximum
+   * number of users whose jobs we want to initialize, which is equal to 
+   * the number of concurrent users the queue can support. - Maximum number 
+   * of initialized jobs per user. The product of these two gives us the
+   * total number of initialized jobs.
+   * 
+   * Note that this is a rough number, meant for decreasing extra memory
+   * footprint. It's OK if we go over it once in a while, if we have to.
+   * 
+   * This can happen as follows. Suppose we have initialized 3 jobs for a
+   * user. Now, suppose the user submits a job whose priority is higher than
+   * that of the 3 jobs initialized. This job needs to be initialized, since it
+   * will run earlier than the 3 jobs. We'll now have 4 initialized jobs for the
+   * user. If memory becomes a problem, we should ideally un-initialize one of
+   * the 3 jobs, to keep the count of initialized jobs at 3, but that's
+   * something we don't do for now. This situation can also arise when a new
+   * user submits a high priority job, thus superseding a user whose jobs have
+   * already been initialized. The latter user's initialized jobs are redundant,
+   * but we'll leave them initialized.
+   * 
+   * @param queue name of the queue to pick the jobs to initialize.
+   * @return list of jobs to be initialized in a queue. An empty list is
+   *         returned if no jobs are found.
    */
   ArrayList<JobInProgress> getJobsToInitialize(String queue) {
     QueueInfo qi = jobQueues.get(queue);
@@ -385,116 +440,131 @@
     // queue.
     int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
     int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
-    // calculate maximum number of jobs which can be allowed to initialize
-    // for this queue.
-    // This value is used when a user submits a high priority job after we
-    // have initialized jobs for that queue and none of them is scheduled.
-    // This would prevent us from initializing extra jobs for that particular
-    // user. Explanation given at end of method.
     int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
         * maxJobsPerUserAllowedToInitialize;
-    Collection<JobInProgress> jobs = jobQueueManager.getJobs(queue);
     int countOfJobsInitialized = 0;
     HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
+    Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+    /*
+     * Walk through the collection of waiting jobs.
+     *  We maintain a map of jobs that have already been initialized. If a 
+     *  job exists in that map, increment the count for that job's user 
+     *  and move on to the next job.
+     *   
+     *  If the job doesn't exist, see whether we  want to initialize it. 
+     *  We initialize it if: - at least one job of the user has already 
+     *  been initialized, but the user's total initialized jobs are below 
+     *  the limit, OR - this is a new user, and we haven't reached the limit
+     *  for the number of users whose jobs we want to initialize. We break 
+     *  when we've reached the limit of maximum jobs to initialize.
+     */
     for (JobInProgress job : jobs) {
-      /*
-       * First check if job has been scheduled or completed or killed. If so
-       * then remove from uninitialised jobs. Remove from Job queue
-       */
-      if ((job.getStatus().getRunState() == JobStatus.RUNNING)
-          && (job.runningMaps() > 0 || job.runningReduces() > 0
-              || job.finishedMaps() > 0 || job.finishedReduces() > 0)) {
-        LOG.debug("Removing from the queue " + job.getJobID());
-        initializedJobs.remove(job.getJobID());
-        jobQueueManager.removeJobFromQueue(job);
-        continue;
-      } else if (job.isComplete()) {
-        LOG.debug("Removing from completed job from " + "the queue "
-            + job.getJobID());
-        initializedJobs.remove(job.getJobID());
-        jobQueueManager.removeJobFromQueue(job);
-        continue;
-      }
       String user = job.getProfile().getUser();
       int numberOfJobs = userJobsInitialized.get(user) == null ? 0
           : userJobsInitialized.get(user);
       // If the job is already initialized then add the count against user
       // then continue.
-      if (initializedJobs.contains(job.getJobID())) {
+      if (initializedJobs.containsKey(job.getJobID())) {
         userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
         countOfJobsInitialized++;
         continue;
       }
       boolean isUserPresent = userJobsInitialized.containsKey(user);
-      /*
-       * If the user is present in user list and size of user list is less
-       * maximum allowed users initialize then initialize this job and add this
-       * user to the global list.
-       * 
-       * Else if he is present we check if his number of jobs has not crossed
-       * his quota and global quota.
-       * 
-       * The logic behind using a global per queue job can be understood by example
-       * below: Consider 3 users submitting normal priority job in a job queue with
-       * user limit as 100. (Max jobs per user = 2)
-       * 
-       * U1J1,U1J2,U1J3....,U3J3.
-       * 
-       * Jobs initialized would be
-       * 
-       * U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
-       * 
-       * Now consider a case where U4 comes in and submits a high priority job.
-       * 
-       * U4J1 --- High Priority JOb, U4J2---- Normal priority job.
-       * 
-       * So, if we dont use global per queue value we would end up initializing both
-       * U4 jobs which is not correct.
-       * 
-       * By using a global value we ensure that we dont initialize any extra jobs
-       * for a user.
-       */
       if (!isUserPresent
           && userJobsInitialized.size() < maximumUsersAllowedToInitialize) {
         // this is a new user being considered and the number of users
         // is within limits.
         userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
         jobsToInitialize.add(job);
-        initializedJobs.add(job.getJobID());
+        initializedJobs.put(job.getJobID(),job);
         countOfJobsInitialized++;
       } else if (isUserPresent
-          && numberOfJobs < maxJobsPerUserAllowedToInitialize
-          && countOfJobsInitialized < maxJobsPerQueueToInitialize) {
-        /*
-         * this is an existing user and the number of jobs per user
-         * is within limits, as also the number of jobs per queue.
-         * We need the check on number of jobs per queue to restrict
-         * the number of jobs we initialize over the limit due to high
-         * priority jobs.
-         * 
-         * For e.g Consider 3 users submitting normal priority job in 
-         * a job queue with user limit as 100 and max jobs per user as 2
-         * Say the jobs are U1J1,U1J2,U1J3....,U3J3.
-         * 
-         * Jobs initialized would be U1J1,U1J2,U2J1,U2J2,U3J1,U3J2
-         * 
-         * Now consider a case where U4 comes in and submits a high priority job
-         * and a normal priority job. Say U4J1 and U4J2
-         * 
-         * If we dont consider the number of jobs per queue we would end up 
-         * initializing both jobs from U4. Initializing the second job is
-         * unnecessary.
-         */
+          && numberOfJobs < maxJobsPerUserAllowedToInitialize) {
         userJobsInitialized.put(user, Integer.valueOf(numberOfJobs + 1));
         jobsToInitialize.add(job);
-        initializedJobs.add(job.getJobID());
+        initializedJobs.put(job.getJobID(),job);
         countOfJobsInitialized++;
       }
+      /*
+       * if the maximum number of jobs to initialize for a queue is reached
+       * then we stop looking at further jobs. The jobs beyond this number
+       * can be initialized.
+       */
+      if(countOfJobsInitialized > maxJobsPerQueueToInitialize) {
+        break;
+      }
     }
     return jobsToInitialize;
   }
 
 
+  /**
+   * Method which is used internally to clean up the initialized jobs
+   * data structure which the job initialization poller uses to check
+   * if a job is initialized or not.
+   * 
+   * Algorithm for cleaning up task is as follows:
+   * 
+   * <ul>
+   * <li> For jobs in <b>initializedJobs</b> list </li>
+   * <ul>
+   * <li> If job is running</li>
+   * <ul>
+   * <li> If job is scheduled then remove the job from the waiting queue 
+   * of the scheduler and <b>initializedJobs</b>.<br/>
+   *  Whether a job is scheduled or not is determined by the following
+   *  formula:<br/> 
+   *  if pending <i>task</i> &lt; desired <i>task</i> then scheduled else
+   *  not scheduled.<br/>
+   *  The formula returns <i>scheduled</i> if one task has run or failed.
+   *  In any case in which there has been a failure but not enough to mark
+   *  the task as failed, the formula returns <i>not scheduled</i>.
+   * </li>
+   * </ul>
+   * 
+   * <li> If job is complete, then remove the job from <b>initializedJobs</b>.
+   * </li>
+   * 
+   * </ul>
+   * </ul>
+   * 
+   */
+  void cleanUpInitializedJobsList() {
+    Iterator<Entry<JobID, JobInProgress>> jobsIterator = 
+      initializedJobs.entrySet().iterator();
+    while(jobsIterator.hasNext()) {
+      Entry<JobID,JobInProgress> entry = jobsIterator.next();
+      JobInProgress job = entry.getValue();
+      if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+        if (isScheduled(job)) {
+          LOG.info("Removing scheduled jobs from waiting queue"
+              + job.getJobID());
+          jobsIterator.remove();
+          jobQueueManager.removeJobFromWaitingQueue(job);
+          continue;
+        }
+      }
+      if(job.isComplete()) {
+        LOG.info("Removing killed/completed job from initalized jobs " +
+        		"list : "+ job.getJobID());
+        jobsIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Convenience method to check if job has been scheduled or not.
+   * 
+   * The method may return false in case of job which has failure but
+   * has not failed the tip.
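+   * @param job the job to check
+   * @return true if the job has fewer pending maps than desired maps, or
+   *         fewer pending reduces than desired reduces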
+   */
+  private boolean isScheduled(JobInProgress job) {
+    return ((job.pendingMaps() < job.desiredMaps()) 
+        || (job.pendingReduces() < job.desiredReduces()));
+  }
+
   void terminate() {
     running = false;
     for (Entry<String, JobInitializationThread> entry : threadsToQueueMap
@@ -519,7 +589,7 @@
     }
   }
 
-  HashSet<JobID> getInitializedJobList() {
-    return initializedJobs;
+  Set<JobID> getInitializedJobList() {
+    return initializedJobs.keySet();
   }
 }

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Tue Jan 20 11:18:58 2009
@@ -65,7 +65,7 @@
 
     // whether the queue supports priorities
     boolean supportsPriorities;
-    Map<JobSchedulingInfo, JobInProgress> jobList; // for waiting jobs
+    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
     Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
     
     public Comparator<JobSchedulingInfo> comparator;
@@ -79,14 +79,14 @@
       else {
         comparator = STARTTIME_JOB_COMPARATOR;
       }
-      jobList = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
+      waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
       runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
     }
     
-    Collection<JobInProgress> getJobs() {
-      synchronized (jobList) {
+    Collection<JobInProgress> getWaitingJobs() {
+      synchronized (waitingJobs) {
         return Collections.unmodifiableCollection(
-            new LinkedList<JobInProgress>(jobList.values()));
+            new LinkedList<JobInProgress>(waitingJobs.values()));
       }
     }
     
@@ -109,21 +109,21 @@
       }
     }
     
-    JobInProgress removeJob(JobSchedulingInfo schedInfo) {
-      synchronized (jobList) {
-        return jobList.remove(schedInfo);
+    JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
+      synchronized (waitingJobs) {
+        return waitingJobs.remove(schedInfo);
       }
     }
     
-    void addJob(JobInProgress job) {
-      synchronized (jobList) {
-        jobList.put(new JobSchedulingInfo(job), job);
+    void addWaitingJob(JobInProgress job) {
+      synchronized (waitingJobs) {
+        waitingJobs.put(new JobSchedulingInfo(job), job);
       }
     }
     
     int getWaitingJobCount() {
-      synchronized (jobList) {
-       return jobList.size(); 
+      synchronized (waitingJobs) {
+       return waitingJobs.size(); 
       }
     }
     
@@ -157,11 +157,11 @@
   }
   
   /**
-   * Returns the queue of Uninitialised jobs associated with queue name.
+   * Returns the queue of waiting jobs associated with queue name.
    * 
    */
-  public Collection<JobInProgress> getJobs(String queueName) {
-    return jobQueues.get(queueName).getJobs();
+  Collection<JobInProgress> getWaitingJobs(String queueName) {
+    return jobQueues.get(queueName).getWaitingJobs();
   }
   
   @Override
@@ -178,21 +178,23 @@
     }
     // add job to waiting queue. It will end up in the right place, 
     // based on priority. 
-    qi.addJob(job);
+    qi.addWaitingJob(job);
     // let scheduler know. 
     scheduler.jobAdded(job);
   }
 
   /*
-   * The removal of the running jobs alone is done by the JobQueueManager.
-   * The removal of the jobs in the job queue is taken care by the
-   * JobInitializationPoller.
+   * Method removes the jobs from both running and waiting job queue in 
+   * job queue manager.
    */
   private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
                             QueueInfo qi) {
     LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
         + job.getProfile().getQueueName() + " has completed");
+    //remove the job from both queues; a job can be in the
+    //running and waiting queue at the same time.
     qi.removeRunningJob(oldInfo);
+    qi.removeWaitingJob(oldInfo);
     // let scheduler know
     scheduler.jobCompleted(job);
   }
@@ -206,8 +208,8 @@
   private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
                            QueueInfo qi) {
     
-    if(qi.removeJob(oldInfo) != null) {
-      qi.addJob(job);
+    if(qi.removeWaitingJob(oldInfo) != null) {
+      qi.addWaitingJob(job);
     }
     if(qi.removeRunningJob(oldInfo) != null) {
       qi.addRunningJob(job);
@@ -264,10 +266,10 @@
     }
   }
   
-  void removeJobFromQueue(JobInProgress job) {
+  void removeJobFromWaitingQueue(JobInProgress job) {
     String queue = job.getProfile().getQueueName();
     QueueInfo qi = jobQueues.get(queue);
-    qi.removeJob(new JobSchedulingInfo(job));
+    qi.removeWaitingJob(new JobSchedulingInfo(job));
   }
   
   Comparator<JobSchedulingInfo> getComparator(String queue) {

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=736070&r1=736069&r2=736070&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Jan 20 11:18:58 2009
@@ -128,6 +128,7 @@
 
     @Override
     void selectJobsToInitialize() {
+      super.cleanUpInitializedJobsList();
       super.selectJobsToInitialize();
       for (ControlledJobInitializer t : workers) {
         t.initializeJobs();
@@ -704,7 +705,7 @@
     // check if the jobs are missing from the waiting queue
     // The jobs are not removed from waiting queue until they are scheduled 
     assertEquals("Waiting queue is garbled on job init", 2, 
-                 scheduler.jobQueuesManager.getJobs("default")
+                 scheduler.jobQueuesManager.getWaitingJobs("default")
                           .size());
     
     // test if changing the job priority/start-time works as expected in the 
@@ -788,7 +789,7 @@
   private JobInProgress[] getJobsInQueue(boolean waiting) {
     Collection<JobInProgress> queue = 
       waiting 
-      ? scheduler.jobQueuesManager.getJobs("default")
+      ? scheduler.jobQueuesManager.getWaitingJobs("default")
       : scheduler.jobQueuesManager.getRunningJobQueue("default");
     return queue.toArray(new JobInProgress[0]);
   }
@@ -897,7 +898,7 @@
    
     JobQueuesManager mgr = scheduler.jobQueuesManager;
     
-    while(mgr.getJobs("default").size() < 4){
+    while(mgr.getWaitingJobs("default").size() < 4){
       Thread.sleep(1);
     }
     //Raise status change events for jobs submitted.
@@ -2147,10 +2148,10 @@
     
     // reference to the initializedJobs data structure
     // changes are reflected in the set as they are made by the poller
-    HashSet<JobID> initializedJobs = initPoller.getInitializedJobList();
+    Set<JobID> initializedJobs = initPoller.getInitializedJobList();
     
     // we should have 12 (3 x 4) jobs in the job queue
-    assertEquals(mgr.getJobs("default").size(), 12);
+    assertEquals(mgr.getWaitingJobs("default").size(), 12);
 
     // run one poller iteration.
     p.selectJobsToInitialize();
@@ -2264,7 +2265,7 @@
     scheduler.setInitializationPoller(p);
     scheduler.start();
     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
-    HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
+    Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
 
     // submit 3 jobs for 3 users
     submitJobs(3,3,"default");
@@ -2317,22 +2318,38 @@
     scheduler.start();
     
     JobQueuesManager mgr = scheduler.jobQueuesManager;
-    JobInitializationPoller initPoller = scheduler.getInitializationPoller();
-    HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
     
+    // check proper running job movement and completion
+    checkRunningJobMovementAndCompletion();
+
+    // check failed running job movement
+    checkFailedRunningJobMovement();
+
+    // Check job movement of failed initialized job
+    checkFailedInitializedJobMovement();
+
+    // Check failed waiting job movement
+    checkFailedWaitingJobMovement();
+    
+  }
+  
+  private void checkRunningJobMovementAndCompletion() throws IOException {
+    
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobInitializationPoller p = scheduler.getInitializationPoller();
     // submit a job
     FakeJobInProgress job = 
       submitJob(JobStatus.PREP, 1, 1, "default", "u1");
     p.selectJobsToInitialize();
     
-    assertEquals(initializedJobsList.size(), 1);
+    assertEquals(p.getInitializedJobList().size(), 1);
 
     // make it running.
     raiseStatusChangeEvents(mgr);
     
     // it should be there in both the queues.
     assertTrue("Job not present in Job Queue",
-        mgr.getJobs("default").contains(job));
+        mgr.getWaitingJobs("default").contains(job));
     assertTrue("Job not present in Running Queue",
         mgr.getRunningJobQueue("default").contains(job));
     
@@ -2343,12 +2360,12 @@
     p.selectJobsToInitialize();
     
     // now this task should be removed from the initialized list.
-    assertTrue(initializedJobsList.isEmpty());
+    assertTrue(p.getInitializedJobList().isEmpty());
 
     // the job should also be removed from the job queue as tasks
     // are scheduled
     assertFalse("Job present in Job Queue",
-        mgr.getJobs("default").contains(job));
+        mgr.getWaitingJobs("default").contains(job));
     
     // complete tasks and job
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
@@ -2360,8 +2377,78 @@
         mgr.getRunningJobQueue("default").contains(job));
   }
   
+  private void checkFailedRunningJobMovement() throws IOException {
+    
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    
+    //submit a job and initialize it
+    FakeJobInProgress job = 
+      submitJobAndInit(JobStatus.RUNNING, 1, 1, "default", "u1");
+    
+    //check if the job is present in running queue.
+    assertTrue("Running jobs list does not contain submitted job",
+        mgr.getRunningJobQueue("default").contains(job));
+    
+    taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
+    
+    //check if the job is properly removed from running queue.
+    assertFalse("Running jobs list does not contain submitted job",
+        mgr.getRunningJobQueue("default").contains(job));
+    
+  }
+
+  private void checkFailedInitializedJobMovement() throws IOException {
+    
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobInitializationPoller p = scheduler.getInitializationPoller();
+    
+    //submit a job
+    FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+    //Initialize the job
+    p.selectJobsToInitialize();
+    //Don't raise the status change event.
+    
+    //check in waiting and initialized jobs list.
+    assertTrue("Waiting jobs list does not contain the job",
+        mgr.getWaitingJobs("default").contains(job));
+    
+    assertTrue("Initialized job does not contain the job",
+        p.getInitializedJobList().contains(job.getJobID()));
+    
+    //fail the initialized job
+    taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
+    
+    //Check if the job is present in waiting queue
+    assertFalse("Waiting jobs list contains failed job",
+        mgr.getWaitingJobs("default").contains(job));
+    
+    //run the poller to do the cleanup
+    p.selectJobsToInitialize();
+    
+    //check for failed job in the initialized job list
+    assertFalse("Initialized jobs  contains failed job",
+        p.getInitializedJobList().contains(job.getJobID()));
+  }
+  
+  private void checkFailedWaitingJobMovement() throws IOException {
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    // submit a job
+    FakeJobInProgress job = submitJob(JobStatus.PREP, 1, 1, "default",
+        "u1");
+    
+    // check in waiting and initialized jobs list.
+    assertTrue("Waiting jobs list does not contain the job", mgr
+        .getWaitingJobs("default").contains(job));
+    // fail the waiting job
+    taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
+
+    // Check if the job is present in waiting queue
+    assertFalse("Waiting jobs list contains failed job", mgr
+        .getWaitingJobs("default").contains(job));
+  }
+  
   private void raiseStatusChangeEvents(JobQueuesManager mgr) {
-    Collection<JobInProgress> jips = mgr.getJobs("default");
+    Collection<JobInProgress> jips = mgr.getWaitingJobs("default");
     for(JobInProgress jip : jips) {
       if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
         JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,



Mime
View raw message