hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From s..@apache.org
Subject svn commit: r1346214 [4/7] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/streaming/src/java/org/apache/hadoop/strea...
Date Tue, 05 Jun 2012 02:33:47 GMT
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskController.java Tue Jun  5 02:33:44 2012
@@ -14,11 +14,13 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -26,16 +28,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Controls initialization, finalization and clean up of tasks, and
@@ -47,402 +45,249 @@ import org.apache.hadoop.classification.
  * performing the actual actions.
  * 
  * <br/>
+ * 
+ * NOTE: This class is internal only class and not intended for users!!
  */
-@InterfaceAudience.Private
 public abstract class TaskController implements Configurable {
   
+  /**
+   * The constants for the signals.
+   */
+  public enum Signal {
+    NULL(0, "NULL"), QUIT(3, "SIGQUIT"), 
+    KILL(9, "SIGKILL"), TERM(15, "SIGTERM");
+    private final int value;
+    private final String str;
+    private Signal(int value, String str) {
+      this.str = str;
+      this.value = value;
+    }
+    public int getValue() {
+      return value;
+    }
+    @Override
+    public String toString() {
+      return str;
+    }
+  }
+
   private Configuration conf;
-  
+
   public static final Log LOG = LogFactory.getLog(TaskController.class);
   
+  //Name of the executable script that will contain the child
+  // JVM command line. See writeCommand for details.
+  protected static final String COMMAND_FILE = "taskjvm.sh";
+  
+  protected LocalDirAllocator allocator;
+
+  final public static FsPermission TASK_LAUNCH_SCRIPT_PERMISSION =
+  FsPermission.createImmutable((short) 0700); // rwx--------
+  
   public Configuration getConf() {
     return conf;
   }
 
-  // The list of directory paths specified in the variable Configs.LOCAL_DIR
-  // This is used to determine which among the list of directories is picked up
-  // for storing data for a particular task.
-  protected String[] mapredLocalDirs;
-
   public void setConf(Configuration conf) {
     this.conf = conf;
-    mapredLocalDirs = conf.getTrimmedStrings(MRConfig.LOCAL_DIR);
   }
 
   /**
-   * Sets up the permissions of the following directories on all the configured
-   * disks:
-   * <ul>
-   * <li>mapreduce.cluster.local.directories</li>
-   * <li>Hadoop log directories</li>
-   * </ul>
+   * Does initialization and setup.
+   * @param allocator the local dir allocator to use
    */
-  public void setup() throws IOException {
-    FileSystem localFs = FileSystem.getLocal(conf);
-
-    for (String localDir : this.mapredLocalDirs) {
-      // Set up the mapreduce.cluster.local.directories.
-      File mapredlocalDir = new File(localDir);
-      if (!mapredlocalDir.isDirectory() && !mapredlocalDir.mkdirs()) {
-        LOG.warn("Unable to create mapreduce.cluster.local.directory : "
-            + mapredlocalDir.getPath());
-      } else {
-        localFs.setPermission(new Path(mapredlocalDir.getCanonicalPath()),
-                              new FsPermission((short)0755));
-      }
-    }
-
-    // Set up the user log directory
-    File taskLog = TaskLog.getUserLogDir();
-    if (!taskLog.isDirectory() && !taskLog.mkdirs()) {
-      LOG.warn("Unable to create taskLog directory : " + taskLog.getPath());
-    } else {
-      localFs.setPermission(new Path(taskLog.getCanonicalPath()),
-                            new FsPermission((short)0755));
-    }
-    DiskChecker.checkDir(TaskLog.getUserLogDir());
-  }
-
+  public abstract void setup(LocalDirAllocator allocator) throws IOException;
+  
   /**
-   * Take task-controller specific actions to initialize job. This involves
-   * setting appropriate permissions to job-files so as to secure the files to
-   * be accessible only by the user's tasks.
-   * 
+   * Create all of the directories necessary for the job to start and download
+   * all of the job and private distributed cache files.
+   * Creates both the user directories and the job log directory.
+   * @param user the user name
+   * @param jobid the job
+   * @param credentials a filename containing the job secrets
+   * @param jobConf the path to the localized configuration file
+   * @param taskTracker the connection the task tracker
+   * @param ttAddr the tasktracker's RPC address
    * @throws IOException
    */
-  abstract void initializeJob(JobInitializationContext context) throws IOException;
-
+  public abstract void initializeJob(String user, String jobid, 
+                                     Path credentials, Path jobConf,
+                                     TaskUmbilicalProtocol taskTracker,
+                                     InetSocketAddress ttAddr) 
+  throws IOException, InterruptedException;
+  
   /**
-   * Take task-controller specific actions to initialize the distributed cache
-   * file. This involves setting appropriate permissions for these files so as
-   * to secure them to be accessible only their owners.
-   * 
-   * @param context
+   * Create all of the directories for the task and launches the child jvm.
+   * @param user the user name
+   * @param jobId the jobId in question
+   * @param attemptId the attempt id (cleanup attempts have .cleanup suffix)
+   * @param setup list of shell commands to execute before the jvm
+   * @param jvmArguments list of jvm arguments
+   * @param currentWorkDirectory the full path of the cwd for the task
+   * @param stdout the file to redirect stdout to
+   * @param stderr the file to redirect stderr to
+   * @return the exit code for the task
    * @throws IOException
    */
-  public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException;
-
-  /**
-   * Launch a task JVM
-   * 
-   * This method defines how a JVM will be launched to run a task. Each
-   * task-controller should also do an
-   * {@link #initializeTask(TaskControllerContext)} inside this method so as to
-   * initialize the task before launching it. This is for reasons of
-   * task-controller specific optimizations w.r.t combining initialization and
-   * launching of tasks.
-   * 
-   * @param context the context associated to the task
-   */
-  abstract void launchTaskJVM(TaskControllerContext context)
-                                      throws IOException;
-
+  public abstract
+  int launchTask(String user, 
+                 String jobId,
+                 String attemptId,
+                 List<String> setup,
+                 List<String> jvmArguments,
+                 File currentWorkDirectory,
+                 String stdout,
+                 String stderr) throws IOException;
+  
   /**
-   * Top level cleanup a task JVM method.
-   * <ol>
-   * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
-   * to cleanup.</li>
-   * <li>Sends a forceful kill signal to task JVM, terminating all its
-   * sub-processes forcefully.</li>
-   * </ol>
-   *
-   * @param context the task for which kill signal has to be sent.
+   * Send a signal to a task pid as the user. Always signal the process group.
+   * An implementation may elect to signal the pid directly if the former is
+   * unavailable or fails.
+   * @param user the user name
+   * @param taskPid the pid of the task
+   * @param signal the id of the signal to send
+   * @return false if the process does not exist
+   * @throws IOException If the task controller failed to signal the process
+   * (group), but the process exists.
    */
-  final void destroyTaskJVM(TaskControllerContext context) {
-    // Send SIGTERM to try to ask for a polite exit.
-    terminateTask(context);
-
-    try {
-      Thread.sleep(context.sleeptimeBeforeSigkill);
-    } catch (InterruptedException e) {
-      LOG.warn("Sleep interrupted : " +
-          StringUtils.stringifyException(e));
-    }
-
-    killTask(context);
-  }
-
-  /** Perform initializing actions required before a task can run.
-    * 
-    * For instance, this method can be used to setup appropriate
-    * access permissions for files and directories that will be
-    * used by tasks. Tasks use the job cache, log, and distributed cache
-    * directories and files as part of their functioning. Typically,
-    * these files are shared between the daemon and the tasks
-    * themselves. So, a TaskController that is launching tasks
-    * as different users can implement this method to setup
-    * appropriate ownership and permissions for these directories
-    * and files.
-    */
-  abstract void initializeTask(TaskControllerContext context)
-      throws IOException;
-
-  static class TaskExecContext {
-    // task being executed
-    Task task;
-  }
+  public abstract boolean signalTask(String user, int taskPid, 
+                                  Signal signal) throws IOException;
+  
   /**
-   * Contains task information required for the task controller.  
+   * Delete the user's files under all of the task tracker root directories.
+   * @param user the user name
+   * @param subDir the path relative to base directories
+   * @param baseDirs the base directories (absolute paths)
+   * @throws IOException
    */
-  static class TaskControllerContext extends TaskExecContext {
-    ShellCommandExecutor shExec;     // the Shell executor executing the JVM for this task.
-
-    // Information used only when this context is used for launching new tasks.
-    JvmEnv env;     // the JVM environment for the task.
-
-    // Information used only when this context is used for destroying a task jvm.
-    String pid; // process handle of task JVM.
-    long sleeptimeBeforeSigkill; // waiting time before sending SIGKILL to task JVM after sending SIGTERM
-  }
-
+  public abstract void deleteAsUser(String user, 
+                                    String subDir,
+                                    String... baseDirs) throws IOException;
+  
   /**
-   * Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the file/dir
+   * Delete the user's files under the userlogs directory.
+   * @param user the user to work as
+   * @param subDir the path under the userlogs directory.
+   * @throws IOException
    */
-  static abstract class TaskControllerPathDeletionContext 
-  extends PathDeletionContext {
-    TaskController taskController;
-    String user;
-
-    /**
-     * mapredLocalDir is the base dir under which to-be-deleted jobLocalDir, 
-     * taskWorkDir or taskAttemptDir exists. fullPath of jobLocalDir, 
-     * taskAttemptDir or taskWorkDir is built using mapredLocalDir, jobId, 
-     * taskId, etc.
-     */
-    Path mapredLocalDir;
-
-    public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir,
-                                             TaskController taskController,
-                                             String user) {
-      super(fs, null);
-      this.taskController = taskController;
-      this.mapredLocalDir = mapredLocalDir;
+  public abstract void deleteLogAsUser(String user, 
+                                       String subDir) throws IOException;
+  
+  static class DeletionContext extends CleanupQueue.PathDeletionContext {
+    private TaskController controller;
+    private boolean isLog;
+    private String user;
+    private String subDir;
+    private String[] baseDirs;
+    DeletionContext(TaskController controller, boolean isLog, String user, 
+                    String subDir, String[] baseDirs) {
+      super(null, null);
+      this.controller = controller;
+      this.isLog = isLog;
       this.user = user;
+      this.subDir = subDir;
+      this.baseDirs = baseDirs;
     }
-
+    
     @Override
-    protected String getPathForCleanup() {
-      if (fullPath == null) {
-        fullPath = buildPathForDeletion();
+    protected void deletePath() throws IOException {
+      if (isLog) {
+        controller.deleteLogAsUser(user, subDir);
+      } else {
+        controller.deleteAsUser(user, subDir, baseDirs);
       }
-      return fullPath;
     }
 
-    /**
-     * Return the component of the path under the {@link #mapredLocalDir} to be 
-     * cleaned up. Its the responsibility of the class that extends 
-     * {@link TaskControllerPathDeletionContext} to provide the correct 
-     * component. For example 
-     *  - For task related cleanups, either the task-work-dir or task-local-dir
-     *    might be returned depending on jvm reuse.
-     *  - For job related cleanup, simply the job-local-dir might be returned.
-     */
-    abstract protected String getPath();
-    
-    /**
-     * Builds the path of taskAttemptDir OR taskWorkDir based on
-     * mapredLocalDir, jobId, taskId, etc
-     */
-    String buildPathForDeletion() {
-      return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + getPath();
+    @Override
+    public String toString() {
+      return (isLog ? "log(" : "dir(") +
+        user + "," + subDir + ")";
     }
   }
-
-  /** Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the task-work-dir or
-   * task-local-dir depending on whether the jvm is reused or not.
-   */
-  static class TaskControllerTaskPathDeletionContext 
-  extends TaskControllerPathDeletionContext {
-    final Task task;
-    final boolean isWorkDir;
-    
-    public TaskControllerTaskPathDeletionContext(FileSystem fs, 
-        Path mapredLocalDir, Task task, boolean isWorkDir, 
-        TaskController taskController) {
-      super(fs, mapredLocalDir, taskController, task.getUser());
-      this.task = task;
-      this.isWorkDir = isWorkDir;
-    }
-    
-    /**
-     * Returns the taskWorkDir or taskLocalDir based on whether 
-     * {@link TaskControllerTaskPathDeletionContext} is configured to delete
-     * the workDir.
-     */
-    @Override
-    protected String getPath() {
-      String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask())
-        : TaskTracker.getLocalTaskDir(task.getUser(),
-          task.getJobID().toString(), task.getTaskID().toString(),
-          task.isTaskCleanupTask());
-      return subDir;
+  
+   /**
+    * Returns the local unix user that a given job will run as.
+    */
+   public String getRunAsUser(JobConf conf) {
+     return System.getProperty("user.name");
+   }
+
+  //Write the JVM command line to a file under the specified directory
+  // Note that the JVM will be launched using a setuid executable, and
+  // could potentially contain strings defined by a user. Hence, to
+  // prevent special character attacks, we write the command line to
+  // a file and execute it.
+  protected static String writeCommand(String cmdLine, FileSystem fs,
+      Path commandFile) throws IOException {
+    PrintWriter pw = null;
+    LOG.info("Writing commands to " + commandFile);
+    try {
+      pw = new PrintWriter(FileSystem.create(
+            fs, commandFile, TASK_LAUNCH_SCRIPT_PERMISSION));
+      pw.write(cmdLine);
+    } catch (IOException ioe) {
+      LOG.error("Caught IOException while writing JVM command line to file. ",
+          ioe);
+    } finally {
+      if (pw != null) {
+        pw.close();
+      }
     }
-
-    /**
-     * Makes the path(and its subdirectories recursively) fully deletable by
-     * setting proper permissions(770) by task-controller
-     */
-    @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath, if not inited already
-      if (fs.exists(new Path(fullPath))) {
-        taskController.enableTaskForCleanup(this);
+    return commandFile.makeQualified(fs).toUri().getPath();
+  }
+  
+  protected void logOutput(String output) {
+    String shExecOutput = output;
+    if (shExecOutput != null) {
+      for (String str : shExecOutput.split("\n")) {
+        LOG.info(str);
       }
     }
   }
 
-  /** Contains info related to the path of the file/dir to be deleted. This info
-   * is needed by task-controller to build the full path of the job-local-dir.
-   */
-  static class TaskControllerJobPathDeletionContext 
-  extends TaskControllerPathDeletionContext {
-    final JobID jobId;
-    
-    public TaskControllerJobPathDeletionContext(FileSystem fs, 
-        Path mapredLocalDir, JobID id, String user, 
+  public static final boolean isSetsidAvailable = isSetsidSupported();
+  private static boolean isSetsidSupported() {
+    ShellCommandExecutor shexec = null;
+    boolean setsidSupported = true;
+    try {
+      String[] args = {"setsid", "bash", "-c", "echo $$"};
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (IOException ioe) {
+      LOG.warn("setsid is not available on this machine. So not using it.");
+      setsidSupported = false;
+    } finally { // handle the exit code
+      LOG.info("setsid exited with exit code " + shexec.getExitCode());
+    }
+    return setsidSupported;
+  }
+
+  public static class DelayedProcessKiller extends Thread {
+    private final String user;
+    private final int pid;
+    private final long delay;
+    private final Signal signal;
+    private final TaskController taskController;
+    public DelayedProcessKiller(String user, int pid, long delay, Signal signal,
         TaskController taskController) {
-      super(fs, mapredLocalDir, taskController, user);
-      this.jobId = id;
-    }
-    
-    /**
-     * Returns the jobLocalDir of the job to be cleaned up.
-     */
-    @Override
-    protected String getPath() {
-      return TaskTracker.getLocalJobDir(user, jobId.toString());
+      this.user = user;
+      this.pid = pid;
+      this.delay = delay;
+      this.signal = signal;
+      this.taskController = taskController;
+      setName("Task killer for " + pid);
+      setDaemon(false);
     }
-    
-    /**
-     * Makes the path(and its sub-directories recursively) fully deletable by
-     * setting proper permissions(770) by task-controller
-     */
     @Override
-    protected void enablePathForCleanup() throws IOException {
-      getPathForCleanup();// allow init of fullPath, if not inited already
-      if (fs.exists(new Path(fullPath))) {
-        taskController.enableJobForCleanup(this);
+    public void run() {
+      try {
+        Thread.sleep(delay);
+        taskController.signalTask(user, pid, signal);
+      } catch (InterruptedException e) {
+        return;
+      } catch (IOException e) {
+        LOG.warn("Exception when killing task " + pid, e);
       }
     }
   }
-  
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class InitializationContext {
-    public File workDir;
-    public String user;
-    
-    public InitializationContext() {
-    }
-    
-    public InitializationContext(String user, File workDir) {
-      this.user = user;
-      this.workDir = workDir;
-    }
-  }
-  
-  /**
-   * This is used for initializing the private localized files in distributed
-   * cache. Initialization would involve changing permission, ownership and etc.
-   */
-  @InterfaceAudience.Private
-  @InterfaceStability.Unstable
-  public static class DistributedCacheFileContext extends InitializationContext {
-    // base directory under which file has been localized
-    Path localizedBaseDir;
-    // the unique string used to construct the localized path
-    String uniqueString;
-
-    public DistributedCacheFileContext(String user, File workDir,
-        Path localizedBaseDir, String uniqueString) {
-      super(user, workDir);
-      this.localizedBaseDir = localizedBaseDir;
-      this.uniqueString = uniqueString;
-    }
-
-    public Path getLocalizedUniqueDir() {
-      return new Path(localizedBaseDir, new Path(TaskTracker
-          .getPrivateDistributedCacheDir(user), uniqueString));
-    }
-  }
-
-  static class JobInitializationContext extends InitializationContext {
-    JobID jobid;
-  }
-  
-  static class DebugScriptContext extends TaskExecContext {
-    List<String> args;
-    File workDir;
-    File stdout;
-  }
-
-  /**
-   * Sends a graceful terminate signal to taskJVM and it sub-processes. 
-   *   
-   * @param context task context
-   */
-  abstract void terminateTask(TaskControllerContext context);
-  
-  /**
-   * Sends a KILL signal to forcefully terminate the taskJVM and its
-   * sub-processes.
-   * 
-   * @param context task context
-   */
-  abstract void killTask(TaskControllerContext context);
-
-
-  /**
-   * Sends a QUIT signal to direct the task JVM (and sub-processes) to
-   * dump their stack to stdout.
-   *
-   * @param context task context.
-   */
-  abstract void dumpTaskStack(TaskControllerContext context);
-
-  /**
-   * Initialize user on this TaskTracer in a TaskController specific manner.
-   * 
-   * @param context
-   * @throws IOException
-   */
-  public abstract void initializeUser(InitializationContext context)
-      throws IOException;
-  
-  /**
-   * Launch the task debug script
-   * 
-   * @param context
-   * @throws IOException
-   */
-  abstract void runDebugScript(DebugScriptContext context) 
-      throws IOException;
-  
-  /**
-   * Enable the task for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException;
-  
-  /**
-   * Enable the job for cleanup by changing permissions of the path
-   * @param context   path deletion context
-   * @throws IOException
-   */
-  abstract void enableJobForCleanup(PathDeletionContext context)
-    throws IOException;
-
-  /**
-   * Returns the local unix user that a given job will run as.
-   */
-  String getRunAsUser(JobConf conf) {
-    return System.getProperty("user.name");
-  }
 }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskLog.java Tue Jun  5 02:33:44 2012
@@ -43,8 +43,8 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -168,7 +168,14 @@ public class TaskLog {
 
   static File getAttemptDir(TaskAttemptID taskid, boolean isCleanup) {
     String cleanupSuffix = isCleanup ? ".cleanup" : "";
-    return new File(getJobDir(taskid.getJobID()), taskid + cleanupSuffix);
+    return getAttemptDir(taskid.getJobID().toString(), 
+        taskid.toString() + cleanupSuffix);
+  }
+  
+  static File getAttemptDir(String jobid, String taskid) {
+    // taskid should be fully formed and it should have the optional 
+    // .cleanup suffix
+    return new File(getJobDir(jobid), taskid);
   }
   private static long prevOutLength;
   private static long prevErrLength;
@@ -487,21 +494,23 @@ public class TaskLog {
     
     String stdout = FileUtil.makeShellPath(stdoutFilename);
     String stderr = FileUtil.makeShellPath(stderrFilename);    
-    StringBuffer mergedCmd = new StringBuffer();
+    StringBuilder mergedCmd = new StringBuilder();
     
     // Export the pid of taskJvm to env variable JVM_PID.
     // Currently pid is not used on Windows
     if (!Shell.WINDOWS) {
-      mergedCmd.append(" export JVM_PID=`echo $$` ; ");
+      mergedCmd.append("export JVM_PID=`echo $$` ; ");
     }
 
-    if (setup != null && setup.size() > 0) {
-      mergedCmd.append(addCommand(setup, false));
-      mergedCmd.append(";");
+    if (setup != null) {
+      for (String s : setup) {
+        mergedCmd.append(s);
+        mergedCmd.append("\n");
+      }
     }
     if (tailLength > 0) {
       mergedCmd.append("(");
-    } else if(ProcessTree.isSetsidAvailable && useSetsid &&
+    } else if(TaskController.isSetsidAvailable && useSetsid &&
         !Shell.WINDOWS) {
       mergedCmd.append("exec setsid ");
     } else {
@@ -574,7 +583,7 @@ public class TaskLog {
    */
   public static String addCommand(List<String> cmd, boolean isExecutable) 
   throws IOException {
-    StringBuffer command = new StringBuffer();
+    StringBuilder command = new StringBuilder();
     for(String s: cmd) {
     	command.append('\'');
       if (isExecutable) {
@@ -623,11 +632,21 @@ public class TaskLog {
   /**
    * Get the user log directory for the job jobid.
    * 
-   * @param jobid
+   * @param jobid string representation of the jobid
+   * @return user log directory for the job
+   */
+  public static File getJobDir(String jobid) {
+    return new File(getUserLogDir(), jobid);
+  }
+  
+  /**
+   * Get the user log directory for the job jobid.
+   * 
+   * @param jobid the jobid object
    * @return user log directory for the job
    */
   public static File getJobDir(JobID jobid) {
-    return new File(getUserLogDir(), jobid.toString());
+    return getJobDir(jobid.toString());
   }
 
 } // TaskLog

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Tue Jun  5 02:33:44 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.mapred.TaskTrac
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -46,14 +45,13 @@ class TaskMemoryManagerThread extends Th
   private static Log LOG = LogFactory.getLog(TaskMemoryManagerThread.class);
 
   private TaskTracker taskTracker;
-  private long monitoringInterval;
-
-  private long maxMemoryAllowedForAllTasks;
   private long maxRssMemoryAllowedForAllTasks;
 
-  private Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
-  private Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
-  private List<TaskAttemptID> tasksToBeRemoved;
+  private final long monitoringInterval;
+  private final long maxMemoryAllowedForAllTasks;
+  private final Map<TaskAttemptID, ProcessTreeInfo> processTreeInfoMap;
+  private final Map<TaskAttemptID, ProcessTreeInfo> tasksToBeAdded;
+  private final List<TaskAttemptID> tasksToBeRemoved;
 
   private static final String MEMORY_USAGE_STRING =
     "Memory usage of ProcessTree %s for task-id %s : Virutal %d bytes, " +
@@ -91,7 +89,8 @@ class TaskMemoryManagerThread extends Th
     this.monitoringInterval = monitoringInterval;
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit, long memLimitPhysical) {
+  public void addTask(TaskAttemptID tid, long memLimit,
+      long memLimitPhysical) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
       ProcessTreeInfo ptInfo =
@@ -204,19 +203,10 @@ class TaskMemoryManagerThread extends Th
             if (pId != null) {
               // pId will be null, either if the JVM is not spawned yet or if
               // the JVM is removed from jvmIdToPid
-              long sleeptimeBeforeSigkill =
-                  taskTracker
-                      .getJobConf()
-                      .getLong(
-                          TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
-                          ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
-
-              // create process tree object
-              ProcfsBasedProcessTree pt =
-                  new ProcfsBasedProcessTree(pId,
-                      ProcessTree.isSetsidAvailable, sleeptimeBeforeSigkill);
               LOG.debug("Tracking ProcessTree " + pId + " for the first time");
 
+              ProcfsBasedProcessTree pt = new ProcfsBasedProcessTree(pId,
+                  TaskController.isSetsidAvailable);
               ptInfo.setPid(pId);
               ptInfo.setProcessTree(pt);
             }
@@ -280,9 +270,13 @@ class TaskMemoryManagerThread extends Th
             // Virtual or physical memory over limit. Fail the task and remove
             // the corresponding process tree
             LOG.warn(msg);
+            // warn if not a leader
+            if (!pTree.checkPidPgrpidForMatch()) {
+              LOG.error("Killed task process with PID " + pId +
+                  " but it is not a process group leader.");
+            }
+            // kill the task
             taskTracker.cleanUpOverMemoryTask(tid, true, msg);
-            // Now destroy the ProcessTree, remove it from monitoring map.
-            pTree.destroy(true/*in the background*/);
             it.remove();
             LOG.info("Removed ProcessTree with root " + pId);
           } else {
@@ -294,8 +288,7 @@ class TaskMemoryManagerThread extends Th
         } catch (Exception e) {
           // Log the exception and proceed to the next task.
           LOG.warn("Uncaught exception in TaskMemoryManager "
-              + "while managing memory of " + tid + " : "
-              + StringUtils.stringifyException(e));
+              + "while managing memory of " + tid, e);
         }
       }
 
@@ -518,8 +511,6 @@ class TaskMemoryManagerThread extends Th
     taskTracker.cleanUpOverMemoryTask(tid, false, msg);
     // Now destroy the ProcessTree, remove it from monitoring map.
     ProcessTreeInfo ptInfo = processTreeInfoMap.get(tid);
-    ProcfsBasedProcessTree pTree = ptInfo.getProcessTree();
-    pTree.destroy(true/*in the background*/);
     processTreeInfoMap.remove(tid);
     LOG.info("Removed ProcessTree with root " + ptInfo.getPID());
   }

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java	(original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/TaskRunner.java	Tue Jun  5 02:33:44 2012
@@ -20,16 +20,16 @@ package org.apache.hadoop.mapred;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Vector;
+import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,21 +67,30 @@ abstract class TaskRunner extends Thread
   private boolean exitCodeSet = false;
   
   private static String SYSTEM_PATH_SEPARATOR = System.getProperty("path.separator");
+  static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
 
   
   private TaskTracker tracker;
   private TaskDistributedCacheManager taskDistributedCacheManager;
+  private String[] localdirs;
+  final private static Random rand;
+  static {
+    rand = new Random();
+  }
 
   protected JobConf conf;
   JvmManager jvmManager;
 
   public TaskRunner(TaskTracker.TaskInProgress tip, TaskTracker tracker, 
-      JobConf conf) {
+                    JobConf conf, TaskTracker.RunningJob rjob
+                    ) throws IOException {
     this.tip = tip;
     this.t = tip.getTask();
     this.tracker = tracker;
     this.conf = conf;
     this.jvmManager = tracker.getJvmManagerInstance();
+    this.localdirs = conf.getLocalDirs();
+    taskDistributedCacheManager = rjob.distCacheMgr;
   }
 
   public Task getTask() { return t; }
@@ -154,27 +163,13 @@ abstract class TaskRunner extends Thread
       //before preparing the job localize 
       //all the archives
       TaskAttemptID taskid = t.getTaskID();
-      final LocalDirAllocator lDirAlloc = new LocalDirAllocator(MRConfig.LOCAL_DIR);
-      final File workDir = formWorkDir(lDirAlloc, taskid, t.isTaskCleanupTask(), conf);
-
-      // We don't create any symlinks yet, so presence/absence of workDir
-      // actually on the file system doesn't matter.
-      tip.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws IOException {
-          taskDistributedCacheManager = 
-            tracker.getTrackerDistributedCacheManager()
-            .newTaskDistributedCacheManager(conf);
-          taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker
-              .getPrivateDistributedCacheDir(conf.getUser()), 
-          TaskTracker.getPublicDistributedCacheDir());
-          return null;
-        }
-      });
-
-      // Set up the child task's configuration. After this call, no localization
-      // of files should happen in the TaskTracker's process space. Any changes to
-      // the conf object after this will NOT be reflected to the child.
-      setupChildTaskConfiguration(lDirAlloc);
+    //simply get the location of the workDir and pass it to the child. The
+    //child will do the actual dir creation
+    final File workDir =
+     new File(new Path(localdirs[rand.nextInt(localdirs.length)],
+         TaskTracker.getTaskWorkDir(t.getUser(), taskid.getJobID().toString(),
+         taskid.toString(),
+         t.isTaskCleanupTask())).toString());
 
       // Build classpath
       List<String> classPaths =
@@ -189,7 +184,7 @@ abstract class TaskRunner extends Thread
       tracker.addToMemoryManager(t.getTaskID(), t.isMapTask(), conf);
 
       // set memory limit using ulimit if feasible and necessary ...
-      List<String> setup = getVMSetupCmd();
+      String setup = getVMSetupCmd();
 
       // Set up the redirection of the task's stdout and stderr streams
       File[] logFiles = prepareLogFiles(taskid, t.isTaskCleanupTask());
@@ -202,7 +197,20 @@ abstract class TaskRunner extends Thread
       errorInfo = getVMEnvironment(errorInfo, workDir, conf, env,
                                    taskid, logSize);
 
-      launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);
+      // flatten the env as a set of export commands
+      List <String> setupCmds = new ArrayList<String>();
+      for(Entry<String, String> entry : env.entrySet()) {
+        StringBuffer sb = new StringBuffer();
+        sb.append("export ");
+        sb.append(entry.getKey());
+        sb.append("=\"");
+        sb.append(entry.getValue());
+        sb.append("\"");
+        setupCmds.add(sb.toString());
+      }
+      setupCmds.add(setup);
+
+      launchJvmAndWait(setupCmds, vargs, stdout, stderr, logSize, workDir);
       tracker.getTaskTrackerInstrumentation().reportTaskEnd(t.getTaskID());
       if (exitCodeSet) {
         if (!killed && exitCode != 0) {
@@ -231,14 +239,6 @@ abstract class TaskRunner extends Thread
         LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
       }
     } finally {
-      try{
-        if (taskDistributedCacheManager != null) {
-          taskDistributedCacheManager.release();
-        }
-      }catch(IOException ie){
-        LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
-      }
-
       // It is safe to call TaskTracker.TaskInProgress.reportTaskFinished with
       // *false* since the task has either
       // a) SUCCEEDED - which means commit has been done
@@ -247,11 +247,11 @@ abstract class TaskRunner extends Thread
     }
   }
 
-  void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,
-      File stderr, long logSize, File workDir, Map<String, String> env)
-      throws InterruptedException {
+  void launchJvmAndWait(List <String> setup, Vector<String> vargs, File stdout,
+      File stderr, long logSize, File workDir)
+      throws InterruptedException, IOException {
     jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,
-        stderr, logSize, workDir, env, conf));
+        stderr, logSize, workDir, conf));
     synchronized (lock) {
       while (!done) {
         lock.wait();
@@ -303,7 +303,7 @@ abstract class TaskRunner extends Thread
                 .isTaskCleanupTask()), conf);
 
     // write the child's task configuration file to the local disk
-    writeLocalTaskFile(localTaskFile.toString(), conf);
+    JobLocalizer.writeLocalJobFile(localTaskFile, conf);
 
     // Set the final job file in the task. The child needs to know the correct
     // path to job.xml. So set this path accordingly.
@@ -313,21 +313,21 @@ abstract class TaskRunner extends Thread
   /**
    * @return
    */
-  private List<String> getVMSetupCmd() {
-
-    int ulimit = getChildUlimit(conf);
+  private String getVMSetupCmd() {
+    final int ulimit = getChildUlimit(conf);
     if (ulimit <= 0) {
-      return null;
+      return "";
     }
-    List<String> setup = null;
-    String[] ulimitCmd = Shell.getUlimitMemoryCommand(ulimit);
-    if (ulimitCmd != null) {
-      setup = new ArrayList<String>();
-      for (String arg : ulimitCmd) {
-        setup.add(arg);
-      }
+    String setup[] = Shell.getUlimitMemoryCommand(ulimit);
+    StringBuilder command = new StringBuilder();
+    for (String str : setup) {
+      command.append('\'');
+      command.append(str);
+      command.append('\'');
+      command.append(" ");
     }
-    return setup;
+    command.append("\n");
+    return command.toString();
   }
 
   /**
@@ -423,7 +423,7 @@ abstract class TaskRunner extends Thread
       vargs.add(javaOptsSplit[i]);
     }
 
-    Path childTmpDir = createChildTmpDir(workDir, conf);
+    Path childTmpDir = createChildTmpDir(workDir, conf, false);
     vargs.add("-Djava.io.tmpdir=" + childTmpDir);
 
     // Add classpath.
@@ -473,7 +473,7 @@ abstract class TaskRunner extends Thread
    * @throws IOException
    */
   static Path createChildTmpDir(File workDir,
-      JobConf conf)
+      JobConf conf, boolean createDir)
       throws IOException {
 
     // add java.io.tmpdir given by mapreduce.task.tmp.dir
@@ -483,10 +483,13 @@ abstract class TaskRunner extends Thread
     // if temp directory path is not absolute, prepend it with workDir.
     if (!tmpDir.isAbsolute()) {
       tmpDir = new Path(workDir.toString(), tmp);
-
-      FileSystem localFs = FileSystem.getLocal(conf);
-      if (!localFs.mkdirs(tmpDir) && localFs.getFileStatus(tmpDir).isFile()) {
-        throw new IOException("Mkdirs failed to create " + tmpDir.toString());
+      if (createDir) {
+        FileSystem localFs = FileSystem.getLocal(conf);
+        if (!localFs.mkdirs(tmpDir) && 
+            !localFs.getFileStatus(tmpDir).isDir()) {
+          throw new IOException("Mkdirs failed to create " +
+              tmpDir.toString());
+        }
       }
     }
     return tmpDir;
@@ -533,6 +536,7 @@ abstract class TaskRunner extends Thread
       ldLibraryPath.append(oldLdLibraryPath);
     }
     env.put("LD_LIBRARY_PATH", ldLibraryPath.toString());
+    env.put(HADOOP_WORK_DIR, workDir.toString());
     
     // put jobTokenFile name into env
     String jobTokenFile = conf.get(TokenCache.JOB_TOKENS_FILENAME);
@@ -593,25 +597,6 @@ abstract class TaskRunner extends Thread
   }
 
   /**
-   * Write the task specific job-configuration file.
-   * 
-   * @param localFs
-   * @throws IOException
-   */
-  private static void writeLocalTaskFile(String jobFile, JobConf conf)
-      throws IOException {
-    Path localTaskFile = new Path(jobFile);
-    FileSystem localFs = FileSystem.getLocal(conf);
-    localFs.delete(localTaskFile, true);
-    OutputStream out = localFs.create(localTaskFile);
-    try {
-      conf.writeXml(out);
-    } finally {
-      out.close();
-    }
-  }
-
-  /**
    * Prepare the Configs.LOCAL_DIR for the child. The child is sand-boxed now.
    * Whenever it uses LocalDirAllocator from now on inside the child, it will
    * only see files inside the attempt-directory. This is done in the Child's
@@ -635,15 +620,11 @@ abstract class TaskRunner extends Thread
   }
 
   /** Creates the working directory pathname for a task attempt. */ 
-  static File formWorkDir(LocalDirAllocator lDirAlloc, 
-      TaskAttemptID task, boolean isCleanup, JobConf conf) 
+  static Path formWorkDir(LocalDirAllocator lDirAlloc, JobConf conf) 
       throws IOException {
     Path workDir =
-        lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
-            conf.getUser(), task.getJobID().toString(), task.toString(),
-            isCleanup), conf);
-
-    return new File(workDir.toString());
+        lDirAlloc.getLocalPathToRead(MRConstants.WORKDIR, conf);
+    return workDir;
   }
 
   private static void appendSystemClasspaths(List<String> classPaths) {
@@ -735,7 +716,7 @@ abstract class TaskRunner extends Thread
       }
     }
 
-    createChildTmpDir(workDir, conf);
+    createChildTmpDir(workDir, conf, true);
   }
 
   /**
@@ -759,8 +740,10 @@ abstract class TaskRunner extends Thread
 
   /**
    * Kill the child process
+   * @throws InterruptedException 
+   * @throws IOException 
    */
-  public void kill() {
+  public void kill() throws IOException, InterruptedException {
     killed = true;
     jvmManager.taskKilled(this);
     signalDone();



Mime
View raw message