hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r417789 - in /lucene/hadoop/trunk: CHANGES.txt conf/hadoop-default.xml src/java/org/apache/hadoop/mapred/TaskTracker.java
Date Wed, 28 Jun 2006 15:28:27 GMT
Author: cutting
Date: Wed Jun 28 08:28:27 2006
New Revision: 417789

URL: http://svn.apache.org/viewvc?rev=417789&view=rev
Log:
HADOOP-27.  Don't allocate tasks to trackers whose local free space is too low.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=417789&r1=417788&r2=417789&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jun 28 08:28:27 2006
@@ -77,6 +77,9 @@
 18. HADOOP-328.  Add an option to the "distcp" command to ignore read
     errors while copying.  (omalley via cutting)
 
+19. HADOOP-27.  Don't allocate tasks to trackers whose local free
+    space is too low.  (Johan Oskarson via cutting)
+
 
 Release 0.3.2 - 2006-06-09
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?rev=417789&r1=417788&r2=417789&view=diff
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Jun 28 08:28:27 2006
@@ -234,6 +234,27 @@
 </property>
 
 <property>
+  <name>mapred.local.dir.minspacestart</name>
+  <value>0</value>
+  <description>If the space in mapred.local.dir drops under this, 
+  do not ask for more tasks.
+  Value in bytes.
+  </description>
+</property>
+
+<property>
+  <name>mapred.local.dir.minspacekill</name>
+  <value>0</value>
+  <description>If the space in mapred.local.dir drops under this, 
+  	do not ask more tasks until all the current ones have finished and 
+  	cleaned up. Also, to save the rest of the tasks we have running, 
+  	kill one of them, to clean up some space. Start with the reduce tasks,
+  	then go with the ones that have finished the least.
+  	Value in bytes.
+  </description>
+</property>
+
+<property>
   <name>mapred.map.tasks</name>
   <value>2</value>
   <description>The default number of map tasks per job.  Typically set

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=417789&r1=417788&r2=417789&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jun 28 08:28:27
2006
@@ -65,6 +65,18 @@
     int mapTotal = 0;
     int reduceTotal = 0;
     boolean justStarted = true;
+    
+    //dir -> DF
+    Map localDirsDf = new HashMap();
+    long minSpaceStart = 0;
+    //must have this much space free to start new tasks
+    boolean acceptNewTasks = true;
+    long minSpaceKill = 0;
+    //if we run under this limit, kill one task
+    //and make sure we never receive any new jobs
+    //until all the old tasks have been cleaned up.
+    //this is if a machine is so full it's only good
+    //for serving map output to the other nodes
 
     static Random r = new Random();
     FileSystem fs = null;
@@ -119,7 +131,12 @@
         this.runningTasks = new TreeMap();
         this.mapTotal = 0;
         this.reduceTotal = 0;
-
+        this.acceptNewTasks = true;
+        
+        this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
+        this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
+        
+        
         // port numbers
         this.taskReportPort = this.fConf.getInt("mapred.task.tracker.report.port", 50050);
 
@@ -331,11 +348,14 @@
             // Check if we should create a new Task
             //
             try {
-              if (mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) {
+              if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
acceptNewTasks) {
                   checkLocalDirs(fConf.getLocalDirs());
-                  Task t = jobClient.pollForNewTask(taskTrackerName);
-                  if (t != null) {
-                    startNewTask(t);
+                  
+                  if (enoughFreeSpace(minSpaceStart)) {
+                    Task t = jobClient.pollForNewTask(taskTrackerName);
+                    if (t != null) {
+                      startNewTask(t);
+                    }
                   }
               }
             } catch (DiskErrorException de ) {
@@ -403,12 +423,99 @@
               LOG.info("Problem getting closed tasks: " +
                        StringUtils.stringifyException(ie));
             }
+            
+            //Check if we're dangerously low on disk space
+            // If so, kill jobs to free up space and make sure
+            // we don't accept any new tasks
+            // Try killing the reduce jobs first, since I believe they
+            // use up most space
+            // Then pick the one with least progress
+            
+            if (!enoughFreeSpace(minSpaceKill)) {
+              acceptNewTasks=false; 
+              //we give up! do not accept new tasks until
+              //all the ones running have finished and they're all cleared up
+              synchronized (this) {
+                TaskInProgress killMe = null;
+
+                for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+                  TaskInProgress tip = (TaskInProgress) it.next();
+                  if ((tip.getRunState() == TaskStatus.RUNNING) &&
+                      !tip.wasKilled) {
+                        	
+                    if (killMe == null) {
+                      killMe = tip;
+
+                    } else if (!tip.getTask().isMapTask()) {
+                      //reduce task, give priority
+                      if (killMe.getTask().isMapTask() || 
+                          (tip.getTask().getProgress().get() < 
+                           killMe.getTask().getProgress().get())) {
+
+                        killMe = tip;
+                      }
+
+                    } else if (killMe.getTask().isMapTask() &&
+                               tip.getTask().getProgress().get() < 
+                               killMe.getTask().getProgress().get()) {
+                      //map task, only add if the progress is lower
+
+                      killMe = tip;
+                    }
+                  }
+                }
+
+                if (killMe!=null) {
+                  String msg = "Tasktracker running out of space. Killing task.";
+                  LOG.info(killMe.getTask().getTaskId() + ": " + msg);
+                  killMe.reportDiagnosticInfo(msg);
+                  try {
+                    killMe.killAndCleanup(true);
+                  } catch (IOException ie) {
+                    LOG.info("Problem cleaning task up: " +
+                             StringUtils.stringifyException(ie));
+                  }
+                }
+              }
+            }
+
+
+            //we've cleaned up, resume normal operation
+            if (!acceptNewTasks && tasks.isEmpty()) {
+                acceptNewTasks=true;
+            }
         }
 
         return 0;
     }
 
     /**
+     * Check if all of the local directories have enough
+     * free space
+     * 
+     * If not, do not try to get a new task assigned 
+     * @return
+     * @throws IOException 
+     */
+    private boolean enoughFreeSpace(long minSpace) throws IOException {
+      String[] localDirs = fConf.getLocalDirs();
+      for (int i = 0; i < localDirs.length; i++) {
+        DF df = null;
+        if (localDirsDf.containsKey(localDirs[i])) {
+          df = (DF) localDirsDf.get(localDirs[i]);
+        } else {
+          df = new DF(localDirs[i], fConf);
+          localDirsDf.put(localDirs[i], df);
+        }
+
+        if (df.getAvailable() < minSpace)
+          return false;
+      }
+
+      return true;
+    }
+
+	/**
      * Start a new task.
      * All exceptions are handled locally, so that we don't mess up the
      * task tracker.



Mime
View raw message