hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r785392 [2/5] - in /hadoop/core/branches/HADOOP-4687/mapred: conf/ lib/ src/c++/ src/c++/task-controller/ src/contrib/dynamic-scheduler/ src/contrib/sqoop/ src/contrib/streaming/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ s...
Date Tue, 16 Jun 2009 20:54:28 GMT
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Counters.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/Counters.java Tue Jun 16 20:54:24 2009
@@ -36,6 +36,7 @@
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -398,6 +399,13 @@
    * @return the counter for that name
    */
   public synchronized Counter findCounter(String group, String name) {
+    if (name.equals("MAP_INPUT_BYTES")) {
+      group = FileInputFormat.COUNTER_GROUP; 
+      name = FileInputFormat.BYTES_READ; 
+      LOG.warn("Counter name MAP_INPUT_BYTES is deprecated. " +
+               "Use FileInputFormatCounters as group name and " +
+               " BYTES_READ as counter name instead");
+    }
     return getGroup(group).getCounterForName(name);
   }
 
@@ -411,7 +419,7 @@
    */
   @Deprecated
   public synchronized Counter findCounter(String group, int id, String name) {
-    return getGroup(group).getCounterForName(name);
+    return findCounter(group, name);
   }
 
   /**
@@ -432,7 +440,7 @@
    * @param amount amount by which counter is to be incremented
    */
   public synchronized void incrCounter(String group, String counter, long amount) {
-    getGroup(group).getCounterForName(counter).increment(amount);
+    findCounter(group, counter).increment(amount);
   }
   
   /**

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/DefaultTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/DefaultTaskController.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/DefaultTaskController.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/DefaultTaskController.java Tue Jun 16 20:54:24 2009
@@ -59,43 +59,7 @@
     context.shExec = shexec;
     shexec.execute();
   }
-  
-  /**
-   * Kills the JVM running the task stored in the context.
-   * 
-   * @param context the context storing the task running within the JVM
-   * that needs to be killed.
-   */
-  void killTaskJVM(TaskController.TaskControllerContext context) {
-    ShellCommandExecutor shexec = context.shExec;
-
-    if (shexec != null) {
-      Process process = shexec.getProcess();
-      if (Shell.WINDOWS) {
-        // Currently we don't use setsid on WINDOWS. So kill the process alone.
-        if (process != null) {
-          process.destroy();
-        }
-      }
-      else { // In addition to the task JVM, kill its subprocesses also.
-        String pid = context.pid;
-        if (pid != null) {
-          ProcessTree.destroy(pid, context.sleeptimeBeforeSigkill,
-              ProcessTree.isSetsidAvailable, false);
-          try {
-            if (process != null) {
-              LOG.info("Process exited with exit code:" + process.waitFor());
-            }
-          } catch (InterruptedException ie) {}
-        }
-        else if (process != null) {
-          // kill the task JVM alone, if we don't have the process group id
-          process.destroy();
-        }
-      }
-    }
-  }
-  
+    
   /**
    * Initialize the task environment.
    * 
@@ -123,5 +87,50 @@
   @Override
   void initializeJob(JobID jobId) {
   }
+
+  @Override
+  void terminateTask(TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    if (shexec != null) {
+      Process process = shexec.getProcess();
+      if (Shell.WINDOWS) {
+        // Currently we don't use setsid on WINDOWS. 
+        //So kill the process alone.
+        if (process != null) {
+          process.destroy();
+        }
+      }
+      else { // In addition to the task JVM, kill its subprocesses also.
+        String pid = context.pid;
+        if (pid != null) {
+          if(ProcessTree.isSetsidAvailable) {
+            ProcessTree.terminateProcessGroup(pid);
+          }else {
+            ProcessTree.terminateProcess(pid);
+          }
+        }
+      }
+    }
+  }
+  
+  @Override
+  void killTask(TaskControllerContext context) {
+    ShellCommandExecutor shexec = context.shExec;
+    if (shexec != null) {
+      if (Shell.WINDOWS) {
+        //We don't do send kill process signal in case of windows as 
+        //already we have done a process.destroy() in termintateTaskJVM()
+        return;
+      }
+      String pid = context.pid;
+      if (pid != null) {
+        if(ProcessTree.isSetsidAvailable) {
+          ProcessTree.killProcessGroup(pid);
+        }else {
+          ProcessTree.killProcess(pid);
+        }
+      }
+    }
+  }
   
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobClient.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobClient.java Tue Jun 16 20:54:24 2009
@@ -613,7 +613,7 @@
       for (String tmpjars: libjarsArr) {
         Path tmp = new Path(tmpjars);
         Path newPath = copyRemoteFiles(fs, libjarsDir, tmp, job, replication);
-        DistributedCache.addArchiveToClassPath(newPath, job);
+        DistributedCache.addFileToClassPath(newPath, job);
       }
     }
     
@@ -936,19 +936,16 @@
   throws IOException {
     FileStatus[] contents = fs.listStatus(jobDirPath);
     int matchCount = 0;
-    if (contents != null && contents.length >=3) {
+    if (contents != null && contents.length >=2) {
       for (FileStatus status : contents) {
         if ("job.xml".equals(status.getPath().getName())) {
           ++matchCount;
         }
-        if ("job.jar".equals(status.getPath().getName())) {
-          ++matchCount;
-        }
         if ("job.split".equals(status.getPath().getName())) {
           ++matchCount;
         }
       }
-      if (matchCount == 3) {
+      if (matchCount == 2) {
         return true;
       }
     }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobConf.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobConf.java Tue Jun 16 20:54:24 2009
@@ -123,111 +123,11 @@
    */
   public static final String DEFAULT_QUEUE_NAME = "default";
 
-  /**
-   * Cluster-wide configuration to be set by the administrators that provides
-   * default amount of maximum virtual memory for job's tasks. This has to be
-   * set on both the JobTracker node for the sake of scheduling decisions and on
-   * the TaskTracker nodes for the sake of memory management.
-   * 
-   * <p>
-   * 
-   * If a job doesn't specify its virtual memory requirement by setting
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY} to {@link #DISABLED_MEMORY_LIMIT},
-   * tasks are assured a memory limit set to this property. This property is
-   * disabled by default, and if not explicitly set to a valid value by the
-   * administrators and if a job doesn't specify its virtual memory
-   * requirements, the job's tasks will not be assured anything and may be
-   * killed by a TT that intends to control the total memory usage of the tasks
-   * via memory management functionality.
-   * 
-   * <p>
-   * 
-   * This value should in general be less than the cluster-wide configuration
-   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} . If not or if it not set,
-   * TaskTracker's memory management may be disabled and a scheduler's memory
-   * based scheduling decisions will be affected. Please refer to the
-   * documentation of the configured scheduler to see how this property is used.
-   */
-  public static final String MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY =
-      "mapred.task.default.maxvmem";
+  static final String MAPRED_JOB_MAP_MEMORY_MB_PROPERTY =
+      "mapred.job.map.memory.mb";
 
