hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r693360 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/
Date Tue, 09 Sep 2008 04:44:07 GMT
Author: ddas
Date: Mon Sep  8 21:44:06 2008
New Revision: 693360

URL: http://svn.apache.org/viewvc?rev=693360&view=rev
Log:
HADOOP-4100. Removes the cleanupTask scheduling from the Scheduler implementations and moves
it to the JobTracker. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Sep  8 21:44:06 2008
@@ -484,6 +484,10 @@
     HADOOP-3963. libhdfs does not exit on its own, instead it returns error 
     to the caller and behaves as a true library. (Pete Wyckoff via dhruba)
 
+    HADOOP-4100. Removes the cleanupTask scheduling from the Scheduler 
+    implementations and moves it to the JobTracker. 
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Mon Sep  8 21:44:06 2008
@@ -804,6 +804,11 @@
    * @return true/false
    */
   private synchronized boolean canLaunchCleanupTask() {
+    // check if the job is running
+    if (status.getRunState() != JobStatus.RUNNING) {
+      return false;
+    }
+    // check if cleanup task has been launched already. 
     if (launchedCleanup) {
       return false;
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Mon Sep  8 21:44:06 2008
@@ -73,34 +73,6 @@
                                  0.01f);
   }
 
-  protected Task getCleanupTask(int numMaps, int numReduces,
-                                int maxMapTasks, int maxReduceTasks,
-                                TaskTrackerStatus taskTracker,
-                                int numTaskTrackers,
-                                Collection<JobInProgress> jobQueue) 
-  throws IOException {
-    Task t = null;
-    if (numMaps < maxMapTasks) {
-      for (JobInProgress job : jobQueue) {
-        t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
-                       taskTrackerManager.getNumberOfUniqueHosts(), true);
-        if (t != null) {
-          return t;
-        }
-      }
-    }
-    if (numReduces < maxReduceTasks) {
-      for (JobInProgress job : jobQueue) {
-        t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
-                       taskTrackerManager.getNumberOfUniqueHosts(), false);
-        if (t != null) {
-          return t;
-        }
-      }
-    }
-    return t;
-  }
-  
   @Override
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
@@ -119,17 +91,6 @@
     int numMaps = taskTracker.countMapTasks();
     int numReduces = taskTracker.countReduceTasks();
 
-
-    // cleanup task has the highest priority, it should be 
-    // launched as soon as the job is done.
-    synchronized (jobQueue) {
-      Task t = getCleanupTask(numMaps, numReduces, maxCurrentMapTasks,
-                 maxCurrentReduceTasks, taskTracker, numTaskTrackers, jobQueue);
-      if (t != null) {
-        return Collections.singletonList(t);
-      }
-    }
-
     //
     // Compute average map and reduce task numbers across pool
     //

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Mon Sep  8 21:44:06 2008
@@ -1221,7 +1221,10 @@
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
-        List<Task> tasks = taskScheduler.assignTasks(taskTrackerStatus);
+        List<Task> tasks = getCleanupTask(taskTrackerStatus);
+        if (tasks == null ) {
+          tasks = taskScheduler.assignTasks(taskTrackerStatus);
+        }
         if (tasks != null) {
           for (Task task : tasks) {
             expireLaunchingTasks.addNewTask(task.getTaskID());
@@ -1457,6 +1460,43 @@
     return null;
   }
   
+  private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker)
+  throws IOException {
+    int maxMapTasks = taskTracker.getMaxMapTasks();
+    int maxReduceTasks = taskTracker.getMaxReduceTasks();
+    int numMaps = taskTracker.countMapTasks();
+    int numReduces = taskTracker.countReduceTasks();
+    int numTaskTrackers = getClusterStatus().getTaskTrackers();
+    int numUniqueHosts = getNumberOfUniqueHosts();
+
+    Task t = null;
+    synchronized (jobs) {
+      if (numMaps < maxMapTasks) {
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+      }
+      if (numReduces < maxReduceTasks) {
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+      }
+    }
+    return null;
+  }
+
   /**
    * Grab the local fs name
    */

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=693360&r1=693359&r2=693360&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Mon Sep  8 21:44:06 2008
@@ -74,16 +74,6 @@
     final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
     final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
 
-    // check if cleanup task can be launched
-    synchronized (jobQueue) {
-      task = getCleanupTask(mapTasksNumber, reduceTasksNumber,
-               maximumMapTasksNumber, maximumReduceTasksNumber,
-               taskTracker, numTaskTrackers, jobQueue);
-      if (task != null) {
-        return Collections.singletonList(task);
-      }
-    }
-
     /*
      * Statistics about the whole cluster. Most are approximate because of
      * concurrency



Mime
View raw message