hadoop-mapreduce-commits mailing list archives

From: s..@apache.org
Subject: svn commit: r1346214 [3/7] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/streaming/src/java/org/apache/hadoop/strea...
Date: Tue, 05 Jun 2012 02:33:47 GMT
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Jun  5 02:33:44 2012
@@ -1,20 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
 
 package org.apache.hadoop.mapred;
 
@@ -30,280 +30,264 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.mapreduce.util.ProcessTree;
+import static org.apache.hadoop.mapred.TaskController.Signal;
 
 class JvmManager {
 
-  public static final Log LOG =
-    LogFactory.getLog(JvmManager.class);
+public static final Log LOG =
+  LogFactory.getLog(JvmManager.class);
 
-  private JvmManagerForType mapJvmManager;
+private JvmManagerForType mapJvmManager;
 
-  private JvmManagerForType reduceJvmManager;
-  
-  public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
-      File stdout,File stderr,long logSize, File workDir, 
-      Map<String,String> env, JobConf conf) {
-    return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf);
-  }
-  
-  public JvmManager(TaskTracker tracker) {
-    mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
-        true, tracker);
-    reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
-        false, tracker);
-  }
+private JvmManagerForType reduceJvmManager;
 
-  JvmManagerForType getJvmManagerForType(TaskType type) {
-    if (type.equals(TaskType.MAP)) {
-      return mapJvmManager;
-    } else if (type.equals(TaskType.REDUCE)) {
-      return reduceJvmManager;
-    }
-    return null;
-  }
-  
-  public void stop() {
-    mapJvmManager.stop();
-    reduceJvmManager.stop();
+public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
+    File stdout,File stderr,long logSize, File workDir, 
+    JobConf conf) {
+  return new JvmEnv(setup,vargs,stdout,stderr,workDir,conf);
+}
+
+public JvmManager(TaskTracker tracker) {
+  mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
+      true, tracker);
+  reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
+      false, tracker);
+}
+
+JvmManagerForType getJvmManagerForType(TaskType type) {
+  if (type.equals(TaskType.MAP)) {
+    return mapJvmManager;
+  } else if (type.equals(TaskType.REDUCE)) {
+    return reduceJvmManager;
   }
+  return null;
+}
 
-  public boolean isJvmKnown(JVMId jvmId) {
-    if (jvmId.isMapJVM()) {
-      return mapJvmManager.isJvmknown(jvmId);
-    } else {
-      return reduceJvmManager.isJvmknown(jvmId);
-    }
+public void stop() throws IOException, InterruptedException {
+  mapJvmManager.stop();
+  reduceJvmManager.stop();
+}
+
+public boolean isJvmKnown(JVMId jvmId) {
+  if (jvmId.isMapJVM()) {
+    return mapJvmManager.isJvmknown(jvmId);
+  } else {
+    return reduceJvmManager.isJvmknown(jvmId);
   }
+}
 
-  /*
-   * Saves pid of the given taskJvm
-   */
-  void setPidToJvm(JVMId jvmId, String pid) {
-    if (jvmId.isMapJVM()) {
-      mapJvmManager.setPidForJvm(jvmId, pid);
-    }
-    else {
-      reduceJvmManager.setPidForJvm(jvmId, pid);
-    }
+/*
+ * Saves pid of the given taskJvm
+ */
+void setPidToJvm(JVMId jvmId, String pid) {
+  if (jvmId.isMapJVM()) {
+    mapJvmManager.setPidForJvm(jvmId, pid);
   }
-  
-  /*
-   * Returns the pid of the task
-   */
-  String getPid(TaskRunner t) {
-    if (t != null && t.getTask() != null) {
-      if (t.getTask().isMapTask()) {
-        return mapJvmManager.getPidByRunningTask(t);
-      } else {
-        return reduceJvmManager.getPidByRunningTask(t);
-      }
-    }
-    return null;
+  else {
+    reduceJvmManager.setPidForJvm(jvmId, pid);
   }
-  
-  public void launchJvm(TaskRunner t, JvmEnv env) {
+}
+
+/*
+ * Returns the pid of the task
+ */
+String getPid(TaskRunner t) {
+  if (t != null && t.getTask() != null) {
     if (t.getTask().isMapTask()) {
-      mapJvmManager.reapJvm(t, env);
+      return mapJvmManager.getPidByRunningTask(t);
     } else {
-      reduceJvmManager.reapJvm(t, env);
+      return reduceJvmManager.getPidByRunningTask(t);
     }
   }
+  return null;
+}
 
-  public TaskInProgress getTaskForJvm(JVMId jvmId)
-      throws IOException {
-    if (jvmId.isMapJVM()) {
-      return mapJvmManager.getTaskForJvm(jvmId);
-    } else {
-      return reduceJvmManager.getTaskForJvm(jvmId);
-    }
-  }
-  public void taskFinished(TaskRunner tr) {
-    if (tr.getTask().isMapTask()) {
-      mapJvmManager.taskFinished(tr);
-    } else {
-      reduceJvmManager.taskFinished(tr);
-    }
+public void launchJvm(TaskRunner t, JvmEnv env)
+    throws IOException, InterruptedException {
+  if (t.getTask().isMapTask()) {
+    mapJvmManager.reapJvm(t, env);
+  } else {
+    reduceJvmManager.reapJvm(t, env);
   }
+}
 
-  public void taskKilled(TaskRunner tr) {
-    if (tr.getTask().isMapTask()) {
-      mapJvmManager.taskKilled(tr);
-    } else {
-      reduceJvmManager.taskKilled(tr);
-    }
+public TaskInProgress getTaskForJvm(JVMId jvmId)
+    throws IOException {
+  if (jvmId.isMapJVM()) {
+    return mapJvmManager.getTaskForJvm(jvmId);
+  } else {
+    return reduceJvmManager.getTaskForJvm(jvmId);
   }
-
-  void dumpStack(TaskRunner tr) {
-    if (tr.getTask().isMapTask()) {
-      mapJvmManager.dumpStack(tr);
-    } else {
-      reduceJvmManager.dumpStack(tr);
-    }
+}
+public void taskFinished(TaskRunner tr) {
+  if (tr.getTask().isMapTask()) {
+    mapJvmManager.taskFinished(tr);
+  } else {
+    reduceJvmManager.taskFinished(tr);
   }
+}
 
-  public void killJvm(JVMId jvmId) {
-    if (jvmId.isMap) {
-      mapJvmManager.killJvm(jvmId);
-    } else {
-      reduceJvmManager.killJvm(jvmId);
-    }
-  }  
-
-  /**
-   * Adds the task's work dir to the cleanup queue of taskTracker for
-   * asynchronous deletion of work dir.
-   * @param tracker taskTracker
-   * @param task    the task whose work dir needs to be deleted
-   * @throws IOException
-   */
-  static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException {
-    tracker.getCleanupThread().addToQueue(
-        TaskTracker.buildTaskControllerTaskPathDeletionContexts(
-          tracker.getLocalFileSystem(),
-          tracker.getLocalFiles(tracker.getJobConf(), ""),
-          task, true /* workDir */,
-          tracker.getTaskController()));
-  }
-
-  static class JvmManagerForType {
-    //Mapping from the JVM IDs to running Tasks
-    Map <JVMId,TaskRunner> jvmToRunningTask = 
-      new HashMap<JVMId, TaskRunner>();
-    //Mapping from the tasks to JVM IDs
-    Map <TaskRunner,JVMId> runningTaskToJvm = 
-      new HashMap<TaskRunner, JVMId>();
-    //Mapping from the JVM IDs to Reduce JVM processes
-    Map <JVMId, JvmRunner> jvmIdToRunner = 
-      new HashMap<JVMId, JvmRunner>();
-    
-    int maxJvms;
-    boolean isMap;
-    
-    TaskTracker tracker;
-    
-    Random rand = new Random(System.currentTimeMillis());
-
-    public JvmManagerForType(int maxJvms, boolean isMap, 
-        TaskTracker tracker) {
-      this.maxJvms = maxJvms;
-      this.isMap = isMap;
-      this.tracker = tracker;
-    }
-
-    synchronized public void setRunningTaskForJvm(JVMId jvmId, 
-        TaskRunner t) {
-      jvmToRunningTask.put(jvmId, t);
-      runningTaskToJvm.put(t,jvmId);
-      jvmIdToRunner.get(jvmId).setTaskRunner(t);
-    }
-    
-    synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
-        throws IOException {
-      if (jvmToRunningTask.containsKey(jvmId)) {
-        //Incase of JVM reuse, tasks are returned to previously launched
-        //JVM via this method. However when a new task is launched
-        //the task being returned has to be initialized.
-        TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
-        JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
-        Task task = taskRunner.getTaskInProgress().getTask();
-
-        // Initialize task dirs
-        TaskControllerContext context =
-            new TaskController.TaskControllerContext();
-        context.env = jvmRunner.env;
-        context.task = task;
-        // If we are returning the same task as which the JVM was launched
-        // we don't initialize task once again.
-        if (!jvmRunner.env.conf.get(JobContext.TASK_ATTEMPT_ID).equals(
-            task.getTaskID().toString())) {
-          try {
-            tracker.getTaskController().initializeTask(context);
-          } catch (IOException e) {
-            LOG.warn("Failed to initialize the new task "
-                + task.getTaskID().toString() + " to be given to JVM with id "
-                + jvmId);
-            throw e;
-          }
-        }
-
-        return taskRunner.getTaskInProgress();
-      }
-      return null;
-    }
+public void taskKilled(TaskRunner tr
+                       ) throws IOException, InterruptedException {
+  if (tr.getTask().isMapTask()) {
+    mapJvmManager.taskKilled(tr);
+  } else {
+    reduceJvmManager.taskKilled(tr);
+  }
+}
 
-    synchronized String getPidByRunningTask(TaskRunner t) {
-      JVMId id = runningTaskToJvm.get(t);
-      if (id != null) {
-        return jvmIdToRunner.get(id).getPid();
-      }
-      return null;
-    }
+public void killJvm(JVMId jvmId) throws IOException, InterruptedException {
+  if (jvmId.isMap) {
+    mapJvmManager.killJvm(jvmId);
+  } else {
+    reduceJvmManager.killJvm(jvmId);
+  }
+}  
 
-    synchronized void setPidForJvm(JVMId jvmId, String pid) {
-      JvmRunner runner = jvmIdToRunner.get(jvmId);
-      assert runner != null : "Task must have a runner to set a pid";
-      runner.setPid(pid);
-    }
-    
-    synchronized public boolean isJvmknown(JVMId jvmId) {
-      return jvmIdToRunner.containsKey(jvmId);
-    }
+/**
+ * Adds the task's work dir to the cleanup queue of taskTracker for
+ * asynchronous deletion of work dir.
+ * @param tracker taskTracker
+ * @param task    the task whose work dir needs to be deleted
+ */
+static void deleteWorkDir(TaskTracker tracker, Task task) {
+  String user = task.getUser();
+  String jobid = task.getJobID().toString();
+  String taskid = task.getTaskID().toString();
+  String workDir = TaskTracker.getTaskWorkDir(user, jobid, taskid, 
+                                              task.isTaskCleanupTask());
+  tracker.getCleanupThread().addToQueue(
+   new TaskController.DeletionContext(tracker.getTaskController(), false,
+                                      user, 
+                                      workDir, tracker.getLocalDirs()));
+                                         
+}
 
-    synchronized public void taskFinished(TaskRunner tr) {
-      JVMId jvmId = runningTaskToJvm.remove(tr);
-      if (jvmId != null) {
-        jvmToRunningTask.remove(jvmId);
-        JvmRunner jvmRunner;
-        if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
-          jvmRunner.taskRan();
-        }
-      }
+static class JvmManagerForType {
+  //Mapping from the JVM IDs to running Tasks
+  Map <JVMId,TaskRunner> jvmToRunningTask = 
+    new HashMap<JVMId, TaskRunner>();
+  //Mapping from the tasks to JVM IDs
+  Map <TaskRunner,JVMId> runningTaskToJvm = 
+    new HashMap<TaskRunner, JVMId>();
+  //Mapping from the JVM IDs to Reduce JVM processes
+  Map <JVMId, JvmRunner> jvmIdToRunner = 
+    new HashMap<JVMId, JvmRunner>();
+  //Mapping from the JVM IDs to process IDs
+  Map <JVMId, String> jvmIdToPid =
+    new HashMap<JVMId, String>();
+  
+  final int maxJvms;
+  final boolean isMap;
+  final TaskTracker tracker;
+  final long sleeptimeBeforeSigkill;
+  final Random rand = new Random();
+
+  static final String DELAY_BEFORE_KILL_KEY =
+    "mapred.tasktracker.tasks.sleeptime-before-sigkill";
+  // number of milliseconds to wait between TERM and KILL.
+  private static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 250;
+
+  public JvmManagerForType(int maxJvms, boolean isMap, 
+      TaskTracker tracker) {
+    this.maxJvms = maxJvms;
+    this.isMap = isMap;
+    this.tracker = tracker;
+    sleeptimeBeforeSigkill =
+      tracker.getJobConf().getLong(DELAY_BEFORE_KILL_KEY,
+                                   DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+  }
+
+  synchronized public void setRunningTaskForJvm(JVMId jvmId, 
+      TaskRunner t) {
+    jvmToRunningTask.put(jvmId, t);
+    runningTaskToJvm.put(t,jvmId);
+    jvmIdToRunner.get(jvmId).setBusy(true);
+  }
+  
+  synchronized public TaskInProgress getTaskForJvm(JVMId jvmId)
+      throws IOException {
+    final TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+    return null == taskRunner ? null : taskRunner.getTaskInProgress();
+    //if (jvmToRunningTask.containsKey(jvmId)) {
+    //  //Incase of JVM reuse, tasks are returned to previously launched
+    //  //JVM via this method. However when a new task is launched
+    //  //the task being returned has to be initialized.
+    //  TaskRunner taskRunner = jvmToRunningTask.get(jvmId);
+    //  // TODO retained for MAPREDUCE-1100
+    //  JvmRunner jvmRunner = jvmIdToRunner.get(jvmId);
+    //  Task task = taskRunner.getTaskInProgress().getTask();
+
+    //  return taskRunner.getTaskInProgress();
+    //}
+    //return null;
+  }
+
+  synchronized String getPidByRunningTask(TaskRunner t) {
+    JVMId id = runningTaskToJvm.get(t);
+    if (id != null) {
+      return jvmIdToPid.get(id);
     }
+    return null;
+  }
 
-    synchronized public void taskKilled(TaskRunner tr) {
-      JVMId jvmId = runningTaskToJvm.remove(tr);
-      if (jvmId != null) {
-        jvmToRunningTask.remove(jvmId);
-        killJvm(jvmId);
-      }
-    }
+  synchronized void setPidForJvm(JVMId jvmId, String pid) {
+    JvmRunner runner = jvmIdToRunner.get(jvmId);
+    assert runner != null : "Task must have a runner to set a pid";
+    jvmIdToPid.put(jvmId, pid);
+  }
+  
+  synchronized public boolean isJvmknown(JVMId jvmId) {
+    return jvmIdToRunner.containsKey(jvmId);
+  }
 
-    synchronized public void killJvm(JVMId jvmId) {
+  synchronized public void taskFinished(TaskRunner tr) {
+    JVMId jvmId = runningTaskToJvm.remove(tr);
+    if (jvmId != null) {
+      jvmToRunningTask.remove(jvmId);
       JvmRunner jvmRunner;
       if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
-        killJvmRunner(jvmRunner);
+        jvmRunner.taskRan();
       }
     }
-    
-    private synchronized void killJvmRunner(JvmRunner jvmRunner) {
-      jvmRunner.kill();
-      removeJvm(jvmRunner.jvmId);
-    }
+  }
 
-    void dumpStack(TaskRunner tr) {
-      JvmRunner jvmRunner = null;
-      synchronized (this) {
-        JVMId jvmId = runningTaskToJvm.get(tr);
-        if (null != jvmId) {
-          jvmRunner = jvmIdToRunner.get(jvmId);
-        }
-      }
+  synchronized public void taskKilled(TaskRunner tr
+                                      ) throws IOException,
+                                               InterruptedException {
+    JVMId jvmId = runningTaskToJvm.remove(tr);
+    if (jvmId != null) {
+      jvmToRunningTask.remove(jvmId);
+      killJvm(jvmId);
+    }
+  }
 
-      // Don't want to hold JvmManager lock while dumping stacks for one
-      // task.
-      if (null != jvmRunner) {
-        jvmRunner.dumpChildStacks();
-      }
+  synchronized public void killJvm(JVMId jvmId)
+      throws IOException, InterruptedException {
+    JvmRunner jvmRunner;
+    if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) {
+      killJvmRunner(jvmRunner);
     }
+  }
+  
+  private synchronized void killJvmRunner(JvmRunner jvmRunner)
+      throws IOException, InterruptedException {
+    jvmRunner.kill();
+    removeJvm(jvmRunner.jvmId);
+  }
 
-    synchronized public void stop() {
+
+    synchronized public void stop()
+        throws IOException, InterruptedException {
       //since the kill() method invoked later on would remove
       //an entry from the jvmIdToRunner map, we create a
       //copy of the values and iterate over it (if we don't
@@ -320,7 +304,7 @@ class JvmManager {
       jvmIdToRunner.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        TaskRunner t, JvmEnv env) {
+        TaskRunner t, JvmEnv env) throws IOException, InterruptedException {
       if (t.getTaskInProgress().wasKilled()) {
         //the task was killed in-flight
         //no need to do the rest of the operations
@@ -409,7 +393,7 @@ class JvmManager {
 
     private synchronized void spawnNewJvm(JobID jobId, JvmEnv env,  
         TaskRunner t) {
-      JvmRunner jvmRunner = new JvmRunner(env,jobId);
+      JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask());
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
       //spawn the JVM in a new thread. Note that there will be very little
       //extra overhead of launching the new thread for a new JVM since
@@ -443,83 +427,90 @@ class JvmManager {
       volatile int numTasksRan;
       final int numTasksToRun;
       JVMId jvmId;
-      private ShellCommandExecutor shexec; // shell terminal for running the task
-      //context used for starting JVM
-      private TaskControllerContext initalContext;
+      volatile boolean busy = true;
+      private Task firstTask;
       
-      public JvmRunner(JvmEnv env, JobID jobId) {
+      public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) {
         this.env = env;
         this.jvmId = new JVMId(jobId, isMap, rand.nextInt());
         this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm();
-
-        this.initalContext = new TaskControllerContext();
-        initalContext.sleeptimeBeforeSigkill = tracker.getJobConf()
-          .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL,
-                   ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
+        this.firstTask = firstTask;
         LOG.info("In JvmRunner constructed JVM ID: " + jvmId);
       }
+
+      @Override
       public void run() {
-        runChild(env);
+        try {
+          runChild(env);
+        } catch (InterruptedException ie) {
+          return;
+        } catch (IOException e) {
+          LOG.warn("Caught IOException in JVMRunner", e);
+        } catch (Throwable e) {
+          LOG.error("Caught Throwable in JVMRunner. Aborting TaskTracker.", e);
+          System.exit(1);
+        } finally {
+          // TODO MR-1100
+          //jvmFinished();
+        }
       }
 
-      public void runChild(JvmEnv env) {
+      public void runChild(JvmEnv env)
+          throws IOException, InterruptedException {
+        int exitCode = 0;
         try {
           env.vargs.add(Integer.toString(jvmId.getId()));
-          //Launch the task controller to run task JVM
-          initalContext.env = env;
-          tracker.getTaskController().launchTaskJVM(initalContext);
+          TaskRunner runner = jvmToRunningTask.get(jvmId);
+          if (runner != null) {
+            Task task = runner.getTask();
+            //Launch the task controller to run task JVM
+            String user = task.getUser();
+            TaskAttemptID taskAttemptId = task.getTaskID();
+            String taskAttemptIdStr = task.isTaskCleanupTask() ? 
+                (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) :
+                  taskAttemptId.toString(); 
+                exitCode = tracker.getTaskController().launchTask(user,
+                    jvmId.jobId.toString(), taskAttemptIdStr, env.setup,
+                    env.vargs, env.workDir, env.stdout.toString(),
+                    env.stderr.toString());
+          }
         } catch (IOException ioe) {
           // do nothing
           // error and output are appropriately redirected
         } finally { // handle the exit code
-          shexec = initalContext.shExec;
-          if (shexec == null) {
-            return;
-          }
-
+          // although the process has exited before we get here,
+          // make sure the entire process group has also been killed.
           kill();
 
-          int exitCode = shexec.getExitCode();
           updateOnJvmExit(jvmId, exitCode);
           LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode
               + ". Number of tasks it ran: " + numTasksRan);
-          try {
-            // In case of jvm-reuse,
-            //the task jvm cleans up the common workdir for every 
-            //task at the beginning of each task in the task JVM.
-            //For the last task, we do it here.
-            if (env.conf.getNumTasksToExecutePerJvm() != 1) {
-              deleteWorkDir(tracker, initalContext.task);
-            }
-          } catch (IOException ie){}
-        }
-      }
-
-      synchronized void setPid(String pid) {
-        assert initalContext != null;
-        initalContext.pid = pid;
-      }
-
-      synchronized String getPid() {
-        if (initalContext != null) {
-          return initalContext.pid;
-        } else {
-          return null;
+          deleteWorkDir(tracker, firstTask);
         }
       }
 
       /** 
-       * Kills the process. Also kills its subprocesses if the process(root of subtree
-       * of processes) is created using setsid.
+       * Kills the process. Also kills its subprocesses if the process(root of
+       * subtree of processes) is created using setsid.
        */
-      synchronized void kill() {
+      synchronized void kill() throws IOException, InterruptedException {
         if (!killed) {
           TaskController controller = tracker.getTaskController();
           // Check inital context before issuing a kill to prevent situations
           // where kill is issued before task is launched.
-          if (initalContext != null && initalContext.env != null) {
-            // Destroy the task jvm
-            controller.destroyTaskJVM(initalContext);
+          String pidStr = jvmIdToPid.get(jvmId);
+          if (pidStr != null) {
+            String user = env.conf.getUser();
+            int pid = Integer.parseInt(pidStr);
+            // start a thread that will kill the process dead
+            if (sleeptimeBeforeSigkill > 0) {
+              controller.signalTask(user, pid, Signal.QUIT);
+              controller.signalTask(user, pid, Signal.TERM);
+              new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill,
+                  Signal.KILL, tracker.getTaskController()).start();
+            } else {
+              controller.signalTask(user, pid, Signal.KILL);
+            }
           } else {
             LOG.info(String.format("JVM Not killed %s but just removed", jvmId
                 .toString()));
@@ -528,46 +519,19 @@ class JvmManager {
         }
       }
 
-      /** Send a signal to the JVM requesting that it dump a stack trace,
-       * and wait for a timeout interval to give this signal time to be
-       * processed.
-       */
-      void dumpChildStacks() {
-        if (!killed) {
-          TaskController controller = tracker.getTaskController();
-          // Check inital context before issuing a signal to prevent situations
-          // where signal is issued before task is launched.
-          if (initalContext != null && initalContext.env != null) {
-            // signal the task jvm
-            controller.dumpTaskStack(initalContext);
-
-            // We're going to kill the jvm with SIGKILL after this,
-            // so we should wait for a few seconds first to ensure that
-            // the SIGQUIT has time to be processed.
-            try {
-              Thread.sleep(initalContext.sleeptimeBeforeSigkill);
-            } catch (InterruptedException e) {
-              LOG.warn("Sleep interrupted : " +
-                  StringUtils.stringifyException(e));
-            }
-          }
-        }
-      }
-
-      public synchronized void taskRan() {
-        initalContext.task = null;
+      public void taskRan() {
+        busy = false;
         numTasksRan++;
       }
       
       public boolean ranAll() {
         return(numTasksRan == numTasksToRun);
       }
-      public synchronized void setTaskRunner(TaskRunner runner) {
-        initalContext.task = runner.getTask();
-        assert initalContext.task != null;
+      public void setBusy(boolean busy) {
+        this.busy = busy;
       }
       public synchronized boolean isBusy() {
-        return initalContext.task != null;
+        return busy;
       }
     }
   }  
@@ -577,19 +541,15 @@ class JvmManager {
     File stdout;
     File stderr;
     File workDir;
-    long logSize;
     JobConf conf;
-    Map<String, String> env;
 
-    public JvmEnv(List<String> setup, Vector<String> vargs, File stdout, 
-        File stderr, long logSize, File workDir, Map<String,String> env,
-        JobConf conf) {
+    public JvmEnv(List <String> setup, Vector<String> vargs, File stdout, 
+        File stderr, File workDir, JobConf conf) {
       this.setup = setup;
       this.vargs = vargs;
       this.stdout = stdout;
       this.stderr = stderr;
       this.workDir = workDir;
-      this.env = env;
       this.conf = conf;
     }
   }

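The rewritten JvmRunner.kill() above no longer destroys the task JVM through a TaskControllerContext; it signals the process directly, sending QUIT (to dump thread stacks) and TERM, then starting a DelayedProcessKiller that delivers KILL once mapred.tasktracker.tasks.sleeptime-before-sigkill milliseconds (default 250) have elapsed. The following is a minimal, self-contained sketch of that escalation pattern; SignalSender and GRACE_MS are illustrative stand-ins, not Hadoop's TaskController API.

import java.io.IOException;

public class DelayedKillSketch {

  // Stand-in for TaskController.signalTask(user, pid, signal).
  interface SignalSender {
    void signal(int pid, String signal) throws IOException;
  }

  // Mirrors DEFAULT_SLEEPTIME_BEFORE_SIGKILL in the patch above.
  static final long GRACE_MS = 250;

  static void kill(final SignalSender sender, final int pid) throws IOException {
    sender.signal(pid, "QUIT");   // ask the JVM to dump thread stacks
    sender.signal(pid, "TERM");   // request an orderly shutdown
    new Thread(new Runnable() {   // escalate to KILL after the grace period
      public void run() {
        try {
          Thread.sleep(GRACE_MS);
          sender.signal(pid, "KILL");
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        } catch (IOException ioe) {
          // the process is likely already gone; nothing left to do
        }
      }
    }).start();
  }
}

When sleeptimeBeforeSigkill is configured as zero or negative, the patch skips the escalation and sends KILL immediately.
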
Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jun  5 02:33:44 2012
@@ -14,30 +14,27 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 package org.apache.hadoop.mapred;
 
-import java.io.BufferedWriter;
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.PrintWriter;
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import static org.apache.hadoop.mapred.TaskController.Signal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * A {@link TaskController} that runs the task JVMs as the user 
@@ -48,8 +45,8 @@ import org.apache.hadoop.util.Shell.Shel
  * JVM and killing it when needed, and also initializing and
  * finalizing the task environment. 
  * <p> The setuid executable is launched using the command line:</p>
- * <p>task-controller mapreduce.job.user.name command command-args, where</p>
- * <p>mapreduce.job.user.name is the name of the owner who submits the job</p>
+ * <p>task-controller user-name command command-args, where</p>
+ * <p>user-name is the name of the owner who submits the job</p>
  * <p>command is one of the cardinal value of the 
  * {@link LinuxTaskController.TaskControllerCommands} enumeration</p>
  * <p>command-args depends on the command being launched.</p>
@@ -62,52 +59,78 @@ class LinuxTaskController extends TaskCo
 
   private static final Log LOG = 
             LogFactory.getLog(LinuxTaskController.class);
-
-  // Name of the executable script that will contain the child
-  // JVM command line. See writeCommand for details.
-  private static final String COMMAND_FILE = "taskjvm.sh";
   
   // Path to the setuid executable.
-  private static String taskControllerExe;
+  private String taskControllerExe;
+  private static final String TASK_CONTROLLER_EXEC_KEY =
+    "mapreduce.tasktracker.task-controller.exe";
   
-  static {
-    // the task-controller is expected to be under the $HADOOP_HOME/bin
-    // directory.
-    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
-    taskControllerExe = 
-        new File(hadoopBin, "task-controller").getAbsolutePath();
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    taskControllerExe = getTaskControllerExecutablePath(conf);
   }
-  
+
   public LinuxTaskController() {
     super();
   }
-  
+
+  protected String getTaskControllerExecutablePath(Configuration conf) {
+    File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin");
+    String defaultTaskController =
+      new File(hadoopBin, "task-controller").getAbsolutePath();
+    return null == conf
+      ? defaultTaskController
+      : conf.get(TASK_CONTROLLER_EXEC_KEY, defaultTaskController);
+  }
+
   /**
    * List of commands that the setuid script will execute.
    */
-  enum TaskControllerCommands {
-    INITIALIZE_USER,
-    INITIALIZE_JOB,
-    INITIALIZE_DISTRIBUTEDCACHE_FILE,
-    LAUNCH_TASK_JVM,
-    INITIALIZE_TASK,
-    TERMINATE_TASK_JVM,
-    KILL_TASK_JVM,
-    RUN_DEBUG_SCRIPT,
-    SIGQUIT_TASK_JVM,
-    ENABLE_TASK_FOR_CLEANUP,
-    ENABLE_JOB_FOR_CLEANUP
+  enum Commands {
+    INITIALIZE_JOB(0),
+    LAUNCH_TASK_JVM(1),
+    SIGNAL_TASK(2),
+    DELETE_AS_USER(3),
+    DELETE_LOG_AS_USER(4);
+
+    private int value;
+    Commands(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
+  }
+
+  /**
+   * Result codes returned from the C task-controller.
+   * These must match the values in task-controller.h.
+   */
+  enum ResultCode {
+    OK(0),
+    INVALID_USER_NAME(2),
+    INVALID_TASK_PID(9),
+    INVALID_TASKCONTROLLER_PERMISSIONS(22),
+    INVALID_CONFIG_FILE(24);
+
+    private final int value;
+    ResultCode(int value) {
+      this.value = value;
+    }
+    int getValue() {
+      return value;
+    }
   }
 
   @Override
-  public void setup() throws IOException {
-    super.setup();
-    
+  public void setup(LocalDirAllocator allocator) throws IOException {
+
     // Check the permissions of the task-controller binary by running it plainly.
-    // If permissions are correct, it returns an error code 1, else it returns 
+    // If permissions are correct, it returns an error code 1, else it returns
     // 24 or something else if some other bugs are also present.
     String[] taskControllerCmd =
-        new String[] { getTaskControllerExecutablePath() };
+        new String[] { taskControllerExe };
     ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd);
     try {
       shExec.execute();
@@ -120,52 +143,96 @@ class LinuxTaskController extends TaskCo
           + "permissions/ownership with exit code " + exitCode, e);
       }
     }
+    this.allocator = allocator;
   }
+  
 
-  /**
-   * Launch a task JVM that will run as the owner of the job.
-   * 
-   * This method launches a task JVM by executing a setuid executable that will
-   * switch to the user and run the task. Also does initialization of the first
-   * task in the same setuid process launch.
-   */
   @Override
-  void launchTaskJVM(TaskController.TaskControllerContext context) 
-                                        throws IOException {
-    JvmEnv env = context.env;
-    // get the JVM command line.
-    String cmdLine = 
-      TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr,
-          env.logSize, true);
-
-    StringBuffer sb = new StringBuffer();
-    //export out all the environment variable before child command as
-    //the setuid/setgid binaries would not be getting, any environmental
-    //variables which begin with LD_*.
-    for(Entry<String, String> entry : env.env.entrySet()) {
-      sb.append("export ");
-      sb.append(entry.getKey());
-      sb.append("=");
-      sb.append(entry.getValue());
-      sb.append("\n");
-    }
-    sb.append(cmdLine);
-    // write the command to a file in the
-    // task specific cache directory
-    writeCommand(sb.toString(), getTaskCacheDirectory(context, 
-        context.env.workDir));
-    
-    // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, 
-        context.env.workDir);
-    ShellCommandExecutor shExec =  buildTaskControllerExecutor(
-                                    TaskControllerCommands.LAUNCH_TASK_JVM, 
-                                    env.conf.getUser(),
-                                    launchTaskJVMArgs, env.workDir, env.env);
-    context.shExec = shExec;
+  public void initializeJob(String user, String jobid, Path credentials,
+                            Path jobConf, TaskUmbilicalProtocol taskTracker,
+                            InetSocketAddress ttAddr
+                            ) throws IOException, InterruptedException {
+    List<String> command = new ArrayList<String>(
+      Arrays.asList(taskControllerExe, 
+                    user, 
+                    Integer.toString(Commands.INITIALIZE_JOB.getValue()),
+                    jobid,
+                    credentials.toUri().getPath().toString(),
+                    jobConf.toUri().getPath().toString()));
+    File jvm =                                  // use same jvm as parent
+      new File(new File(System.getProperty("java.home"), "bin"), "java");
+    command.add(jvm.toString());
+    command.add("-classpath");
+    command.add(System.getProperty("java.class.path"));
+    command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir());
+    command.add("-Dhadoop.root.logger=INFO,console");
+    command.add(JobLocalizer.class.getName());  // main of JobLocalizer
+    command.add(user);
+    command.add(jobid);
+    // add the task tracker's reporting address
+    command.add(ttAddr.getHostName());
+    command.add(Integer.toString(ttAddr.getPort()));
+    String[] commandArray = command.toArray(new String[0]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("initializeJob: " + Arrays.toString(commandArray));
+    }
     try {
       shExec.execute();
+      if (LOG.isDebugEnabled()) {
+        logOutput(shExec.getOutput());
+      }
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      logOutput(shExec.getOutput());
+      throw new IOException("Job initialization failed (" + exitCode + ")", e);
+    }
+  }
+
+  @Override
+  public int launchTask(String user, 
+                                  String jobId,
+                                  String attemptId,
+                                  List<String> setup,
+                                  List<String> jvmArguments,
+                                  File currentWorkDirectory,
+                                  String stdout,
+                                  String stderr) throws IOException {
+
+    ShellCommandExecutor shExec = null;
+    try {
+      FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw();
+      long logSize = 0; //TODO, Ref BUG:2854624
+      // get the JVM command line.
+      String cmdLine = 
+        TaskLog.buildCommandLine(setup, jvmArguments,
+            new File(stdout), new File(stderr), logSize, true);
+
+      // write the command to a file in the
+      // task specific cache directory
+      Path p = new Path(allocator.getLocalPathForWrite(
+          TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId),
+          getConf()), COMMAND_FILE);
+      String commandFile = writeCommand(cmdLine, rawFs, p); 
+
+      String[] command = 
+        new String[]{taskControllerExe, 
+          user,
+          Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()),
+          jobId,
+          attemptId,
+          currentWorkDirectory.toString(),
+          commandFile};
+      shExec = new ShellCommandExecutor(command);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("launchTask: " + Arrays.toString(command));
+      }
+      shExec.execute();
     } catch (Exception e) {
+      if (shExec == null) {
+        return -1;
+      }
       int exitCode = shExec.getExitCode();
       LOG.warn("Exit code from task is : " + exitCode);
       // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was
@@ -177,481 +244,78 @@ class LinuxTaskController extends TaskCo
         LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
         logOutput(shExec.getOutput());
       }
-      throw new IOException(e);
+      return exitCode;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's launchTaskJVM follows:");
+      LOG.debug("Output from LinuxTaskController's launchTask follows:");
       logOutput(shExec.getOutput());
     }
+    return 0;
   }
-  
-  /**
-   * Launch the debug script process that will run as the owner of the job.
-   * 
-   * This method launches the task debug script process by executing a setuid
-   * executable that will switch to the user and run the task. 
-   */
+
   @Override
-  void runDebugScript(DebugScriptContext context) throws IOException {
-    String debugOut = FileUtil.makeShellPath(context.stdout);
-    String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut);
-    writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir));
-    // Call the taskcontroller with the right parameters.
-    List<String> launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir);
-    runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), 
-        launchTaskJVMArgs, context.workDir, null);
-  }
-  /**
-   * Helper method that runs a LinuxTaskController command
-   * 
-   * @param taskControllerCommand
-   * @param user
-   * @param cmdArgs
-   * @param env
-   * @throws IOException
-   */
-  private void runCommand(TaskControllerCommands taskControllerCommand, 
-      String user, List<String> cmdArgs, File workDir, Map<String, String> env)
-      throws IOException {
-
-    ShellCommandExecutor shExec =
-        buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs, 
-                                    workDir, env);
-    try {
-      shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : "
-          + shExec.getExitCode());
-      LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : "
-          + StringUtils.stringifyException(e));
-      LOG.info("Output from LinuxTaskController's " 
-               + taskControllerCommand.toString() + " follows:");
-      logOutput(shExec.getOutput());
-      throw new IOException(e);
+  public void deleteAsUser(String user, String subDir, String... baseDirs) 
+  throws IOException {
+    List<String> command = new ArrayList<String>(
+        Arrays.asList(
+                   taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.DELETE_AS_USER.getValue()),
+                   subDir));
+    for (String baseDir : baseDirs) {
+      command.add(baseDir);
     }
+    String[] commandArray = command.toArray(new String[0]);
+    ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray);
     if (LOG.isDebugEnabled()) {
-      LOG.info("Output from LinuxTaskController's " 
-               + taskControllerCommand.toString() + " follows:");
-      logOutput(shExec.getOutput());
-    }
-  }
-
-  /**
-   * Returns list of arguments to be passed while initializing a new task. See
-   * {@code buildTaskControllerExecutor(TaskControllerCommands, String, 
-   * List<String>, JvmEnv)} documentation.
-   * 
-   * @param context
-   * @return Argument to be used while launching Task VM
-   */
-  private List<String> buildInitializeTaskArgs(TaskExecContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    String taskId = context.task.getTaskID().toString();
-    String jobId = getJobId(context);
-    commandArgs.add(jobId);
-    if (!context.task.isTaskCleanupTask()) {
-      commandArgs.add(taskId);
-    } else {
-      commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX);
+      LOG.debug("deleteAsUser: " + Arrays.toString(commandArray));
     }
-    return commandArgs;
+    shExec.execute();
   }
 
   @Override
-  void initializeTask(TaskControllerContext context)
-      throws IOException {
+  public void deleteLogAsUser(String user, String subDir) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()),
+                   subDir};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " 
-                + TaskControllerCommands.INITIALIZE_TASK.toString()
-                + " for " + context.task.getTaskID().toString());
-    }
-    runCommand(TaskControllerCommands.INITIALIZE_TASK, 
-        context.env.conf.getUser(),
-        buildInitializeTaskArgs(context), context.env.workDir, context.env.env);
-  }
-
-  /**
-   * Builds the args to be passed to task-controller for enabling of task for
-   * cleanup. Last arg in this List is either $attemptId or $attemptId/work
-   */
-  private List<String> buildTaskCleanupArgs(
-      TaskControllerTaskPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.task.getJobID().toString());
-
-    String workDir = "";
-    if (context.isWorkDir) {
-      workDir = "/work";
-    }
-    if (context.task.isTaskCleanupTask()) {
-      commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX
-                      + workDir);
-    } else {
-      commandArgs.add(context.task.getTaskID() + workDir);
+      LOG.debug("deleteLogAsUser: " + Arrays.toString(command));
     }
-
-    return commandArgs;
+    shExec.execute();
   }
 
-  /**
-   * Builds the args to be passed to task-controller for enabling of job for
-   * cleanup. Last arg in this List is $jobid.
-   */
-  private List<String> buildJobCleanupArgs(
-      TaskControllerJobPathDeletionContext context) {
-    List<String> commandArgs = new ArrayList<String>(2);
-    commandArgs.add(context.mapredLocalDir.toUri().getPath());
-    commandArgs.add(context.jobId.toString());
-
-    return commandArgs;
-  }
-  
-  /**
-   * Enables the task for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
-  @Override
-  void enableTaskForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (context instanceof TaskControllerTaskPathDeletionContext) {
-      TaskControllerTaskPathDeletionContext tContext =
-        (TaskControllerTaskPathDeletionContext) context;
-      enablePathForCleanup(tContext, 
-                           TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP,
-                           buildTaskCleanupArgs(tContext));
-    }
-    else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-          + "TaskControllerTaskPathDeletionContext.");
-    }
-  }
-
-  /**
-   * Enables the job for cleanup by changing permissions of the specified path
-   * in the local filesystem
-   */
   @Override
-  void enableJobForCleanup(PathDeletionContext context)
-      throws IOException {
-    if (context instanceof TaskControllerJobPathDeletionContext) {
-      TaskControllerJobPathDeletionContext tContext =
-        (TaskControllerJobPathDeletionContext) context;
-      enablePathForCleanup(tContext, 
-                           TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP,
-                           buildJobCleanupArgs(tContext));
-    } else {
-      throw new IllegalArgumentException("PathDeletionContext provided is not "
-                  + "TaskControllerJobPathDeletionContext.");
-    }
-  }
-  
-  /**
-   * Enable a path for cleanup
-   * @param c {@link TaskControllerPathDeletionContext} for the path to be 
-   *          cleaned up
-   * @param command {@link TaskControllerCommands} for task/job cleanup
-   * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable 
-   *                    path cleanup
-   */
-  private void enablePathForCleanup(TaskControllerPathDeletionContext c,
-                                    TaskControllerCommands command,
-                                    List<String> cleanupArgs) {
+  public boolean signalTask(String user, int taskPid, 
+                         Signal signal) throws IOException {
+    String[] command = 
+      new String[]{taskControllerExe, 
+                   user,
+                   Integer.toString(Commands.SIGNAL_TASK.getValue()),
+                   Integer.toString(taskPid),
+                   Integer.toString(signal.getValue())};
+    ShellCommandExecutor shExec = new ShellCommandExecutor(command);
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to do " + command.toString() + " for " + c.fullPath);
-    }
-
-    if ( c.user != null && c.fs instanceof LocalFileSystem) {
-      try {
-        runCommand(command, c.user, cleanupArgs, null, null);
-      } catch(IOException e) {
-        LOG.warn("Unable to change permissions for " + c.fullPath);
-      }
-    }
-    else {
-      throw new IllegalArgumentException("Either user is null or the " 
-                  + "file system is not local file system.");
-    }
-  }
-
-  private void logOutput(String output) {
-    String shExecOutput = output;
-    if (shExecOutput != null) {
-      for (String str : shExecOutput.split("\n")) {
-        LOG.info(str);
-      }
+      LOG.debug("signalTask: " + Arrays.toString(command));
     }
-  }
-
-  private String getJobId(TaskExecContext context) {
-    String taskId = context.task.getTaskID().toString();
-    TaskAttemptID tId = TaskAttemptID.forName(taskId);
-    String jobId = tId.getJobID().toString();
-    return jobId;
-  }
-
-  /**
-   * Returns list of arguments to be passed while launching task VM.
-   * See {@code buildTaskControllerExecutor(TaskControllerCommands, 
-   * String, List<String>, JvmEnv)} documentation.
-   * @param context
-   * @return Argument to be used while launching Task VM
-   */
-  private List<String> buildLaunchTaskArgs(TaskExecContext context, 
-      File workDir) {
-    List<String> commandArgs = new ArrayList<String>(3);
-    LOG.debug("getting the task directory as: " 
-        + getTaskCacheDirectory(context, workDir));
-    LOG.debug("getting the tt_root as " +getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context, workDir)), 
-        context) );
-    commandArgs.add(getDirectoryChosenForTask(
-        new File(getTaskCacheDirectory(context, workDir)), 
-        context));
-    commandArgs.addAll(buildInitializeTaskArgs(context));
-    return commandArgs;
-  }
-
-  // Get the directory from the list of directories configured
-  // in Configs.LOCAL_DIR chosen for storing data pertaining to
-  // this task.
-  private String getDirectoryChosenForTask(File directory,
-      TaskExecContext context) {
-    String jobId = getJobId(context);
-    String taskId = context.task.getTaskID().toString();
-    for (String dir : mapredLocalDirs) {
-      File mapredDir = new File(dir);
-      File taskDir =
-          new File(mapredDir, TaskTracker.getTaskWorkDir(context.task
-              .getUser(), jobId, taskId, context.task.isTaskCleanupTask()))
-              .getParentFile();
-      if (directory.equals(taskDir)) {
-        return dir;
-      }
-    }
-
-    LOG.error("Couldn't parse task cache directory correctly");
-    throw new IllegalArgumentException("invalid task cache directory "
-        + directory.getAbsolutePath());
-  }
-
-  /**
-   * Builds the command line for launching/terminating/killing task JVM.
-   * Following is the format for launching/terminating/killing task JVM
-   * <br/>
-   * For launching following is command line argument:
-   * <br/>
-   * {@code mapreduce.job.user.name command tt-root job_id task_id} 
-   * <br/>
-   * For terminating/killing task jvm.
-   * {@code mapreduce.job.user.name command tt-root task-pid}
-   * 
-   * @param command command to be executed.
-   * @param userName mapreduce.job.user.name
-   * @param cmdArgs list of extra arguments
-   * @param workDir working directory for the task-controller
-   * @param env JVM environment variables.
-   * @return {@link ShellCommandExecutor}
-   * @throws IOException
-   */
-  private ShellCommandExecutor buildTaskControllerExecutor(
-      TaskControllerCommands command, String userName, List<String> cmdArgs,
-      File workDir, Map<String, String> env)
-      throws IOException {
-    String[] taskControllerCmd = new String[3 + cmdArgs.size()];
-    taskControllerCmd[0] = getTaskControllerExecutablePath();
-    taskControllerCmd[1] = userName;
-    taskControllerCmd[2] = String.valueOf(command.ordinal());
-    int i = 3;
-    for (String cmdArg : cmdArgs) {
-      taskControllerCmd[i++] = cmdArg;
-    }
-    if (LOG.isDebugEnabled()) {
-      for (String cmd : taskControllerCmd) {
-        LOG.debug("taskctrl command = " + cmd);
-      }
-    }
-    ShellCommandExecutor shExec = null;
-    if(workDir != null && workDir.exists()) {
-      shExec = new ShellCommandExecutor(taskControllerCmd,
-          workDir, env);
-    } else {
-      shExec = new ShellCommandExecutor(taskControllerCmd);
-    }
-    
-    return shExec;
-  }
-  
-  // Return the task specific directory under the cache.
-  private String getTaskCacheDirectory(TaskExecContext context, 
-      File workDir) {
-    // In the case of JVM reuse, the task specific directory
-    // is different from what is set with respect with
-    // env.workDir. Hence building this from the taskId everytime.
-    String taskId = context.task.getTaskID().toString();
-    File cacheDirForJob = workDir.getParentFile().getParentFile();
-    if(context.task.isTaskCleanupTask()) {
-      taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX;
-    }
-    return new File(cacheDirForJob, taskId).getAbsolutePath(); 
-  }
-  
-  // Write the JVM command line to a file under the specified directory
-  // Note that the JVM will be launched using a setuid executable, and
-  // could potentially contain strings defined by a user. Hence, to
-  // prevent special character attacks, we write the command line to
-  // a file and execute it.
-  private void writeCommand(String cmdLine, 
-                                      String directory) throws IOException {
-    
-    PrintWriter pw = null;
-    String commandFile = directory + File.separator + COMMAND_FILE;
-    LOG.info("Writing commands to " + commandFile);
-    LOG.info("--------Commands Begin--------");
-    LOG.info(cmdLine);
-    LOG.info("--------Commands End--------");
-    try {
-      FileWriter fw = new FileWriter(commandFile);
-      BufferedWriter bw = new BufferedWriter(fw);
-      pw = new PrintWriter(bw);
-      pw.write(cmdLine);
-    } catch (IOException ioe) {
-      LOG.error("Caught IOException while writing JVM command line to file. "
-                + ioe.getMessage());
-    } finally {
-      if (pw != null) {
-        pw.close();
-      }
-      // set execute permissions for all on the file.
-      File f = new File(commandFile);
-      if (f.exists()) {
-        f.setReadable(true, false);
-        f.setExecutable(true, false);
-      }
-    }
-  }
-
-  private List<String> buildInitializeJobCommandArgs(
-      JobInitializationContext context) {
-    List<String> initJobCmdArgs = new ArrayList<String>();
-    initJobCmdArgs.add(context.jobid.toString());
-    return initJobCmdArgs;
-  }
-
-  @Override
-  void initializeJob(JobInitializationContext context)
-      throws IOException {
-    LOG.debug("Going to initialize job " + context.jobid.toString()
-        + " on the TT");
-    runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user,
-        buildInitializeJobCommandArgs(context), context.workDir, null);
-  }
-
-  @Override
-  public void initializeDistributedCacheFile(DistributedCacheFileContext context)
-      throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Going to initialize distributed cache for " + context.user
-          + " with localizedBaseDir " + context.localizedBaseDir + 
-          " and uniqueString " + context.uniqueString);
-    }
-    List<String> args = new ArrayList<String>();
-    // Here, uniqueString might start with '-'. Adding -- in front of the 
-    // arguments indicates that they are non-option parameters.
-    args.add("--");
-    args.add(context.localizedBaseDir.toString());
-    args.add(context.uniqueString);
-    runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, 
-        context.user, args, context.workDir, null);
-  }
-
-  @Override
-  public void initializeUser(InitializationContext context)
-      throws IOException {
-    LOG.debug("Going to initialize user directories for " + context.user
-        + " on the TT");
-    runCommand(TaskControllerCommands.INITIALIZE_USER, context.user,
-        new ArrayList<String>(), context.workDir, null);
-  }
-
-  /**
-   * Builds the command line to be passed to the LinuxTaskController
-   * binary to terminate/kill the task. See the
-   * {@code buildTaskControllerExecutor(TaskControllerCommands, 
-   * String, List<String>, JvmEnv)} documentation.
-   * 
-   * @param context context of the task to which the kill signal is sent.
-   */
-  private List<String> buildKillTaskCommandArgs(TaskControllerContext 
-      context){
-    List<String> killTaskJVMArgs = new ArrayList<String>();
-    killTaskJVMArgs.add(context.pid);
-    return killTaskJVMArgs;
-  }
-  
-  /**
-   * Convenience method for sending the appropriate signal to the task
-   * JVM.
-   * @param context the task context
-   * @param command the signal command to send
-   * @throws IOException if signalling the task JVM fails
-   */
-  protected void signalTask(TaskControllerContext context,
-      TaskControllerCommands command) throws IOException{
-    if(context.task == null) {
-      LOG.info("Context task is null; not signaling the JVM");
-      return;
-    }
-    ShellCommandExecutor shExec = buildTaskControllerExecutor(
-        command, context.env.conf.getUser(), 
-        buildKillTaskCommandArgs(context), context.env.workDir,
-        context.env.env);
     try {
       shExec.execute();
-    } catch (Exception e) {
-      LOG.warn("Output from task-contoller is : " + shExec.getOutput());
-      throw new IOException(e);
-    }
-  }
-  
-  @Override
-  void terminateTask(TaskControllerContext context) {
-    try {
-      signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending kill to the Task VM " + 
-          StringUtils.stringifyException(e));
-    }
-  }
-  
-  @Override
-  void killTask(TaskControllerContext context) {
-    try {
-      signalTask(context, TaskControllerCommands.KILL_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending destroy to the Task VM " + 
-          StringUtils.stringifyException(e));
-    }
-  }
-
-  @Override
-  void dumpTaskStack(TaskControllerContext context) {
-    try {
-      signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM);
-    } catch (Exception e) {
-      LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " +
-          StringUtils.stringifyException(e));
+    } catch (ExitCodeException e) {
+      int exitCode = shExec.getExitCode();
+      if (exitCode == ResultCode.INVALID_TASK_PID.getValue()) {
+        return false;
+      }
+      logOutput(shExec.getOutput());
+      throw new IOException("Problem signalling task " + taskPid + " with " +
+                            signal + "; exit = " + exitCode);
     }
-  }
-
-  protected String getTaskControllerExecutablePath() {
-    return taskControllerExe;
+    return true;
   }
 
   @Override
-  String getRunAsUser(JobConf conf) {
+  public String getRunAsUser(JobConf conf) {
     return conf.getUser();
   }
-}
\ No newline at end of file
+}
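
The replacement signalling code above reports success as a boolean rather than throwing on every failure: an INVALID_TASK_PID exit code from the task-controller (the process is already gone) yields a benign false, while any other non-zero exit is surfaced as an IOException. A hedged sketch of that control flow, with stand-in names for the real ShellCommandExecutor and ResultCode plumbing:

    import java.io.IOException;

    // Illustrative only; Executor and INVALID_TASK_PID stand in for the
    // patch's ShellCommandExecutor / ResultCode types.
    class SignalSketch {
      static final int INVALID_TASK_PID = 2; // assumed value, for illustration

      interface Executor {
        void execute() throws IOException; // throws on non-zero exit
        int getExitCode();
        String getOutput();
      }

      // Returns false if the target process no longer exists, true on
      // success; any other failure becomes an IOException.
      static boolean signal(Executor shExec, String taskPid, String signal)
          throws IOException {
        try {
          shExec.execute();
        } catch (IOException e) {
          int exitCode = shExec.getExitCode();
          if (exitCode == INVALID_TASK_PID) {
            return false; // task already exited; nothing to signal
          }
          System.err.println(shExec.getOutput());
          throw new IOException("Problem signalling task " + taskPid
              + " with " + signal + "; exit = " + exitCode);
        }
        return true;
      }
    }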

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jun  5 02:33:44 2012
@@ -38,7 +38,6 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
@@ -79,6 +78,7 @@ public class LocalJobRunner implements C
   private AtomicInteger map_tasks = new AtomicInteger(0);
   private int reduce_tasks = 0;
   final Random rand = new Random();
+  private final TaskController taskController = new DefaultTaskController();
   
   private JobTrackerInstrumentation myMetrics = null;
 
@@ -116,7 +116,7 @@ public class LocalJobRunner implements C
     private FileSystem localFs;
     boolean killed = false;
     
-    private TrackerDistributedCacheManager trackerDistributerdCacheManager;
+    private TrackerDistributedCacheManager trackerDistributedCacheManager;
     private TaskDistributedCacheManager taskDistributedCacheManager;
 
     public long getProtocolVersion(String protocol, long clientVersion) {
@@ -134,14 +134,12 @@ public class LocalJobRunner implements C
 
       // Manage the distributed cache.  If there are files to be copied,
       // this will trigger localFile to be re-written again.
-      this.trackerDistributerdCacheManager =
-          new TrackerDistributedCacheManager(conf, new DefaultTaskController());
+      this.trackerDistributedCacheManager =
+        new TrackerDistributedCacheManager(conf);
       this.taskDistributedCacheManager = 
-          trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf);
-      taskDistributedCacheManager.setup(
-          new LocalDirAllocator(MRConfig.LOCAL_DIR), 
-          new File(systemJobDir.toString()),
-          "archive", "archive");
+        trackerDistributedCacheManager.newTaskDistributedCacheManager(
+            jobid, conf);
+      taskDistributedCacheManager.setupCache(conf, "archive", "archive");
       
       if (DistributedCache.getSymlink(conf)) {
         // This is not supported largely because, 
@@ -458,7 +456,7 @@ public class LocalJobRunner implements C
           localFs.delete(localJobFile, true);              // delete local copy
           // Cleanup distributed cache
           taskDistributedCacheManager.release();
-          trackerDistributerdCacheManager.purgeCache();
+          trackerDistributedCacheManager.purgeCache();
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
         }
@@ -532,6 +530,14 @@ public class LocalJobRunner implements C
     public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
     }
+
+    @Override
+    public void updatePrivateDistributedCacheSizes(
+        org.apache.hadoop.mapreduce.JobID jobId, long[] sizes)
+        throws IOException {
+      trackerDistributedCacheManager.setArchiveSizes(jobId, sizes);
+    }
     
     public boolean canCommit(TaskAttemptID taskid) 
     throws IOException {
@@ -578,6 +584,7 @@ public class LocalJobRunner implements C
     this.fs = FileSystem.getLocal(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf));
+    taskController.setConf(conf);
   }
 
   // JobSubmissionProtocol methods
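
Taken together, the LocalJobRunner changes give the local runner its own DefaultTaskController (configured along with the conf) and route the new updatePrivateDistributedCacheSizes() umbilical call through to the cache manager's setArchiveSizes(). A hedged sketch of that wiring, with simplified stand-in types rather than the real TaskController and TrackerDistributedCacheManager classes:

    import java.io.IOException;

    // Stand-in types for illustration only.
    class LocalRunnerSketch {
      interface Conf {}
      static class Controller {
        private Conf conf;
        void setConf(Conf c) { this.conf = c; }
      }
      static class CacheManager {
        void setArchiveSizes(String jobId, long[] sizes) throws IOException {
          // record the on-disk sizes of the job's private cache archives
        }
      }

      private final Controller taskController = new Controller();
      private final CacheManager cacheManager = new CacheManager();

      LocalRunnerSketch(Conf conf) {
        taskController.setConf(conf); // mirrors taskController.setConf(conf)
      }

      // Mirrors the new umbilical method: the child task reports cache
      // sizes, and the runner forwards them to the cache manager.
      public void updatePrivateDistributedCacheSizes(String jobId, long[] sizes)
          throws IOException {
        cacheManager.setArchiveSizes(jobId, sizes);
      }
    }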

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jun  5 02:33:44 2012
@@ -129,8 +129,10 @@ class MapTask extends Task {
   
   @Override
   public TaskRunner createRunner(TaskTracker tracker, 
-      TaskTracker.TaskInProgress tip) {
-    return new MapTaskRunner(tip, tracker, this.conf);
+                                 TaskTracker.TaskInProgress tip,
+                                 TaskTracker.RunningJob rjob
+                                 ) throws IOException {
+    return new MapTaskRunner(tip, tracker, this.conf, rjob);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Tue Jun  5 02:33:44 2012
@@ -17,14 +17,17 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.log4j.Level;
 
 /** Runs a map task. */
 class MapTaskRunner extends TaskRunner {
   
-  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) {
-    super(task, tracker, conf);
+  public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf,
+      TaskTracker.RunningJob rjob) throws IOException {
+    super(task, tracker, conf, rjob);
   }
   
   @Override

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jun  5 02:33:44 2012
@@ -141,9 +141,10 @@ public class ReduceTask extends Task {
   }
 
   @Override
-  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip) 
-  throws IOException {
-    return new ReduceTaskRunner(tip, tracker, this.conf);
+  public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip,
+                                 TaskTracker.RunningJob rjob
+                                 ) throws IOException {
+    return new ReduceTaskRunner(tip, tracker, this.conf, rjob);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Jun  5 02:33:44 2012
@@ -26,9 +26,10 @@ import org.apache.log4j.Level;
 class ReduceTaskRunner extends TaskRunner {
 
   public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, 
-                          JobConf conf) throws IOException {
+                          JobConf conf, TaskTracker.RunningJob rjob
+                          ) throws IOException {
     
-    super(task, tracker, conf);
+    super(task, tracker, conf, rjob);
   }
 
   public void close() throws IOException {

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Tue Jun  5 02:33:44 2012
@@ -468,7 +468,9 @@ abstract public class Task implements Wr
   /** Return an appropriate thread runner for this task. 
    * @param tip TODO*/
   public abstract TaskRunner createRunner(TaskTracker tracker, 
-      TaskTracker.TaskInProgress tip) throws IOException;
+                                          TaskTracker.TaskInProgress tip, 
+                                          TaskTracker.RunningJob rjob
+                                          ) throws IOException;
 
   /** The number of milliseconds between progress reports. */
   public static final int PROGRESS_INTERVAL = 3000;
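
All of the createRunner() changes in this commit are the same mechanical widening: the abstract method, and the MapTask/ReduceTask overrides above, now also receive the TaskTracker.RunningJob so the runner can reach per-job state at construction time. A sketch of a concrete override under the new signature, using simplified stand-in types rather than the actual classes:

    import java.io.IOException;

    // Simplified stand-ins for Task / TaskRunner / TaskTracker internals.
    abstract class TaskSketch {
      interface Tracker {}
      interface TaskInProgress {}
      interface RunningJob {}
      static class Runner {
        Runner(TaskInProgress tip, Tracker t, RunningJob rjob) throws IOException {}
      }

      // New shape: rjob is threaded through to the runner's constructor.
      public abstract Runner createRunner(Tracker tracker, TaskInProgress tip,
                                          RunningJob rjob) throws IOException;
    }

    class MapTaskSketch extends TaskSketch {
      @Override
      public Runner createRunner(Tracker tracker, TaskInProgress tip,
                                 RunningJob rjob) throws IOException {
        // mirrors: new MapTaskRunner(tip, tracker, this.conf, rjob)
        return new Runner(tip, tracker, rjob);
      }
    }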