-  /**
-   * The maximum amount of memory any task of this job will use.
-   * 
-   * <p>
-   * 
-   * This value will be used by TaskTrackers for monitoring the memory usage of
-   * tasks of this jobs. If a TaskTracker's memory management functionality is
-   * enabled, each task of this job will be allowed to use a maximum virtual
-   * memory specified by this property. If the task's memory usage goes over
-   * this value, the task will be failed by the TT. If not set, the cluster-wide
-   * configuration {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is used as the
-   * default value for memory requirements. If this property cascaded with
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} becomes equal to -1, job's
-   * tasks will not be assured anything and may be killed by a TT that intends
-   * to control the total memory usage of the tasks via memory management
-   * functionality. If the memory management functionality is disabled on a TT,
-   * this value is ignored.
-   * 
-   * <p>
-   * 
-   * This value should also be not more than the cluster-wide configuration
-   * {@link #UPPER_LIMIT_ON_TASK_VMEM_PROPERTY} which has to be set by the site
-   * administrators.
-   * 
-   * <p>
-   * 
-   * This value may be used by schedulers that support scheduling based on job's
-   * memory requirements. In general, a task of this job will be scheduled on a
-   * TaskTracker only if the amount of virtual memory still unoccupied on the
-   * TaskTracker is greater than or equal to this value. But different
-   * schedulers can take different decisions. Please refer to the documentation
-   * of the scheduler being configured to see if it does memory based scheduling
-   * and if it does, how this property is used by that scheduler.
-   * 
-   * @see #setMaxVirtualMemoryForTask(long)
-   * @see #getMaxVirtualMemoryForTask()
-   */
-  public static final String MAPRED_TASK_MAXVMEM_PROPERTY =
-      "mapred.task.maxvmem";
-
-  /**
-   * The maximum amount of physical memory any task of a job will use.
-   * 
-   * <p>
-   * 
-   * This value may be used by schedulers that support scheduling based on job's
-   * memory requirements. In general, a task of this job will be scheduled on a
-   * TaskTracker, only if the amount of physical memory still unoccupied on the
-   * TaskTracker is greater than or equal to this value. But different
-   * schedulers can take different decisions. Please refer to the documentation
-   * of the scheduler being configured to see how it does memory based
-   * scheduling and how this variable is used by that scheduler.
-   * 
-   * @see #setMaxPhysicalMemoryForTask(long)
-   * @see #getMaxPhysicalMemoryForTask()
-   */
-  public static final String MAPRED_TASK_MAXPMEM_PROPERTY =
-      "mapred.task.maxpmem";
-
-  /**
-   * Cluster-wide configuration to be set by the site administrators that
-   * provides an upper limit on the maximum virtual memory that can be specified
-   * by a job. The job configuration {@link #MAPRED_TASK_MAXVMEM_PROPERTY} and
-   * the cluster-wide configuration
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} should, by definition, be
-   * less than this value. If the job configuration
-   * {@link #MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY} is more than this value,
-   * depending on the scheduler being configured, the job may be rejected or the
-   * job configuration may just be ignored.
-   * 
-   * <p>
-   * 
-   * If it is not set on a TaskTracker, TaskTracker's memory management will be
-   * disabled.
-   */
-  public static final String UPPER_LIMIT_ON_TASK_VMEM_PROPERTY =
-      "mapred.task.limit.maxvmem";
+  static final String MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.job.reduce.memory.mb";
 
   /**
    * Construct a map/reduce job configuration.
@@ -1491,53 +1391,23 @@
   public String getJobLocalDir() {
     return get("job.local.dir");
   }
-  
-  /**
-   * The maximum amount of memory any task of this job will use. See
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
-   * 
-   * @return The maximum amount of memory any task of this job will use, in
-   *         bytes.
-   * @see #setMaxVirtualMemoryForTask(long)
-   */
-  public long getMaxVirtualMemoryForTask() {
-    return getLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+
+  long getMemoryForMapTask() {
+    return getLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY,
+        DISABLED_MEMORY_LIMIT);
   }
 
-  /**
-   * Set the maximum amount of memory any task of this job can use. See
-   * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
-   * 
-   * @param vmem Maximum amount of virtual memory in bytes any task of this job
-   *          can use.
-   * @see #getMaxVirtualMemoryForTask()
-   */
-  public void setMaxVirtualMemoryForTask(long vmem) {
-    setLong(JobConf.MAPRED_TASK_MAXVMEM_PROPERTY, vmem);
+  void setMemoryForMapTask(long mem) {
+    setLong(JobConf.MAPRED_JOB_MAP_MEMORY_MB_PROPERTY, mem);
   }
 
-  /**
-   * The maximum amount of physical memory any task of this job will use. See
-   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
-   * 
-   * @return The maximum amount of physical memory any task of this job will
-   *         use, in bytes.
-   * @see #setMaxPhysicalMemoryForTask(long)
-   */
-  public long getMaxPhysicalMemoryForTask() {
-    return getLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, DISABLED_MEMORY_LIMIT);
+  long getMemoryForReduceTask() {
+    return getLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY,
+        DISABLED_MEMORY_LIMIT);
   }
 
