hadoop-common-commits mailing list archives

From: omal...@apache.org
Subject: svn commit: r685290 - in /hadoop/core/trunk: ./ src/mapred/org/apache/hadoop/mapred/
Date: Tue, 12 Aug 2008 20:15:39 GMT
Author: omalley
Date: Tue Aug 12 13:15:38 2008
New Revision: 685290

URL: http://svn.apache.org/viewvc?rev=685290&view=rev
Log:
HADOOP-657. Free disk space should be modelled and used by the scheduler
to make scheduling decisions. (Ari Rabkin via omalley)

Added:
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Aug 12 13:15:38 2008
@@ -72,6 +72,9 @@
 
     HADOOP-153. Provides a way to skip bad records. (Sharad Agarwal via ddas)
 
+    HADOOP-657. Free disk space should be modelled and used by the scheduler
+    to make scheduling decisions. (Ari Rabkin via omalley)
+
   IMPROVEMENTS
 
     HADOOP-3732. Delay intialization of datanode block verification till

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Aug 12 13:15:38 2008
@@ -800,7 +800,8 @@
     private String splitClass;
     private BytesWritable bytes = new BytesWritable();
     private String[] locations;
-      
+    long dataLength;
+
     public void setBytes(byte[] data, int offset, int length) {
       bytes.set(data, offset, length);
     }
@@ -831,6 +832,7 @@
       
     public void readFields(DataInput in) throws IOException {
       splitClass = Text.readString(in);
+      dataLength = in.readLong();
       bytes.readFields(in);
       int len = WritableUtils.readVInt(in);
       locations = new String[len];
@@ -841,12 +843,21 @@
       
     public void write(DataOutput out) throws IOException {
       Text.writeString(out, splitClass);
+      out.writeLong(dataLength);
       bytes.write(out);
       WritableUtils.writeVInt(out, locations.length);
       for(int i = 0; i < locations.length; i++) {
         Text.writeString(out, locations[i]);
       }        
     }
+
+    public long getDataLength() {
+      return dataLength;
+    }
+    public void setDataLength(long l) {
+      dataLength = l;
+    }
+    
   }
     
   private static final int CURRENT_SPLIT_FILE_VERSION = 0;
@@ -871,6 +882,7 @@
       rawSplit.setClassName(split.getClass().getName());
       buffer.reset();
       split.write(buffer);
+      rawSplit.setDataLength(split.getLength());
       rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
       rawSplit.setLocations(split.getLocations());
       rawSplit.write(out);

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 12 13:15:38 2008
@@ -119,6 +119,9 @@
   private Map<String, Integer> trackerToFailuresMap = 
     new TreeMap<String, Integer>();
     
+  //Confine estimation algorithms to an "oracle" class that JIP queries.
+  private ResourceEstimator resourceEstimator; 
+  
   long startTime;
   long finishTime;
 
@@ -129,6 +132,7 @@
   private JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
+  private long inputLength = 0;
 
   // Per-job counters
   public static enum Counter { 
@@ -220,6 +224,7 @@
     this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.resourceEstimator = new ResourceEstimator(this);
   }
 
   /**
@@ -335,10 +340,12 @@
     numMapTasks = splits.length;
     maps = new TaskInProgress[numMapTasks];
     for(int i=0; i < numMapTasks; ++i) {
+      inputLength += splits[i].getDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
                                    jobtracker, conf, this, i);
     }
+    LOG.info("Input size for job "+ jobId + " = " + inputLength);
     if (numMapTasks > 0) { 
       LOG.info("Split info for job:" + jobId);
       nonRunningMapCache = createCache(splits, maxLevel);
@@ -434,6 +441,10 @@
       this.priority = priority;
     }
   }
+  
+  long getInputLength() {
+    return inputLength;
+  }
  
   /**
    * Get the list of map tasks
@@ -1076,6 +1087,17 @@
     Node node = jobtracker.getNode(tts.getHost());
     Node nodeParentAtMaxLevel = null;
     
+
+    long outSize = resourceEstimator.getEstimatedMapOutputSize();
+    if(tts.getAvailableSpace() < outSize) {
+      LOG.warn("No room for map task. Node " + node + 
+               " has " + tts.getAvailableSpace() + 
+               " bytes free; but we expect map to take " + outSize);
+
+      return -1; //see if a different TIP might work better. 
+    }
+    
+    
     // For scheduling a map task, we have two caches and a list (optional)
     //  I)   one for non-running task
     //  II)  one for running task (this is for handling speculation)
@@ -1272,6 +1294,15 @@
       return -1;
     }
 
+    long outSize = resourceEstimator.getEstimatedReduceInputSize();
+    if(tts.getAvailableSpace() < outSize) {
+      LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
+               tts.getAvailableSpace() + 
+               " bytes free; but we expect reduce input to take " + outSize);
+
+      return -1; //see if a different TIP might work better. 
+    }
+    
     // 1. check for a never-executed reduce tip
     // reducers don't have a cache and so pass -1 to explicitly call that out
     tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
@@ -1342,6 +1373,7 @@
 
     // Mark the TIP as complete
     tip.completed(taskid);
+    resourceEstimator.updateWithCompletedTask(status, tip);
 
     // Update jobhistory 
     String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
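
The checks added above reject a tracker whose reported free space falls below the estimated footprint of the task, on both the map-side and reduce-side selection paths. A minimal standalone sketch of that decision, using plain longs in place of the TaskTrackerStatus and ResourceEstimator objects (everything in it is illustrative, not code from the patch):

    // Hedged condensation of the space guard; names and types here are hypothetical.
    public class SpaceCheckSketch {
      /** Returns -1 to signal "skip this tracker", mirroring the patch's early return. */
      static int checkSpace(long availableSpace, long estimatedNeed, String node) {
        if (availableSpace < estimatedNeed) {
          System.out.println("No room on " + node + ": " + availableSpace +
              " bytes free, but the task is expected to take " + estimatedNeed);
          return -1;   // caller can try a different TIP or a different tracker
        }
        return 0;      // enough space; proceed with the normal cache/list lookup
      }

      public static void main(String[] args) {
        System.out.println(checkSpace(10L << 20, 64L << 20, "tracker_host1"));      // -1
        System.out.println(checkSpace(Long.MAX_VALUE, 64L << 20, "tracker_host2")); // 0: space not measured
      }
    }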

Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java?rev=685290&view=auto
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java (added)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceEstimator.java Tue Aug 12 13:15:38 2008
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.util.concurrent.atomic.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Class responsible for modeling the resource consumption of running tasks.
+ * 
+ * For now, we just do temp space for maps
+ * 
+ * There is one ResourceEstimator per JobInProgress
+ *
+ */
+public class ResourceEstimator {
+
+  //Log with JobInProgress
+  private static final Log LOG = LogFactory.getLog(
+      "org.apache.hadoop.mapred.ResourceEstimator");
+
+
+  /**
+   * Estimated ratio of output to input size for map tasks. 
+   */
+  private double mapBlowupRatio;
+  private double estimateWeight;
+  private JobInProgress job;
+
+  //guess a factor of two blowup due to temp space for merge
+  public static final double INITIAL_BLOWUP_GUESS = 1; 
+
+  //initial estimate is weighted as much as this fraction of the real datapoints
+  static final double INITIAL_EST_WEIGHT_PERCENT = 0.05; 
+
+
+  public ResourceEstimator(JobInProgress job) {
+    mapBlowupRatio = INITIAL_BLOWUP_GUESS;
+    this.job = job;
+    estimateWeight = INITIAL_EST_WEIGHT_PERCENT * job.desiredMaps();
+  }
+
+
+  /**
+   * Have private access methods to abstract away synchro.
+   * @return
+   */
+  private synchronized double getBlowupRatio() {
+    return mapBlowupRatio;
+  }
+
+  private synchronized void setBlowupRatio(double b)  {
+    mapBlowupRatio = b;
+  }
+
+
+
+  public void updateWithCompletedTask(TaskStatus ts, TaskInProgress tip) {
+
+    //-1 indicates error, which we don't average in.
+    if(tip.isMapTask() &&  ts.getOutputSize() != -1)  {
+      double blowupOnThisTask = ts.getOutputSize() / 
+        (double) tip.getMapInputSize();
+      
+      LOG.info("measured blowup on " + tip.getTIPId() + " was " +
+          ts.getOutputSize() + "/" +tip.getMapInputSize() + " = " 
+          + blowupOnThisTask);
+      
+      double newEstimate = blowupOnThisTask / estimateWeight + 
+          ((estimateWeight - 1) / estimateWeight) * getBlowupRatio();
+      estimateWeight++; 
+      setBlowupRatio(newEstimate);
+    }
+  }
+
+  /**
+   * 
+   * @return estimated length of this job's average map output
+   * @throws IOException if the split's getLength() does.
+   */
+  public long getEstimatedMapOutputSize()  {
+    double blowup =getBlowupRatio();
+    long estimate =  
+      (long) (job.getInputLength() * blowup / job.desiredMaps() * 2.0);
+    LOG.info("estimate map will take " + estimate +
+        " bytes. (blowup = 2*" + blowup + ")");
+    return estimate;
+  }
+
+
+  //estimate that each reduce gets an equal share of total map output
+  public long getEstimatedReduceInputSize() {
+    return 
+       getEstimatedMapOutputSize() * job.desiredMaps() / job.desiredReduces();
+  }
+  
+
+}
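
The update rule above is an incremental weighted mean: the initial guess is weighted as INITIAL_EST_WEIGHT_PERCENT of the job's map count, and each completed map folds its measured output/input ratio into the running blowup estimate, which is then applied to the per-map share of the job's input and doubled for merge temp space. A small self-contained demo of just that arithmetic, with made-up numbers:

    // Illustrative figures only; the formulas mirror updateWithCompletedTask()
    // and getEstimatedMapOutputSize() above.
    public class BlowupEstimateDemo {
      public static void main(String[] args) {
        int desiredMaps = 100;
        double ratio = 1.0;                      // INITIAL_BLOWUP_GUESS
        double weight = 0.05 * desiredMaps;      // INITIAL_EST_WEIGHT_PERCENT * desiredMaps
        double[] measured = {0.8, 1.2, 0.9};     // hypothetical output/input ratios of finished maps
        for (double m : measured) {
          ratio = m / weight + ((weight - 1) / weight) * ratio;
          weight++;                              // each real datapoint adds one unit of weight
        }
        long inputLength = 6400L << 20;          // pretend the job reads 6400 MB in total
        // Per-map share of the input, scaled by the blowup and doubled for merge temp space.
        long estimate = (long) (inputLength * ratio / desiredMaps * 2.0);
        System.out.println("blowup = " + ratio + ", per-map space estimate = " + estimate + " bytes");
      }
    }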

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Aug 12 13:15:38 2008
@@ -820,6 +820,14 @@
     return ret.toString();
   }
 
+  public long getMapInputSize() {
+    if(isMapTask()) {
+      return rawSplit.getDataLength();
+    } else {
+      return 0;
+    }
+  }
+  
   public void clearSplit() {
     rawSplit.clearBytes();
   }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Tue Aug 12 13:15:38 2008
@@ -52,6 +52,7 @@
     
   private long startTime; 
   private long finishTime; 
+  private long outputSize;
     
   private Phase phase = Phase.STARTING; 
   private Counters counters;
@@ -225,6 +226,21 @@
   }
   
   /**
+   * Returns the number of bytes of output from this map.
+   */
+  public long getOutputSize() {
+    return outputSize;
+  }
+  
+  /**
+   * Set the size on disk of this task's output.
+   * @param l the number of map output bytes
+   */
+  void setOutputSize(long l)  {
+    outputSize = l;
+  }
+  
+  /**
    * Get the list of maps from which output-fetches failed.
    * 
    * @return the list of maps from which output-fetches failed.
@@ -278,6 +294,7 @@
     
     this.phase = status.getPhase();
     this.counters = status.getCounters();
+    this.outputSize = status.outputSize;
   }
   
   /**
@@ -313,6 +330,7 @@
     out.writeLong(startTime);
     out.writeLong(finishTime);
     out.writeBoolean(includeCounters);
+    out.writeLong(outputSize);
     if (includeCounters) {
       counters.write(out);
     }
@@ -330,6 +348,7 @@
     this.finishTime = in.readLong(); 
     counters = new Counters();
     this.includeCounters = in.readBoolean();
+    this.outputSize = in.readLong();
     if (includeCounters) {
       counters.readFields(in);
     }

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Aug 12 13:15:38 2008
@@ -1025,6 +1025,7 @@
     if (askForNewTask) {
       checkLocalDirs(fConf.getLocalDirs());
       askForNewTask = enoughFreeSpace(localMinSpaceStart);
+      status.setAvailableSpace( getFreeSpace() );
     }
       
     //
@@ -1243,8 +1244,8 @@
   }
     
   /**
-   * Check if all of the local directories have enough
-   * free space
+   * Check if any of the local directories has enough
+   * free space  (more than minSpace)
    * 
    * If not, do not try to get a new task assigned 
    * @return
@@ -1254,6 +1255,11 @@
     if (minSpace == 0) {
       return true;
     }
+    return minSpace < getFreeSpace();
+  }
+  
+  private long getFreeSpace() throws IOException {
+    long biggestSeenSoFar = 0;
     String[] localDirs = fConf.getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
       DF df = null;
@@ -1264,14 +1270,52 @@
         localDirsDf.put(localDirs[i], df);
       }
 
-      if (df.getAvailable() > minSpace)
-        return true;
+      long availOnThisVol = df.getAvailable();
+      if (availOnThisVol > biggestSeenSoFar) {
+        biggestSeenSoFar = availOnThisVol;
+      }
     }
-
-    return false;
+    
+    //Should ultimately hold back the space we expect running tasks to use but 
+    //that estimate isn't currently being passed down to the TaskTrackers    
+    return biggestSeenSoFar;
   }
     
   /**
+   * Try to get the size of output for this task.
+   * Returns -1 if it can't be found.
+   * @return
+   */
+  long tryToGetOutputSize(TaskAttemptID taskId, JobConf conf) {
+    
+    try{
+      TaskInProgress tip;
+      synchronized(this) {
+        tip = tasks.get(taskId);
+      }
+      if(tip == null)
+         return -1;
+      
+      MapOutputFile mapOutputFile = new MapOutputFile();
+      mapOutputFile.setJobId(taskId.getJobID());
+      mapOutputFile.setConf(conf);
+      
+      Path tmp_output =  mapOutputFile.getOutputFile(taskId);
+      if(tmp_output == null)
+        return 0;
+      FileSystem localFS = FileSystem.getLocal(conf);
+      FileStatus stat = localFS.getFileStatus(tmp_output);
+      if(stat == null)
+        return 0;
+      else
+        return stat.getLen();
+    } catch(IOException e) {
+      LOG.info(e);
+      return -1;
+    }
+  }
+  
+  /**
    * Start a new task.
    * All exceptions are handled locally, so that we don't mess up the
    * task tracker.
@@ -1596,6 +1640,8 @@
       this.done = true;
       
       LOG.info("Task " + task.getTaskID() + " is done.");
+      LOG.info("reported output size for " + task.getTaskID() +  "  was " + taskStatus.getOutputSize());
+
     }
 
     /**
@@ -1660,7 +1706,7 @@
                                      localJobConf). toString());
               } catch (IOException e) {
                 LOG.warn("Working Directory of the task " + task.getTaskID() +
-                		 "doesnt exist. Throws expetion " +
+                		 "doesnt exist. Caught exception " +
                           StringUtils.stringifyException(e));
               }
               // Build the command  
@@ -2195,6 +2241,7 @@
     for(TaskInProgress tip: runningTasks.values()) {
       TaskStatus status = tip.getStatus();
       status.setIncludeCounters(sendCounters);
+      status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf));
       // send counters for finished or failed tasks.
       if (status.getRunState() != TaskStatus.State.RUNNING) {
         status.setIncludeCounters(true);
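
The new getFreeSpace() reports the largest amount of free space available on any single local directory, as measured by Hadoop's DF helper, and the heartbeat now carries that number to the JobTracker via setAvailableSpace(). A rough JDK-only approximation of the loop, with java.io.File.getUsableSpace() standing in for DF (an assumption; the patch itself keeps using DF):

    import java.io.File;

    public class FreeSpaceSketch {
      // Returns the most free space seen on any one of the given directories.
      static long largestFreeSpace(String[] localDirs) {
        long biggestSeenSoFar = 0;
        for (String dir : localDirs) {
          long availOnThisVol = new File(dir).getUsableSpace();
          if (availOnThisVol > biggestSeenSoFar) {
            biggestSeenSoFar = availOnThisVol;
          }
        }
        return biggestSeenSoFar;
      }

      public static void main(String[] args) {
        System.out.println(largestFreeSpace(new String[] {"/tmp"}) + " bytes free");
      }
    }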

Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=685290&r1=685289&r2=685290&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Aug 12 13:15:38 2008
@@ -48,11 +48,13 @@
   volatile long lastSeen;
   private int maxMapTasks;
   private int maxReduceTasks;
+  long availableSpace; //space available on this node
     
   /**
    */
   public TaskTrackerStatus() {
     taskReports = new ArrayList<TaskStatus>();
+    this.availableSpace = Long.MAX_VALUE; //not measured by default.
   }
 
   /**
@@ -69,6 +71,7 @@
     this.failures = failures;
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
+    this.availableSpace = Long.MAX_VALUE; //not measured by default.
   }
 
   /**
@@ -166,6 +169,20 @@
   public int getMaxReduceTasks() {
     return maxReduceTasks;
   }  
+  
+  /**
+   * Will return LONG_MAX if space hasn't been measured yet.
+   * @return bytes of available local disk space on this tasktracker.
+   */
+  public long getAvailableSpace() {
+    return availableSpace;
+  }
+  
+  public void setAvailableSpace(long a) {
+    availableSpace = a;
+  }
+  
+  
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
@@ -177,6 +194,7 @@
     out.writeInt(maxMapTasks);
     out.writeInt(maxReduceTasks);
     out.writeInt(taskReports.size());
+    out.writeLong(availableSpace);
     for (TaskStatus taskStatus : taskReports) {
       TaskStatus.writeTaskStatus(out, taskStatus);
     }
@@ -191,6 +209,7 @@
     this.maxReduceTasks = in.readInt();
     taskReports.clear();
     int numTasks = in.readInt();
+    this.availableSpace = in.readLong();
     for (int i = 0; i < numTasks; i++) {
       taskReports.add(TaskStatus.readTaskStatus(in));
     }
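
availableSpace is written right after the task-report count in write() and read back in the same position in readFields(), keeping the heartbeat serialization symmetric. A simplified round-trip sketch of that ordering over plain data streams (a hypothetical stand-in, not the actual TaskTrackerStatus Writable):

    import java.io.*;

    public class AvailableSpaceRoundTrip {
      public static void main(String[] args) throws IOException {
        long availableSpace = 42L << 30;                  // pretend 42 GB free
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(0);                                  // taskReports.size(), written first
        out.writeLong(availableSpace);                    // then the new field
        out.close();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        int numTasks = in.readInt();                      // readFields() consumes in the same order
        System.out.println("round-tripped availableSpace = " + in.readLong());
      }
    }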