-  /**
-   * Set the maximum amount of physical memory any task of this job can use. See
-   * {@link #MAPRED_TASK_MAXPMEM_PROPERTY}
-   * 
-   * @param pmem Maximum amount of physical memory in bytes any task of this job
-   *          can use.
-   * @see #getMaxPhysicalMemoryForTask()
-   */
-  public void setMaxPhysicalMemoryForTask(long pmem) {
-    setLong(JobConf.MAPRED_TASK_MAXPMEM_PROPERTY, pmem);
+  void setMemoryForReduceTask(long mem) {
+    setLong(JobConf.MAPRED_JOB_REDUCE_MEMORY_MB_PROPERTY, mem);
   }
 
   /**
@@ -1559,6 +1429,91 @@
     set("mapred.job.queue.name", queueName);
   }
   
+  /**
+   * Get the per-node limit on running maps for the job
+   * 
+   * @return per-node running map limit
+   */
+  public int getMaxMapsPerNode() {
+    return getInt("mapred.max.maps.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running maps for the job
+   * 
+   * @param limit per-node running map limit
+   */
+  public void setMaxMapsPerNode(int limit) {
+    setInt("mapred.max.maps.per.node", limit);
+  }
+  
+  /**
+   * Get the per-node limit on running reduces for the job
+   * 
+   * @return per-node running reduce limit
+   */
+  public int getMaxReducesPerNode() {
+    return getInt("mapred.max.reduces.per.node", -1);
+  }
+  
+  /**
+   * Set the per-node limit on running reduces for the job
+   * 
+   * @param limit per-node running reduce limit
+   */
+  public void setMaxReducesPerNode(int limit) {
+    setInt("mapred.max.reduces.per.node", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running maps for the job
+   * 
+   * @return cluster-wide running map limit
+   */
+  public int getRunningMapLimit() {
+    return getInt("mapred.running.map.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running maps for the job
+   * 
+   * @param limit cluster-wide running map limit
+   */
+  public void setRunningMapLimit(int limit) {
+    setInt("mapred.running.map.limit", limit);
+  }
+  
+  /**
+   * Get the cluster-wide limit on running reduces for the job
+   * 
+   * @return cluster-wide running reduce limit
+   */
+  public int getRunningReduceLimit() {
+    return getInt("mapred.running.reduce.limit", -1);
+  }
+  
+  /**
+   * Set the cluster-wide limit on running reduces for the job
+   * 
+   * @param limit cluster-wide running reduce limit
+   */
+  public void setRunningReduceLimit(int limit) {
+    setInt("mapred.running.reduce.limit", limit);
+  }
+  
+  /**
+   * Normalize the negative values in configuration
+   * 
+   * @param val
+   * @return normalized value
+   */
+  public static long normalizeMemoryConfigValue(long val) {
+    if (val < 0) {
+      val = DISABLED_MEMORY_LIMIT;
+    }
+    return val;
+  }
+
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobHistory.java Tue Jun 16 20:54:24 2009
@@ -20,7 +20,6 @@
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -121,8 +120,8 @@
     LAUNCH_TIME, TOTAL_MAPS, TOTAL_REDUCES, FAILED_MAPS, FAILED_REDUCES, 
     FINISHED_MAPS, FINISHED_REDUCES, JOB_STATUS, TASKID, HOSTNAME, TASK_TYPE, 
     ERROR, TASK_ATTEMPT_ID, TASK_STATUS, COPY_PHASE, SORT_PHASE, REDUCE_PHASE, 
-    SHUFFLE_FINISHED, SORT_FINISHED, COUNTERS, SPLITS, JOB_PRIORITY, HTTP_PORT, 
-    TRACKER_NAME, STATE_STRING, VERSION
+    SHUFFLE_FINISHED, SORT_FINISHED, MAP_FINISHED, COUNTERS, SPLITS,
+    JOB_PRIORITY, HTTP_PORT, TRACKER_NAME, STATE_STRING, VERSION
   }
 
   /**
@@ -876,6 +875,11 @@
         String logFileName = null;
         if (restarted) {
           logFileName = getJobHistoryFileName(jobConf, jobId);
+          if (logFileName == null) {
+            logFileName =
+              encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
+            
+          }
         } else {
           logFileName = 
             encodeJobHistoryFileName(getNewJobHistoryFileName(jobConf, jobId));
@@ -1394,29 +1398,56 @@
     /**
      * Log finish time of map task attempt. 
      * @param taskAttemptId task attempt id 
-     * @param finishTime finish time
+     * @param finishTime finish time of map task
      * @param hostName host name 
      * @deprecated Use 
-     * {@link #logFinished(TaskAttemptID, long, String, String, String, Counters)}
+     * {@link #logFinished(TaskAttemptID, long, long, String, String, String,
+     *  Counters)}
      */
     @Deprecated
     public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
-      logFinished(taskAttemptId, finishTime, hostName, Values.MAP.name(), "", 
-                  new Counters());
+      logFinished(taskAttemptId, finishTime, finishTime, hostName,
+                  Values.MAP.name(), "", new Counters());
     }
 
     /**
      * Log finish time of map task attempt. 
      * 
      * @param taskAttemptId task attempt id 
-     * @param finishTime finish time
+     * @param finishTime finish time of map task
+     * @param hostName host name 
+     * @param taskType Whether the attempt is cleanup or setup or map 
+     * @param stateString state string of the task attempt
+     * @param counter counters of the task attempt
+     * @deprecated Use 
+     * {@link #logFinished(TaskAttemptID, long, long, String, String, String,
+     *  Counters)}
+     */
+    @Deprecated
+    public static void logFinished(TaskAttemptID taskAttemptId,
+                                   long finishTime, 
+                                   String hostName,
+                                   String taskType,
+                                   String stateString, 
+                                   Counters counter) {
+      logFinished(taskAttemptId, finishTime, finishTime, hostName,
+          taskType, stateString, counter);
+    }
+    
+    /**
+     * Log finish time of map task attempt. 
+     * 
+     * @param taskAttemptId task attempt id 
+     * @param mapFinishTime finish time of map phase in map task
+     * @param finishTime finish time of map task
      * @param hostName host name 
      * @param taskType Whether the attempt is cleanup or setup or map 
      * @param stateString state string of the task attempt
      * @param counter counters of the task attempt
      */
     public static void logFinished(TaskAttemptID taskAttemptId, 
+                                   long mapFinishTime,
                                    long finishTime, 
                                    String hostName,
                                    String taskType,
@@ -1430,12 +1461,14 @@
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
+                                     Keys.MAP_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME, 
                                      Keys.STATE_STRING, Keys.COUNTERS},
                          new String[]{taskType, 
                                       taskAttemptId.getTaskID().toString(),
                                       taskAttemptId.toString(), 
                                       Values.SUCCESS.name(),  
+                                      String.valueOf(mapFinishTime),
                                       String.valueOf(finishTime), hostName, 
                                       stateString, 
                                       counter.makeEscapedCompactString()}); 

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jun 16 20:54:24 2009
@@ -87,6 +87,12 @@
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
+  // Limits on concurrent running tasks per-node and cluster-wide
+  private int maxMapsPerNode;
+  private int maxReducesPerNode;
+  private int runningMapLimit;
+  private int runningReduceLimit;
+  
   int mapFailuresPercent = 0;
   int reduceFailuresPercent = 0;
   int failedMapTIPs = 0;
@@ -178,8 +184,6 @@
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
   private long inputLength = 0;
-  private long maxVirtualMemoryForTask;
-  private long maxPhysicalMemoryForTask;
   
   private Counters jobCounters = new Counters();
   
@@ -259,6 +263,11 @@
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxMapsPerNode = conf.getMaxMapsPerNode();
+    this.maxReducesPerNode = conf.getMaxReducesPerNode();
+    this.runningMapLimit = conf.getRunningMapLimit();
+    this.runningReduceLimit = conf.getRunningReduceLimit();
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
@@ -276,8 +285,6 @@
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
-    setMaxVirtualMemoryForTask(conf.getMaxVirtualMemoryForTask());
-    setMaxPhysicalMemoryForTask(conf.getMaxPhysicalMemoryForTask());
   }
 
   /**
@@ -385,7 +392,7 @@
       jobInitKillStatus.initStarted = true;
     }
 
-    LOG.debug("initializing " + this.jobId);
+    LOG.info("Initializing " + jobId);
 
     // log job info
     JobHistory.JobInfo.logSubmitted(getJobID(), conf, jobFile.toString(), 
@@ -418,6 +425,7 @@
                 (numMapTasks + numReduceTasks) +
                 " exceeds the configured limit " + maxTasks);
     }
+
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
     
@@ -428,38 +436,15 @@
                                    splits[i], 
                                    jobtracker, conf, this, i);
     }
-    LOG.info("Input size for job "+ jobId + " = " + inputLength);
+    LOG.info("Input size for job " + jobId + " = " + inputLength
+        + ". Number of splits = " + splits.length);
     if (numMapTasks > 0) { 
-      LOG.info("Split info for job:" + jobId + " with " + 
-               splits.length + " splits:");
       nonRunningMapCache = createCache(splits, maxLevel);
     }
         
     // set the launch time
     this.launchTime = System.currentTimeMillis();
 
-    // if no split is returned, job is considered completed and successful
-    if (numMapTasks == 0) {
-      // Finished time need to be setted here to prevent this job to be retired
-      // from the job tracker jobs at the next retire iteration.
-      this.finishTime = this.launchTime;
-      status.setSetupProgress(1.0f);
-      status.setMapProgress(1.0f);
-      status.setReduceProgress(1.0f);
-      status.setCleanupProgress(1.0f);
-      status.setRunState(JobStatus.SUCCEEDED);
-      tasksInited.set(true);
-      JobHistory.JobInfo.logInited(profile.getJobID(), 
-                                    this.launchTime, 0, 0);
-      JobHistory.JobInfo.logFinished(profile.getJobID(), 
-                                     this.finishTime, 0, 0, 0, 0,
-                                     getCounters());
-      // Special case because the Job is not queued
-      JobEndNotifier.registerNotification(this.getJobConf(), this.getStatus());
-
-      return;
-    }
-
     //
     // Create reduce tasks
     //
@@ -481,9 +466,11 @@
 
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
-    // cleanup map tip. This map is doesn't use split. 
-    // Just assign splits[0]
-    cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+
+    // cleanup map tip. This map doesn't use any splits. Just assign an empty
+    // split.
+    JobClient.RawSplit emptySplit = new JobClient.RawSplit();
+    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks);
     cleanup[0].setJobCleanupTask();
 
@@ -494,9 +481,10 @@
 
     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
-    // setup map tip. This map is doesn't use split. 
-    // Just assign splits[0]
-    setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
+
+    // setup map tip. This map doesn't use any split. Just assign an empty
+    // split.
+    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
             jobtracker, conf, this, numMapTasks + 1 );
     setup[0].setJobSetupTask();
 
@@ -579,23 +567,6 @@
     JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
 
-  // Accessors for resources.
-  long getMaxVirtualMemoryForTask() {
-    return maxVirtualMemoryForTask;
-  }
-
-  void setMaxVirtualMemoryForTask(long maxVMem) {
-    maxVirtualMemoryForTask = maxVMem;
-  }
-
-  long getMaxPhysicalMemoryForTask() {
-    return maxPhysicalMemoryForTask;
-  }
-
-  void setMaxPhysicalMemoryForTask(long maxPMem) {
-    maxPhysicalMemoryForTask = maxPMem;
-  }
-
   // Update the job start/launch time (upon restart) and log to history
   synchronized void updateJobInfo(long startTime, long launchTime) {
     // log and change to the job's start/launch time
@@ -615,6 +586,14 @@
     return inputLength;
   }
  
+  boolean isCleanupLaunched() {
+    return launchedCleanup;
+  }
+
+  boolean isSetupLaunched() {
+    return launchedSetup;
+  }
+
   /**
    * Get the list of map tasks
    * @return the raw array of maps for this job
@@ -889,20 +868,11 @@
     if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
-        if (maps.length == 0) {
-          this.status.setMapProgress(1.0f);
-        } else {
           this.status.setMapProgress((float) (this.status.mapProgress() +
                                               progressDelta / maps.length));
-        }
       } else {
-        if (reduces.length == 0) {
-          this.status.setReduceProgress(1.0f);
-        } else {
-          this.status.setReduceProgress
-            ((float) (this.status.reduceProgress() +
-                      (progressDelta / reduces.length)));
-        }
+        this.status.setReduceProgress((float) (this.status.reduceProgress() + 
+                                           (progressDelta / reduces.length)));
       }
     }
   }
@@ -1129,8 +1099,10 @@
         status.getRunState() != JobStatus.PREP) {
       return false;
     }
-    // check if cleanup task has been launched already. 
-    if (launchedCleanup) {
+    // check if cleanup task has been launched already or if setup isn't
+    // launched already. The later check is useful when number of maps is
+    // zero.
+    if (launchedCleanup || !isSetupFinished()) {
       return false;
     }
     // check if job has failed or killed
@@ -1164,7 +1136,6 @@
       if (!canLaunchSetupTask()) {
         return null;
       }
-      
       String taskTracker = tts.getTrackerName();
       // Update the last-known clusterSize
       this.clusterSize = clusterSize;
@@ -1706,6 +1677,11 @@
                                           final int numUniqueHosts,
                                           final int maxCacheLevel,
                                           final double avgProgress) {
+    if (numMapTasks == 0) {
+      LOG.info("No maps to schedule for " + profile.getJobID());
+      return -1;
+    }
+
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
@@ -1714,6 +1690,10 @@
     //
     this.clusterSize = clusterSize;
 
+    if (!belowRunningTaskLimit(tts, true)) {
+      return -1;
+    }
+    
     if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
     }
@@ -1911,11 +1891,20 @@
                                              int clusterSize,
                                              int numUniqueHosts,
                                              double avgProgress) {
+    if (numReduceTasks == 0) {
+      LOG.info("No reduces to schedule for " + profile.getJobID());
+      return -1;
+    }
+
     String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
-    
+
     // Update the last-known clusterSize
     this.clusterSize = clusterSize;
+    
+    if (!belowRunningTaskLimit(tts, false)) {
+      return -1;
+    }
 
     if (!shouldRunOnTaskTracker(taskTracker)) {
       return -1;
@@ -1969,6 +1958,42 @@
     }
     return true;
   }
+  
+  /**
+   * Check whether we are below the running task limits (per node and cluster
+   * wide) for a given type of task on a given task tracker.
+   * 
+   * @param tts task tracker to check on
+   * @param map true if looking at map tasks, false for reduce tasks
+   * @return true if we are below both the cluster-wide and the per-node 
+   *         running task limit for the given type of task
+   */
+  private boolean belowRunningTaskLimit(TaskTrackerStatus tts, boolean map) {
+    int runningTasks = map ? runningMapTasks : runningReduceTasks;
+    int clusterLimit = map ? runningMapLimit : runningReduceLimit;
+    int perNodeLimit = map ? maxMapsPerNode  : maxReducesPerNode;
+    
+    // Check cluster-wide limit
+    if (clusterLimit != -1 && runningTasks >= clusterLimit) {
+      return false;
+    }
+    
+    // Check per-node limit
+    if (perNodeLimit != -1) {
+      int runningTasksOnNode = 0;
+      for (TaskStatus ts: tts.getTaskReports()) {
+        if (ts.getTaskID().getJobID().equals(jobId) && ts.getIsMap() == map &&
+            ts.getRunState().equals(TaskStatus.State.RUNNING)) {
+          runningTasksOnNode++;
+        }
+      }
+      if (runningTasksOnNode >= perNodeLimit) {
+        return false;
+      }
+    }
+    
+    return true;
+  }
 
   /**
    * A taskid assigned to this JobInProgress has reported in successfully.
@@ -2011,7 +2036,9 @@
                                        status.getTaskTracker(), 
                                        ttStatus.getHttpPort(), 
                                        taskType); 
-      JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
+      JobHistory.MapAttempt.logFinished(status.getTaskID(),
+                                        status.getMapFinishTime(),
+                                        status.getFinishTime(), 
                                         trackerHostname, taskType,
                                         status.getStateString(), 
                                         status.getCounters()); 
@@ -2109,6 +2136,12 @@
     if (this.status.getRunState() == JobStatus.RUNNING ) {
       this.status.setRunState(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
+      if (maps.length == 0) {
+        this.status.setMapProgress(1.0f);
+      }
+      if (reduces.length == 0) {
+        this.status.setReduceProgress(1.0f);
+      }
       this.finishTime = System.currentTimeMillis();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
@@ -2426,6 +2459,14 @@
     }
   }
 
+  boolean isSetupFinished() {
+    if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+        || setup[1].isFailed()) {
+      return true;
+    }
+    return false;
+  }
+
   /**
    * Fail a task with a given reason, but without a status object.
    * 

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Jun 16 20:54:24 2009
@@ -116,9 +116,12 @@
   // The maximum number of blacklists for a tracker after which the 
   // tracker could be blacklisted across all jobs
   private int MAX_BLACKLISTS_PER_TRACKER = 4;
+  // Approximate number of heartbeats that could arrive at the JobTracker
+  // in a second
+  private int NUM_HEARTBEATS_IN_SECOND = 100;
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
-  private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
+  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
 
   private DNSToSwitchMapping dnsToSwitchMapping;
   private NetworkTopology clusterMap = new NetworkTopology();
@@ -1086,6 +1089,10 @@
         taskStatus.setShuffleFinishTime(shuffleTime);
         taskStatus.setSortFinishTime(sortTime);
       }
+      else if (type.equals(Values.MAP.name())) {
+        taskStatus.setMapFinishTime(
+            Long.parseLong(attempt.get(Keys.MAP_FINISHED)));
+      }
 
       // Add the counters
       String counterString = attempt.get(Keys.COUNTERS);
@@ -1187,17 +1194,38 @@
         shouldRecover = false;
 
         // write the jobtracker.info file
-        FSDataOutputStream out = FileSystem.create(fs, restartFile, filePerm);
-        out.writeInt(0);
-        out.close();
+        try {
+          FSDataOutputStream out = FileSystem.create(fs, restartFile, 
+                                                     filePerm);
+          out.writeInt(0);
+          out.close();
+        } catch (IOException ioe) {
+          LOG.warn("Writing to file " + restartFile + " failed!");
+          LOG.warn("FileSystem is not ready yet!");
+          fs.delete(restartFile, false);
+          throw ioe;
+        }
         return;
       }
 
       FSDataInputStream in = fs.open(restartFile);
-      // read the old count
-      restartCount = in.readInt();
-      ++restartCount; // increment the restart count
-      in.close();
+      try {
+        // read the old count
+        restartCount = in.readInt();
+        ++restartCount; // increment the restart count
+      } catch (IOException ioe) {
+        LOG.warn("System directory is garbled. Failed to read file " 
+                 + restartFile);
+        LOG.warn("Jobtracker recovery is not possible with garbled"
+                 + " system directory! Please delete the system directory and"
+                 + " restart the jobtracker. Note that deleting the system" 
+                 + " directory will result in loss of all the running jobs.");
+        throw new RuntimeException(ioe);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+      }
 
       // Write back the new restart count and rename the old info file
       //TODO This is similar to jobhistory recovery, maybe this common code
@@ -1432,6 +1460,10 @@
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
     new HashMap<String, Set<JobID>>();
   
+  // (trackerID --> list of tasks to cleanup)
+  Map<String, Set<TaskAttemptID>> trackerToTasksToCleanup = 
+    new HashMap<String, Set<TaskAttemptID>>();
+  
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -1526,6 +1558,11 @@
   private final UserGroupInformation mrOwner;
   private final String supergroup;
 
+  long limitMaxMemForMapTasks;
+  long limitMaxMemForReduceTasks;
+  long memSizeForMapSlotOnJT;
+  long memSizeForReduceSlotOnJT;
+
   private QueueManager queueManager;
 
   /**
@@ -1552,6 +1589,8 @@
     MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
     MAX_BLACKLISTS_PER_TRACKER = 
         conf.getInt("mapred.max.tracker.blacklists", 4);
+    NUM_HEARTBEATS_IN_SECOND = 
+        conf.getInt("mapred.heartbeats.in.second", 100);
 
     //This configuration is there solely for tuning purposes and 
     //once this feature has been tested in real clusters and an appropriate
@@ -1564,6 +1603,8 @@
     this.conf = conf;
     JobConf jobConf = new JobConf(conf);
 
+    initializeTaskMemoryRelatedConfig();
+
     // Read the hosts/exclude files to restrict access to the jobtracker.
     this.hostsReader = new HostsFileReader(conf.get("mapred.hosts", ""),
                                            conf.get("mapred.hosts.exclude", ""));
@@ -1705,24 +1746,7 @@
         }
         LOG.info("problem cleaning system directory: " + systemDir, ie);
       }
-      Thread.sleep(SYSTEM_DIR_CLEANUP_RETRY_PERIOD);
-    }
-
-    // Prepare for recovery. This is done irrespective of the status of restart
-    // flag.
-    try {
-      recoveryManager.updateRestartCount();
-    } catch (IOException ioe) {
-      LOG.warn("Failed to initialize recovery manager. The Recovery manager "
-               + "failed to access the system files in the system dir (" 
-               + getSystemDir() + ")."); 
-      LOG.warn("It might be because the JobTracker failed to read/write system"
-               + " files (" + recoveryManager.getRestartCountFile() + " / " 
-               + recoveryManager.getTempRestartCountFile() + ") or the system "
-               + " file " + recoveryManager.getRestartCountFile() 
-               + " is missing!");
-      LOG.warn("Bailing out...");
-      throw ioe;
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
     }
     
     // Same with 'localDir' except it's always on the local disk.
@@ -1840,6 +1864,20 @@
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    // Prepare for recovery. This is done irrespective of the status of restart
+    // flag.
+    while (true) {
+      try {
+        recoveryManager.updateRestartCount();
+        break;
+      } catch (IOException ioe) {
+        LOG.warn("Failed to initialize recovery manager. ", ioe);
+        // wait for some time
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+        LOG.warn("Retrying...");
+      }
+    }
+
     taskScheduler.start();
     
     //  Start the recovery after starting the scheduler
@@ -1848,6 +1886,9 @@
     } catch (Throwable t) {
       LOG.warn("Recovery manager crashed! Ignoring.", t);
     }
+    // refresh the node list as the recovery manager might have added 
+    // disallowed trackers
+    refreshHosts();
     
     this.expireTrackersThread = new Thread(this.expireTrackers,
                                           "expireTrackers");
@@ -2660,7 +2701,7 @@
     int clusterSize = getClusterStatus().getTaskTrackers();
     int heartbeatInterval =  Math.max(
                                 (int)(1000 * Math.ceil((double)clusterSize / 
-                                                       CLUSTER_INCREMENT)),
+                                                       NUM_HEARTBEATS_IN_SECOND)),
                                 HEARTBEAT_INTERVAL_MIN) ;
     return heartbeatInterval;
   }
@@ -2804,8 +2845,8 @@
                                                               String taskTracker) {
     
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
+    List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
     if (taskIds != null) {
-      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
         if (tip == null) {
@@ -2823,10 +2864,18 @@
           }
         }
       }
-            
-      return killList;
     }
-    return null;
+    
+    // add the stray attempts for uninited jobs
+    synchronized (trackerToTasksToCleanup) {
+      Set<TaskAttemptID> set = trackerToTasksToCleanup.remove(taskTracker);
+      if (set != null) {
+        for (TaskAttemptID id : set) {
+          killList.add(new KillTaskAction(id));
+        }
+      }
+    }
+    return killList;
   }
 
   /**
@@ -3033,6 +3082,15 @@
       throw ioe;
     }
 
+    // Check the job if it cannot run in the cluster because of invalid memory
+    // requirements.
+    try {
+      checkMemoryRequirements(job);
+    } catch (IOException ioe) {
+      new CleanupQueue().addToQueue(fs, getSystemDirectoryForJob(jobId));
+      throw ioe;
+    }
+
    return addJob(jobId, job); 
   }
 
@@ -3296,6 +3354,16 @@
   }
   
   TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
+
+  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+      "mapred.cluster.map.memory.mb";
+  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.cluster.reduce.memory.mb";
+
+  static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =
+      "mapred.cluster.max.map.memory.mb";
+  static final String MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY =
+      "mapred.cluster.max.reduce.memory.mb";
   
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
@@ -3483,6 +3551,19 @@
         continue;
       }
       
+      if (!job.inited()) {
+        // if job is not yet initialized ... kill the attempt
+        synchronized (trackerToTasksToCleanup) {
+          Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
+          if (tasks == null) {
+            tasks = new HashSet<TaskAttemptID>();
+            trackerToTasksToCleanup.put(trackerName, tasks);
+          }
+          tasks.add(taskId);
+        }
+        continue;
+      }
+
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
@@ -3547,6 +3628,10 @@
       trackerToJobsToCleanup.remove(trackerName);
     }
     
+    synchronized (trackerToTasksToCleanup) {
+      trackerToTasksToCleanup.remove(trackerName);
+    }
+    
     // Inform the recovery manager
     recoveryManager.unMarkTracker(trackerName);
     
@@ -3610,7 +3695,12 @@
   public synchronized void refreshNodes() throws IOException {
     // check access
     PermissionChecker.checkSuperuserPrivilege(mrOwner, supergroup);
-
+    
+    // call the actual api
+    refreshHosts();
+  }
+  
+  private synchronized void refreshHosts() throws IOException {
     // Reread the config to get mapred.hosts and mapred.hosts.exclude filenames.
     // Update the file names and refresh internal includes and excludes list
     LOG.info("Refreshing hosts information");
@@ -3762,4 +3852,81 @@
         UserGroupInformation.getCurrentUGI().getUserName());
     this.queueManager.refreshAcls(new Configuration(this.conf));
   }
+
+  private void initializeTaskMemoryRelatedConfig() {
+    memSizeForMapSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    memSizeForReduceSlotOnJT =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForMapTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    limitMaxMemForReduceTasks =
+        JobConf.normalizeMemoryConfigValue(conf.getLong(
+            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+            JobConf.DISABLED_MEMORY_LIMIT));
+    LOG.info(new StringBuilder().append("Scheduler configured with ").append(
+        "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append(
+        " limitMaxMemForMapTasks, limitMaxMemForReduceTasks) (").append(
+        memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT)
+        .append(", ").append(limitMaxMemForMapTasks).append(", ").append(
+            limitMaxMemForReduceTasks).append(")"));
+  }
+
+  private boolean perTaskMemoryConfigurationSetOnJT() {
+    if (limitMaxMemForMapTasks == JobConf.DISABLED_MEMORY_LIMIT
+        || limitMaxMemForReduceTasks == JobConf.DISABLED_MEMORY_LIMIT
+        || memSizeForMapSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT
+        || memSizeForReduceSlotOnJT == JobConf.DISABLED_MEMORY_LIMIT) {
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Check the job if it has invalid requirements and throw an IOException if it does.
+   * 
+   * @param job
+   * @throws IOException 
+   */
+  private void checkMemoryRequirements(JobInProgress job)
+      throws IOException {
+    if (!perTaskMemoryConfigurationSetOnJT()) {
+      LOG.debug("Per-Task memory configuration is not set on JT. "
+          + "Not checking the job for invalid memory requirements.");
+      return;
+    }
+
+    boolean invalidJob = false;
+    String msg = "";
+    long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
+    long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+
+    if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
+        || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {
+      invalidJob = true;
+      msg = "Invalid job requirements.";
+    }
+
+    if (maxMemForMapTask > limitMaxMemForMapTasks
+        || maxMemForReduceTask > limitMaxMemForReduceTasks) {
+      invalidJob = true;
+      msg = "Exceeds the cluster's max-memory-limit.";
+    }
+
+    if (invalidJob) {
+      StringBuilder jobStr =
+          new StringBuilder().append(job.getJobID().toString()).append("(")
+              .append(maxMemForMapTask).append(" memForMapTasks ").append(
+                  maxMemForReduceTask).append(" memForReduceTasks): ");
+      LOG.warn(jobStr.toString() + msg);
+
+      throw new IOException(jobStr.toString() + msg);
+    }
+  }
 }

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Jun 16 20:54:24 2009
@@ -438,7 +438,7 @@
                 .getLong("mapred.tasktracker.tasks.sleeptime-before-sigkill",
                     ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
-            controller.killTaskJVM(initalContext);
+            controller.destroyTaskJVM(initalContext);
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId
                 .toString()));

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jun 16 20:54:24 2009
@@ -24,12 +24,14 @@
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
@@ -107,6 +109,7 @@
    */
   enum TaskCommands {
     LAUNCH_TASK_JVM,
+    TERMINATE_TASK_JVM,
     KILL_TASK_JVM
   }
   
@@ -126,38 +129,65 @@
       TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
           env.logSize, true);
 
+    StringBuffer sb = new StringBuffer();
+    //export out all the environment variable before child command as
+    //the setuid/setgid binaries would not be getting, any environmental
+    //variables which begin with LD_*.
+    for(Entry<String, String> entry : env.env.entrySet()) {
+      sb.append("export ");
+      sb.append(entry.getKey());
+      sb.append("=");
+      sb.append(entry.getValue());
+      sb.append("\n");
+    }
+    sb.append(cmdLine);
     // write the command to a file in the
     // task specific cache directory
-    writeCommand(cmdLine, getTaskCacheDirectory(context));
+    writeCommand(sb.toString(), getTaskCacheDirectory(context));
     
     // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildTaskCommandArgs(context);
+    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context);
     ShellCommandExecutor shExec =  buildTaskControllerExecutor(
                                     TaskCommands.LAUNCH_TASK_JVM, 
                                     env.conf.getUser(),
                                     launchTaskJVMArgs, env);
     context.shExec = shExec;
-    shExec.execute();
-    LOG.debug("output after executing task jvm = " + shExec.getOutput());
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while launching task JVM : " + 
+          StringUtils.stringifyException(e));
+      LOG.warn("Exit code from task is : " + shExec.getExitCode());
+      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+      throw new IOException(e);
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("output after executing task jvm = " + shExec.getOutput()); 
+    }
   }
 
-  // convenience API for building command arguments for specific commands
-  private List<String> buildTaskCommandArgs(TaskControllerContext context) {
+  /**
+   * Returns list of arguments to be passed while launching task VM.
+   * See {@code buildTaskControllerExecutor(TaskCommands, 
+   * String, List<String>, JvmEnv)} documentation.
+   * @param context
+   * @return Argument to be used while launching Task VM
+   */
+  private List<String> buildLaunchTaskArgs(TaskControllerContext context) {
     List<String> commandArgs = new ArrayList<String>(3);
     String taskId = context.task.getTaskID().toString();
     String jobId = getJobId(context);
+    LOG.debug("getting the task directory as: " 
+        + getTaskCacheDirectory(context));
+    commandArgs.add(getDirectoryChosenForTask(
+        new File(getTaskCacheDirectory(context)), 
+        context));
     commandArgs.add(jobId);
     if(!context.task.isTaskCleanupTask()) {
       commandArgs.add(taskId);
     }else {
       commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
     }
-    
-    LOG.debug("getting the task directory as: " 
-                + getTaskCacheDirectory(context));
-    commandArgs.add(getDirectoryChosenForTask(
-                              new File(getTaskCacheDirectory(context)), 
-                              context));
     return commandArgs;
   }
   
@@ -173,7 +203,7 @@
   // in mapred.local.dir chosen for storing data pertaining to
   // this task.
   private String getDirectoryChosenForTask(File directory,
-                                            TaskControllerContext context) {
+      TaskControllerContext context) {
     String jobId = getJobId(context);
     String taskId = context.task.getTaskID().toString();
     for (String dir : mapredLocalDirs) {
@@ -184,43 +214,13 @@
         return dir;
       }
     }
-    
+
     LOG.error("Couldn't parse task cache directory correctly");
     throw new IllegalArgumentException("invalid task cache directory "
-                + directory.getAbsolutePath());
+        + directory.getAbsolutePath());
   }
   
   /**
-   * Kill a launched task JVM running as the user of the job.
-   * 
-   * This method will launch the task controller setuid executable
-   * that in turn will kill the task JVM by sending a kill signal.
-   * @param context the context storing the task running within the JVM
-   * that needs to be killed.
-   */
-  void killTaskJVM(TaskControllerContext context) {
-   
-    if(context.task == null) {
-      LOG.info("Context task null not killing the JVM");
-      return;
-    }
-    
-    JvmEnv env = context.env;
-    List<String> killTaskJVMArgs = buildTaskCommandArgs(context);
-    try {
-      ShellCommandExecutor shExec = buildTaskControllerExecutor(
-                                      TaskCommands.KILL_TASK_JVM,
-                                      context.env.conf.getUser(),
-                                      killTaskJVMArgs, 
-                                      context.env);
-      shExec.execute();
-      LOG.debug("Command output :" +shExec.getOutput());
-    } catch (IOException ioe) {
-      LOG.warn("IOException in killing task: " + ioe.getMessage());
-    }
-  }
-
-  /**
    * Setup appropriate permissions for directories and files that
    * are used by the task.
    * 
@@ -281,9 +281,24 @@
       LOG.warn("Could not change permissions for directory " + dir);
     }
   }
-  
-  // convenience API to create the executor for launching the
-  // setuid script.
+  /**
+   * Builds the command line for launching/terminating/killing task JVM.
+   * Following is the format for launching/terminating/killing task JVM
+   * <br/>
+   * For launching, the command line arguments are:
+   * <br/>
+   * {@code user-name command tt-root job_id task_id} 
+   * <br/>
+   * For terminating/killing task jvm.
+   * {@code user-name command tt-root task-pid}
+   * 
+   * @param command command to be executed.
+   * @param userName user name
+   * @param cmdArgs list of extra arguments
+   * @param env JVM environment variables.
+   * @return {@link ShellCommandExecutor}
+   * @throws IOException
+   */
   private ShellCommandExecutor buildTaskControllerExecutor(TaskCommands command, 
                                           String userName, 
                                           List<String> cmdArgs, JvmEnv env) 
@@ -420,6 +435,67 @@
     }
   }
 
+  /**
+   * API which builds the command line to be passed to the LinuxTaskController
+   * binary to terminate/kill the task. See 
+   * {@code buildTaskControllerExecutor(TaskCommands, 
+   * String, List<String>, JvmEnv)} documentation.
+   * 
+   * 
+   * @param context context of the task to which the kill signal has to be sent.
+   * 
+   */
+  private List<String> buildKillTaskCommandArgs(TaskControllerContext 
+      context){
+    List<String> killTaskJVMArgs = new ArrayList<String>();
+    killTaskJVMArgs.add(context.pid);
+    return killTaskJVMArgs;
+  }
+  
+  /**
+   * Convenience method used to send the appropriate kill signal to the task
+   * VM.
+   * @param context
+   * @param command
+   * @throws IOException
+   */
+  private void finishTask(TaskControllerContext context,
+      TaskCommands command) throws IOException{
+    if(context.task == null) {
+      LOG.info("Context task null not killing the JVM");
+      return;
+    }
+    ShellCommandExecutor shExec = buildTaskControllerExecutor(
+        command, context.env.conf.getUser(), 
+        buildKillTaskCommandArgs(context), context.env);
+    try {
+      shExec.execute();
+    } catch (Exception e) {
+      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
+      throw new IOException(e);
+    }
+  }
+  
+  @Override
+  void terminateTask(TaskControllerContext context) {
+    try {
+      finishTask(context, TaskCommands.TERMINATE_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending kill to the Task VM " + 
+          StringUtils.stringifyException(e));
+    }
+  }
+  
+  @Override
+  void killTask(TaskControllerContext context) {
+    try {
+      finishTask(context, TaskCommands.KILL_TASK_JVM);
+    } catch (Exception e) {
+      LOG.warn("Exception thrown while sending destroy to the Task VM " + 
+          StringUtils.stringifyException(e));
+    }
+  }
+
   protected String getTaskControllerExecutablePath() {
     return taskControllerExe;
   }  

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MRConstants.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MRConstants.java Tue Jun 16 20:54:24 2009
@@ -27,8 +27,6 @@
   //
   public static final int HEARTBEAT_INTERVAL_MIN = 3 * 1000;
   
-  public static final int CLUSTER_INCREMENT = 100;
-
   public static final long COUNTER_UPDATE_INTERVAL = 60 * 1000;
 
   //

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTask.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jun 16 20:54:24 2009
@@ -51,6 +51,7 @@
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapred.Merger.Segment;
 import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.util.IndexedSortable;
@@ -73,8 +74,13 @@
 
   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
 
+  private Progress mapPhase;
+  private Progress sortPhase;
+  
+  
   {   // set phase for this task
     setPhase(TaskStatus.Phase.MAP); 
+    getProgress().setStatus("map");
   }
 
   public MapTask() {
@@ -152,7 +158,9 @@
       throws IOException{
       rawIn = raw;
       inputRecordCounter = reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
-      inputByteCounter = reporter.getCounter(TaskCounter.MAP_INPUT_BYTES);
+      inputByteCounter = reporter.getCounter(
+                           FileInputFormat.COUNTER_GROUP,
+                           FileInputFormat.BYTES_READ);
       this.reporter = reporter;
     }
 
@@ -273,6 +281,11 @@
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, ClassNotFoundException, InterruptedException {
 
+    if (isMapTask()) {
+      mapPhase = getProgress().addPhase("map", 0.667f);
+      sortPhase  = getProgress().addPhase("sort", 0.333f);
+    }
+    
     // start thread that will handle communication with parent
     TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
     reporter.startCommunicationThread();
@@ -348,6 +361,9 @@
 
     try {
       runner.run(in, collector, reporter);      
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
       collector.flush();
     } finally {
       //close
@@ -510,6 +526,9 @@
 
       input.initialize(split, mapperContext);
       mapper.run(mapperContext);
+      mapPhase.complete();
+      setPhase(TaskStatus.Phase.SORT);
+      statusUpdate(umbilical);
       input.close();
       output.close(mapperContext);
     } catch (NoSuchMethodException e) {
@@ -573,8 +592,8 @@
     
   }
 
-  class MapOutputBuffer<K extends Object, V extends Object> 
-  implements MapOutputCollector<K, V>, IndexedSortable {
+  class MapOutputBuffer<K extends Object, V extends Object>
+      implements MapOutputCollector<K, V>, IndexedSortable {
     private final int partitions;
     private final Partitioner<K, V> partitioner;
     private final JobConf job;
@@ -616,6 +635,8 @@
     private volatile Throwable sortSpillException = null;
     private final int softRecordLimit;
     private final int softBufferLimit;
+    private int recordRemaining;
+    private int bufferRemaining;
     private final int minSpillsForCombine;
     private final IndexedSorter sorter;
     private final ReentrantLock spillLock = new ReentrantLock();
@@ -663,8 +684,8 @@
       if ((sortmb & 0x7FF) != sortmb) {
         throw new IOException("Invalid \"io.sort.mb\": " + sortmb);
       }
-      sorter = ReflectionUtils.newInstance(
-            job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
+      sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
+            QuickSort.class, IndexedSorter.class), job);
       LOG.info("io.sort.mb = " + sortmb);
       // buffers and accounting
       int maxMemUsage = sortmb << 20;
@@ -677,6 +698,8 @@
       kvindices = new int[recordCapacity * ACCTSIZE];
       softBufferLimit = (int)(kvbuffer.length * spillper);
       softRecordLimit = (int)(kvoffsets.length * spillper);
+      recordRemaining = softRecordLimit;
+      bufferRemaining = softBufferLimit;
       LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
       // k/v serialization
@@ -744,38 +767,52 @@
                               + value.getClass().getName());
       }
       final int kvnext = (kvindex + 1) % kvoffsets.length;
-      spillLock.lock();
-      try {
-        boolean kvfull;
-        do {
-          if (sortSpillException != null) {
-            throw (IOException)new IOException("Spill failed"
-                ).initCause(sortSpillException);
-          }
-          // sufficient acct space
-          kvfull = kvnext == kvstart;
-          final boolean kvsoftlimit = ((kvnext > kvend)
-              ? kvnext - kvend > softRecordLimit
-              : kvend - kvnext <= kvoffsets.length - softRecordLimit);
-          if (kvstart == kvend && kvsoftlimit) {
-            LOG.info("Spilling map output: record full = " + kvsoftlimit);
-            startSpill();
-          }
-          if (kvfull) {
-            try {
-              while (kvstart != kvend) {
-                reporter.progress();
-                spillDone.await();
+      if (--recordRemaining <= 0) {
+        // Possible for check to remain < zero, if soft limit remains
+        // in force but unsatisfiable because spill is in progress
+        spillLock.lock();
+        try {
+          boolean kvfull;
+          do {
+            if (sortSpillException != null) {
+              throw (IOException)new IOException("Spill failed"
+                  ).initCause(sortSpillException);
+            }
+            // sufficient acct space
+            kvfull = kvnext == kvstart;
+            final boolean kvsoftlimit = ((kvnext > kvend)
+                ? kvnext - kvend > softRecordLimit
+                : kvend - kvnext <= kvoffsets.length - softRecordLimit);
+            if (kvstart == kvend && kvsoftlimit) {
+              LOG.info("Spilling map output: record full = " + kvfull);
+              startSpill();
+            }
+            if (kvfull) {
+              try {
+                while (kvstart != kvend) {
+                  reporter.progress();
+                  spillDone.await();
+                }
+              } catch (InterruptedException e) {
+                throw (IOException)new IOException(
+                    "Collector interrupted while waiting for the writer"
+                    ).initCause(e);
               }
-            } catch (InterruptedException e) {
-              throw (IOException)new IOException(
-                  "Collector interrupted while waiting for the writer"
-                  ).initCause(e);
             }
-          }
-        } while (kvfull);
-      } finally {
-        spillLock.unlock();
+          } while (kvfull);
+          final int softOff = kvend + softRecordLimit;
+          recordRemaining = Math.min(
+              // out of acct space
+              (kvnext < kvstart
+                 ? kvstart - kvnext
+                 : kvoffsets.length - kvnext + kvstart),
+              // soft limit
+              (kvend < kvnext
+                 ? softOff - kvnext
+                 : kvnext + (softOff - kvoffsets.length)));
+        } finally {
+          spillLock.unlock();
+        }
       }
 
       try {
@@ -886,7 +923,7 @@
        * likely result in data loss or corruption.
        * @see #markRecord()
        */
-      protected synchronized void reset() throws IOException {
+      protected void reset() throws IOException {
         // spillLock unnecessary; If spill wraps, then
         // bufindex < bufstart < bufend so contention is impossible
         // a stale value for bufstart does not affect correctness, since
@@ -912,7 +949,7 @@
       private final byte[] scratch = new byte[1];
 
       @Override
-      public synchronized void write(int v)
+      public void write(int v)
           throws IOException {
         scratch[0] = (byte)v;
         write(scratch, 0, 1);
@@ -926,69 +963,86 @@
        *    deserialize into the collection buffer.
        */
       @Override
-      public synchronized void write(byte b[], int off, int len)
+      public void write(byte b[], int off, int len)
           throws IOException {
         boolean buffull = false;
         boolean wrap = false;
-        spillLock.lock();
-        try {
-          do {
-            if (sortSpillException != null) {
-              throw (IOException)new IOException("Spill failed"
-                  ).initCause(sortSpillException);
-            }
+        bufferRemaining -= len;
+        if (bufferRemaining <= 0) {
+          // writing these bytes could exhaust available buffer space
+          // check if spill or blocking is necessary
+          spillLock.lock();
+          try {
+            do {
+              if (sortSpillException != null) {
+                throw (IOException)new IOException("Spill failed"
+                    ).initCause(sortSpillException);
+              }
 
-            // sufficient buffer space?
-            if (bufstart <= bufend && bufend <= bufindex) {
-              buffull = bufindex + len > bufvoid;
-              wrap = (bufvoid - bufindex) + bufstart > len;
-            } else {
-              // bufindex <= bufstart <= bufend
-              // bufend <= bufindex <= bufstart
-              wrap = false;
-              buffull = bufindex + len > bufstart;
-            }
+              // sufficient buffer space?
+              if (bufstart <= bufend && bufend <= bufindex) {
+                buffull = bufindex + len > bufvoid;
+                wrap = (bufvoid - bufindex) + bufstart > len;
+              } else {
+                // bufindex <= bufstart <= bufend
+                // bufend <= bufindex <= bufstart
+                wrap = false;
+                buffull = bufindex + len > bufstart;
+              }
 
-            if (kvstart == kvend) {
-              // spill thread not running
-              if (kvend != kvindex) {
-                // we have records we can spill
-                final boolean bufsoftlimit = (bufindex > bufend)
-                  ? bufindex - bufend > softBufferLimit
-                  : bufend - bufindex < bufvoid - softBufferLimit;
-                if (bufsoftlimit || (buffull && !wrap)) {
-                  LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
-                  startSpill();
+              if (kvstart == kvend) {
+                // spill thread not running
+                if (kvend != kvindex) {
+                  // we have records we can spill
+                  final boolean bufsoftlimit = (bufindex > bufend)
+                    ? bufindex - bufend > softBufferLimit
+                    : bufend - bufindex < bufvoid - softBufferLimit;
+                  if (bufsoftlimit || (buffull && !wrap)) {
+                    LOG.info("Spilling map output: buffer full= " + (buffull && !wrap));
+                    startSpill();
+                  }
+                } else if (buffull && !wrap) {
+                  // We have no buffered records, and this record is too large
+                  // to write into kvbuffer. We must spill it directly from
+                  // collect
+                  final int size = ((bufend <= bufindex)
+                    ? bufindex - bufend
+                    : (bufvoid - bufend) + bufindex) + len;
+                  bufstart = bufend = bufindex = bufmark = 0;
+                  kvstart = kvend = kvindex = 0;
+                  bufvoid = kvbuffer.length;
+                  throw new MapBufferTooSmallException(size + " bytes");
                 }
-              } else if (buffull && !wrap) {
-                // We have no buffered records, and this record is too large
-                // to write into kvbuffer. We must spill it directly from
-                // collect
-                final int size = ((bufend <= bufindex)
-                  ? bufindex - bufend
-                  : (bufvoid - bufend) + bufindex) + len;
-                bufstart = bufend = bufindex = bufmark = 0;
-                kvstart = kvend = kvindex = 0;
-                bufvoid = kvbuffer.length;
-                throw new MapBufferTooSmallException(size + " bytes");
               }
-            }
 
-            if (buffull && !wrap) {
-              try {
-                while (kvstart != kvend) {
-                  reporter.progress();
-                  spillDone.await();
+              if (buffull && !wrap) {
+                try {
+                  while (kvstart != kvend) {
+                    reporter.progress();
+                    spillDone.await();
+                  }
+                } catch (InterruptedException e) {
+                    throw (IOException)new IOException(
+                        "Buffer interrupted while waiting for the writer"
+                        ).initCause(e);
                 }
-              } catch (InterruptedException e) {
-                  throw (IOException)new IOException(
-                      "Buffer interrupted while waiting for the writer"
-                      ).initCause(e);
               }
-            }
-          } while (buffull && !wrap);
-        } finally {
-          spillLock.unlock();
+            } while (buffull && !wrap);
+            final int softOff = bufend + softBufferLimit;
+            bufferRemaining = Math.min(
+                // out of buffer space
+                (bufindex < bufstart
+                   ? bufstart - bufindex
+                   : kvbuffer.length - bufindex + bufstart),
+                // soft limit
+                (bufend < bufindex
+                   ? softOff - bufindex
+                   : bufindex + (softOff - kvbuffer.length)));
+          } finally {
+            spillLock.unlock();
+          }
+        } else {
+          buffull = bufindex + len > bufvoid;
         }
         // here, we know that we have sufficient space to write
         if (buffull) {
@@ -1000,11 +1054,12 @@
         }
         System.arraycopy(b, off, kvbuffer, bufindex, len);
         bufindex += len;
+        bufferRemaining -= len;
       }
     }
 
-    public synchronized void flush() throws IOException, ClassNotFoundException,
-                                            InterruptedException {
+    public void flush() throws IOException, ClassNotFoundException,
+           InterruptedException {
       LOG.info("Starting flush of map output");
       spillLock.lock();
       try {
@@ -1084,7 +1139,7 @@
       }
     }
 
-    private synchronized void startSpill() {
+    private void startSpill() {
       LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
                "; bufvoid = " + bufvoid);
       LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
@@ -1385,6 +1440,9 @@
         return;
       }
       {
+        sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+        Merger.considerFinalMergeForProgress();
+        
         IndexRecord rec = new IndexRecord();
         final SpillRecord spillRec = new SpillRecord(partitions);
         for (int parts = 0; parts < partitions; parts++) {
@@ -1406,14 +1464,17 @@
             }
           }
 
+          int mergeFactor = job.getInt("io.sort.factor", 100);
+          // sort the segments only if there are intermediate merges
+          boolean sortSegments = segmentList.size() > mergeFactor;
           //merge
           @SuppressWarnings("unchecked")
           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
-                         keyClass, valClass,
-                         segmentList, job.getInt("io.sort.factor", 100),
+                         keyClass, valClass, codec,
+                         segmentList, mergeFactor,
                          new Path(mapId.toString()),
-                         job.getOutputKeyComparator(), reporter,
-                         null, spilledRecordsCounter);
+                         job.getOutputKeyComparator(), reporter, sortSegments,
+                         null, spilledRecordsCounter, sortPhase.phase());
 
           //write merged output to disk
           long segmentStart = finalOut.getPos();
@@ -1430,6 +1491,8 @@
           //close
           writer.close();
 
+          sortPhase.startNextPhase();
+          
           // record offsets
           rec.startOffset = segmentStart;
           rec.rawLength = writer.getRawLength();

Modified: hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTaskStatus.java?rev=785392&r1=785391&r2=785392&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/java/org/apache/hadoop/mapred/MapTaskStatus.java Tue Jun 16 20:54:24 2009
@@ -18,9 +18,16 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
 
 class MapTaskStatus extends TaskStatus {
 
+  private long mapFinishTime;
+  private long sortFinishTime;
+  
   public MapTaskStatus() {}
 
   public MapTaskStatus(TaskAttemptID taskid, float progress,
@@ -35,6 +42,19 @@
     return true;
   }
 
+  /**
+   * Sets finishTime. 
+   * @param finishTime finish time of task.
+   */
+  @Override
+  void setFinishTime(long finishTime) {
+    super.setFinishTime(finishTime);
+    if (mapFinishTime == 0) {
+      mapFinishTime = finishTime;
+    }
+    setSortFinishTime(finishTime);
+  }
+  
   @Override
   public long getShuffleFinishTime() {
     throw new UnsupportedOperationException("getShuffleFinishTime() not supported for MapTask");
@@ -46,12 +66,43 @@
   }
 
   @Override
+  public long getMapFinishTime() {
+    return mapFinishTime;
+  }
+  
+  @Override
+  void setMapFinishTime(long mapFinishTime) {
+    this.mapFinishTime = mapFinishTime;
+  }
+
+  @Override
   public long getSortFinishTime() {
-    throw new UnsupportedOperationException("getSortFinishTime() not supported for MapTask");
+    return sortFinishTime;
   }
 
   @Override
   void setSortFinishTime(long sortFinishTime) {
-    throw new UnsupportedOperationException("setSortFinishTime() not supported for MapTask");
+    this.sortFinishTime = sortFinishTime;
+  }
+  
+  @Override
+  synchronized void statusUpdate(TaskStatus status) {
+    super.statusUpdate(status);
+    
+    if (status.getMapFinishTime() != 0) {
+      this.mapFinishTime = status.getMapFinishTime();
+    }
+  }
+  
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    mapFinishTime = in.readLong();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeLong(mapFinishTime);
   }
 }



Mime
View raw message