From: shv@apache.org
To: mapreduce-commits@hadoop.apache.org
Subject: svn commit: r1346214 [3/7] - in /hadoop/common/branches/branch-0.22/mapreduce: ./ src/c++/task-controller/ src/c++/task-controller/impl/ src/c++/task-controller/test/ src/c++/task-controller/tests/ src/contrib/streaming/src/java/org/apache/hadoop/strea...
Date: Tue, 05 Jun 2012 02:33:47 -0000
Message-Id: <20120605023351.4F74C2388A5B@eris.apache.org>

Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java?rev=1346214&r1=1346213&r2=1346214&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/JvmManager.java Tue Jun 5 02:33:44 2012
@@ -1,20 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ package org.apache.hadoop.mapred; @@ -30,280 +30,264 @@ import java.util.Vector; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapred.TaskController.TaskControllerContext; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; +import org.apache.hadoop.mapred.TaskController; +import org.apache.hadoop.mapred.TaskController.DelayedProcessKiller; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.mapreduce.util.ProcessTree; +import static org.apache.hadoop.mapred.TaskController.Signal; class JvmManager { - public static final Log LOG = - LogFactory.getLog(JvmManager.class); +public static final Log LOG = + LogFactory.getLog(JvmManager.class); - private JvmManagerForType mapJvmManager; +private JvmManagerForType mapJvmManager; - private JvmManagerForType reduceJvmManager; - - public JvmEnv constructJvmEnv(List setup, Vectorvargs, - File stdout,File stderr,long logSize, File workDir, - Map env, JobConf conf) { - return new JvmEnv(setup,vargs,stdout,stderr,logSize,workDir,env,conf); - } - - public JvmManager(TaskTracker tracker) { - mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), - true, tracker); - reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(), - false, tracker); - } +private JvmManagerForType reduceJvmManager; - JvmManagerForType getJvmManagerForType(TaskType type) { - if (type.equals(TaskType.MAP)) { - return mapJvmManager; - } else if (type.equals(TaskType.REDUCE)) { - return reduceJvmManager; - } - return null; - } - - public void stop() { - mapJvmManager.stop(); - reduceJvmManager.stop(); +public JvmEnv constructJvmEnv(List setup, Vectorvargs, + File stdout,File stderr,long logSize, File workDir, + JobConf conf) { + return new JvmEnv(setup,vargs,stdout,stderr,workDir,conf); +} + +public JvmManager(TaskTracker tracker) { + mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), + true, tracker); + reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(), + false, tracker); +} + +JvmManagerForType getJvmManagerForType(TaskType type) { + if (type.equals(TaskType.MAP)) { + return mapJvmManager; + } else if (type.equals(TaskType.REDUCE)) { + return reduceJvmManager; } + return null; +} - public boolean isJvmKnown(JVMId jvmId) { - if (jvmId.isMapJVM()) { - return mapJvmManager.isJvmknown(jvmId); - } else { - return reduceJvmManager.isJvmknown(jvmId); - } +public void stop() throws IOException, InterruptedException { + mapJvmManager.stop(); + 
reduceJvmManager.stop(); +} + +public boolean isJvmKnown(JVMId jvmId) { + if (jvmId.isMapJVM()) { + return mapJvmManager.isJvmknown(jvmId); + } else { + return reduceJvmManager.isJvmknown(jvmId); } +} - /* - * Saves pid of the given taskJvm - */ - void setPidToJvm(JVMId jvmId, String pid) { - if (jvmId.isMapJVM()) { - mapJvmManager.setPidForJvm(jvmId, pid); - } - else { - reduceJvmManager.setPidForJvm(jvmId, pid); - } +/* + * Saves pid of the given taskJvm + */ +void setPidToJvm(JVMId jvmId, String pid) { + if (jvmId.isMapJVM()) { + mapJvmManager.setPidForJvm(jvmId, pid); } - - /* - * Returns the pid of the task - */ - String getPid(TaskRunner t) { - if (t != null && t.getTask() != null) { - if (t.getTask().isMapTask()) { - return mapJvmManager.getPidByRunningTask(t); - } else { - return reduceJvmManager.getPidByRunningTask(t); - } - } - return null; + else { + reduceJvmManager.setPidForJvm(jvmId, pid); } - - public void launchJvm(TaskRunner t, JvmEnv env) { +} + +/* + * Returns the pid of the task + */ +String getPid(TaskRunner t) { + if (t != null && t.getTask() != null) { if (t.getTask().isMapTask()) { - mapJvmManager.reapJvm(t, env); + return mapJvmManager.getPidByRunningTask(t); } else { - reduceJvmManager.reapJvm(t, env); + return reduceJvmManager.getPidByRunningTask(t); } } + return null; +} - public TaskInProgress getTaskForJvm(JVMId jvmId) - throws IOException { - if (jvmId.isMapJVM()) { - return mapJvmManager.getTaskForJvm(jvmId); - } else { - return reduceJvmManager.getTaskForJvm(jvmId); - } - } - public void taskFinished(TaskRunner tr) { - if (tr.getTask().isMapTask()) { - mapJvmManager.taskFinished(tr); - } else { - reduceJvmManager.taskFinished(tr); - } +public void launchJvm(TaskRunner t, JvmEnv env) + throws IOException, InterruptedException { + if (t.getTask().isMapTask()) { + mapJvmManager.reapJvm(t, env); + } else { + reduceJvmManager.reapJvm(t, env); } +} - public void taskKilled(TaskRunner tr) { - if (tr.getTask().isMapTask()) { - mapJvmManager.taskKilled(tr); - } else { - reduceJvmManager.taskKilled(tr); - } +public TaskInProgress getTaskForJvm(JVMId jvmId) + throws IOException { + if (jvmId.isMapJVM()) { + return mapJvmManager.getTaskForJvm(jvmId); + } else { + return reduceJvmManager.getTaskForJvm(jvmId); } - - void dumpStack(TaskRunner tr) { - if (tr.getTask().isMapTask()) { - mapJvmManager.dumpStack(tr); - } else { - reduceJvmManager.dumpStack(tr); - } +} +public void taskFinished(TaskRunner tr) { + if (tr.getTask().isMapTask()) { + mapJvmManager.taskFinished(tr); + } else { + reduceJvmManager.taskFinished(tr); } +} - public void killJvm(JVMId jvmId) { - if (jvmId.isMap) { - mapJvmManager.killJvm(jvmId); - } else { - reduceJvmManager.killJvm(jvmId); - } - } - - /** - * Adds the task's work dir to the cleanup queue of taskTracker for - * asynchronous deletion of work dir. 
- * @param tracker taskTracker - * @param task the task whose work dir needs to be deleted - * @throws IOException - */ - static void deleteWorkDir(TaskTracker tracker, Task task) throws IOException { - tracker.getCleanupThread().addToQueue( - TaskTracker.buildTaskControllerTaskPathDeletionContexts( - tracker.getLocalFileSystem(), - tracker.getLocalFiles(tracker.getJobConf(), ""), - task, true /* workDir */, - tracker.getTaskController())); - } - - static class JvmManagerForType { - //Mapping from the JVM IDs to running Tasks - Map jvmToRunningTask = - new HashMap(); - //Mapping from the tasks to JVM IDs - Map runningTaskToJvm = - new HashMap(); - //Mapping from the JVM IDs to Reduce JVM processes - Map jvmIdToRunner = - new HashMap(); - - int maxJvms; - boolean isMap; - - TaskTracker tracker; - - Random rand = new Random(System.currentTimeMillis()); - - public JvmManagerForType(int maxJvms, boolean isMap, - TaskTracker tracker) { - this.maxJvms = maxJvms; - this.isMap = isMap; - this.tracker = tracker; - } - - synchronized public void setRunningTaskForJvm(JVMId jvmId, - TaskRunner t) { - jvmToRunningTask.put(jvmId, t); - runningTaskToJvm.put(t,jvmId); - jvmIdToRunner.get(jvmId).setTaskRunner(t); - } - - synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) - throws IOException { - if (jvmToRunningTask.containsKey(jvmId)) { - //Incase of JVM reuse, tasks are returned to previously launched - //JVM via this method. However when a new task is launched - //the task being returned has to be initialized. - TaskRunner taskRunner = jvmToRunningTask.get(jvmId); - JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); - Task task = taskRunner.getTaskInProgress().getTask(); - - // Initialize task dirs - TaskControllerContext context = - new TaskController.TaskControllerContext(); - context.env = jvmRunner.env; - context.task = task; - // If we are returning the same task as which the JVM was launched - // we don't initialize task once again. - if (!jvmRunner.env.conf.get(JobContext.TASK_ATTEMPT_ID).equals( - task.getTaskID().toString())) { - try { - tracker.getTaskController().initializeTask(context); - } catch (IOException e) { - LOG.warn("Failed to initialize the new task " - + task.getTaskID().toString() + " to be given to JVM with id " - + jvmId); - throw e; - } - } - - return taskRunner.getTaskInProgress(); - } - return null; - } +public void taskKilled(TaskRunner tr + ) throws IOException, InterruptedException { + if (tr.getTask().isMapTask()) { + mapJvmManager.taskKilled(tr); + } else { + reduceJvmManager.taskKilled(tr); + } +} - synchronized String getPidByRunningTask(TaskRunner t) { - JVMId id = runningTaskToJvm.get(t); - if (id != null) { - return jvmIdToRunner.get(id).getPid(); - } - return null; - } +public void killJvm(JVMId jvmId) throws IOException, InterruptedException { + if (jvmId.isMap) { + mapJvmManager.killJvm(jvmId); + } else { + reduceJvmManager.killJvm(jvmId); + } +} - synchronized void setPidForJvm(JVMId jvmId, String pid) { - JvmRunner runner = jvmIdToRunner.get(jvmId); - assert runner != null : "Task must have a runner to set a pid"; - runner.setPid(pid); - } - - synchronized public boolean isJvmknown(JVMId jvmId) { - return jvmIdToRunner.containsKey(jvmId); - } +/** + * Adds the task's work dir to the cleanup queue of taskTracker for + * asynchronous deletion of work dir. 
+ * @param tracker taskTracker + * @param task the task whose work dir needs to be deleted + */ +static void deleteWorkDir(TaskTracker tracker, Task task) { + String user = task.getUser(); + String jobid = task.getJobID().toString(); + String taskid = task.getTaskID().toString(); + String workDir = TaskTracker.getTaskWorkDir(user, jobid, taskid, + task.isTaskCleanupTask()); + tracker.getCleanupThread().addToQueue( + new TaskController.DeletionContext(tracker.getTaskController(), false, + user, + workDir, tracker.getLocalDirs())); + +} - synchronized public void taskFinished(TaskRunner tr) { - JVMId jvmId = runningTaskToJvm.remove(tr); - if (jvmId != null) { - jvmToRunningTask.remove(jvmId); - JvmRunner jvmRunner; - if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) { - jvmRunner.taskRan(); - } - } +static class JvmManagerForType { + //Mapping from the JVM IDs to running Tasks + Map jvmToRunningTask = + new HashMap(); + //Mapping from the tasks to JVM IDs + Map runningTaskToJvm = + new HashMap(); + //Mapping from the JVM IDs to Reduce JVM processes + Map jvmIdToRunner = + new HashMap(); + //Mapping from the JVM IDs to process IDs + Map jvmIdToPid = + new HashMap(); + + final int maxJvms; + final boolean isMap; + final TaskTracker tracker; + final long sleeptimeBeforeSigkill; + final Random rand = new Random(); + + static final String DELAY_BEFORE_KILL_KEY = + "mapred.tasktracker.tasks.sleeptime-before-sigkill"; + // number of milliseconds to wait between TERM and KILL. + private static final long DEFAULT_SLEEPTIME_BEFORE_SIGKILL = 250; + + public JvmManagerForType(int maxJvms, boolean isMap, + TaskTracker tracker) { + this.maxJvms = maxJvms; + this.isMap = isMap; + this.tracker = tracker; + sleeptimeBeforeSigkill = + tracker.getJobConf().getLong(DELAY_BEFORE_KILL_KEY, + DEFAULT_SLEEPTIME_BEFORE_SIGKILL); + } + + synchronized public void setRunningTaskForJvm(JVMId jvmId, + TaskRunner t) { + jvmToRunningTask.put(jvmId, t); + runningTaskToJvm.put(t,jvmId); + jvmIdToRunner.get(jvmId).setBusy(true); + } + + synchronized public TaskInProgress getTaskForJvm(JVMId jvmId) + throws IOException { + final TaskRunner taskRunner = jvmToRunningTask.get(jvmId); + return null == taskRunner ? null : taskRunner.getTaskInProgress(); + //if (jvmToRunningTask.containsKey(jvmId)) { + // //Incase of JVM reuse, tasks are returned to previously launched + // //JVM via this method. However when a new task is launched + // //the task being returned has to be initialized. 
+ // TaskRunner taskRunner = jvmToRunningTask.get(jvmId); + // // TODO retained for MAPREDUCE-1100 + // JvmRunner jvmRunner = jvmIdToRunner.get(jvmId); + // Task task = taskRunner.getTaskInProgress().getTask(); + + // return taskRunner.getTaskInProgress(); + //} + //return null; + } + + synchronized String getPidByRunningTask(TaskRunner t) { + JVMId id = runningTaskToJvm.get(t); + if (id != null) { + return jvmIdToPid.get(id); } + return null; + } - synchronized public void taskKilled(TaskRunner tr) { - JVMId jvmId = runningTaskToJvm.remove(tr); - if (jvmId != null) { - jvmToRunningTask.remove(jvmId); - killJvm(jvmId); - } - } + synchronized void setPidForJvm(JVMId jvmId, String pid) { + JvmRunner runner = jvmIdToRunner.get(jvmId); + assert runner != null : "Task must have a runner to set a pid"; + jvmIdToPid.put(jvmId, pid); + } + + synchronized public boolean isJvmknown(JVMId jvmId) { + return jvmIdToRunner.containsKey(jvmId); + } - synchronized public void killJvm(JVMId jvmId) { + synchronized public void taskFinished(TaskRunner tr) { + JVMId jvmId = runningTaskToJvm.remove(tr); + if (jvmId != null) { + jvmToRunningTask.remove(jvmId); JvmRunner jvmRunner; if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) { - killJvmRunner(jvmRunner); + jvmRunner.taskRan(); } } - - private synchronized void killJvmRunner(JvmRunner jvmRunner) { - jvmRunner.kill(); - removeJvm(jvmRunner.jvmId); - } + } - void dumpStack(TaskRunner tr) { - JvmRunner jvmRunner = null; - synchronized (this) { - JVMId jvmId = runningTaskToJvm.get(tr); - if (null != jvmId) { - jvmRunner = jvmIdToRunner.get(jvmId); - } - } + synchronized public void taskKilled(TaskRunner tr + ) throws IOException, + InterruptedException { + JVMId jvmId = runningTaskToJvm.remove(tr); + if (jvmId != null) { + jvmToRunningTask.remove(jvmId); + killJvm(jvmId); + } + } - // Don't want to hold JvmManager lock while dumping stacks for one - // task. - if (null != jvmRunner) { - jvmRunner.dumpChildStacks(); - } + synchronized public void killJvm(JVMId jvmId) + throws IOException, InterruptedException { + JvmRunner jvmRunner; + if ((jvmRunner = jvmIdToRunner.get(jvmId)) != null) { + killJvmRunner(jvmRunner); } + } + + private synchronized void killJvmRunner(JvmRunner jvmRunner) + throws IOException, InterruptedException { + jvmRunner.kill(); + removeJvm(jvmRunner.jvmId); + } - synchronized public void stop() { + + synchronized public void stop() + throws IOException, InterruptedException { //since the kill() method invoked later on would remove //an entry from the jvmIdToRunner map, we create a //copy of the values and iterate over it (if we don't @@ -320,7 +304,7 @@ class JvmManager { jvmIdToRunner.remove(jvmId); } private synchronized void reapJvm( - TaskRunner t, JvmEnv env) { + TaskRunner t, JvmEnv env) throws IOException, InterruptedException { if (t.getTaskInProgress().wasKilled()) { //the task was killed in-flight //no need to do the rest of the operations @@ -409,7 +393,7 @@ class JvmManager { private synchronized void spawnNewJvm(JobID jobId, JvmEnv env, TaskRunner t) { - JvmRunner jvmRunner = new JvmRunner(env,jobId); + JvmRunner jvmRunner = new JvmRunner(env, jobId, t.getTask()); jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner); //spawn the JVM in a new thread. 
Note that there will be very little //extra overhead of launching the new thread for a new JVM since @@ -443,83 +427,90 @@ class JvmManager { volatile int numTasksRan; final int numTasksToRun; JVMId jvmId; - private ShellCommandExecutor shexec; // shell terminal for running the task - //context used for starting JVM - private TaskControllerContext initalContext; + volatile boolean busy = true; + private Task firstTask; - public JvmRunner(JvmEnv env, JobID jobId) { + public JvmRunner(JvmEnv env, JobID jobId, Task firstTask) { this.env = env; this.jvmId = new JVMId(jobId, isMap, rand.nextInt()); this.numTasksToRun = env.conf.getNumTasksToExecutePerJvm(); - - this.initalContext = new TaskControllerContext(); - initalContext.sleeptimeBeforeSigkill = tracker.getJobConf() - .getLong(TTConfig.TT_SLEEP_TIME_BEFORE_SIG_KILL, - ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL); + this.firstTask = firstTask; LOG.info("In JvmRunner constructed JVM ID: " + jvmId); } + + @Override public void run() { - runChild(env); + try { + runChild(env); + } catch (InterruptedException ie) { + return; + } catch (IOException e) { + LOG.warn("Caught IOException in JVMRunner", e); + } catch (Throwable e) { + LOG.error("Caught Throwable in JVMRunner. Aborting TaskTracker.", e); + System.exit(1); + } finally { + // TODO MR-1100 + //jvmFinished(); + } } - public void runChild(JvmEnv env) { + public void runChild(JvmEnv env) + throws IOException, InterruptedException { + int exitCode = 0; try { env.vargs.add(Integer.toString(jvmId.getId())); - //Launch the task controller to run task JVM - initalContext.env = env; - tracker.getTaskController().launchTaskJVM(initalContext); + TaskRunner runner = jvmToRunningTask.get(jvmId); + if (runner != null) { + Task task = runner.getTask(); + //Launch the task controller to run task JVM + String user = task.getUser(); + TaskAttemptID taskAttemptId = task.getTaskID(); + String taskAttemptIdStr = task.isTaskCleanupTask() ? + (taskAttemptId.toString() + TaskTracker.TASK_CLEANUP_SUFFIX) : + taskAttemptId.toString(); + exitCode = tracker.getTaskController().launchTask(user, + jvmId.jobId.toString(), taskAttemptIdStr, env.setup, + env.vargs, env.workDir, env.stdout.toString(), + env.stderr.toString()); + } } catch (IOException ioe) { // do nothing // error and output are appropriately redirected } finally { // handle the exit code - shexec = initalContext.shExec; - if (shexec == null) { - return; - } - + // although the process has exited before we get here, + // make sure the entire process group has also been killed. kill(); - int exitCode = shexec.getExitCode(); updateOnJvmExit(jvmId, exitCode); LOG.info("JVM : " + jvmId + " exited with exit code " + exitCode + ". Number of tasks it ran: " + numTasksRan); - try { - // In case of jvm-reuse, - //the task jvm cleans up the common workdir for every - //task at the beginning of each task in the task JVM. - //For the last task, we do it here. - if (env.conf.getNumTasksToExecutePerJvm() != 1) { - deleteWorkDir(tracker, initalContext.task); - } - } catch (IOException ie){} - } - } - - synchronized void setPid(String pid) { - assert initalContext != null; - initalContext.pid = pid; - } - - synchronized String getPid() { - if (initalContext != null) { - return initalContext.pid; - } else { - return null; + deleteWorkDir(tracker, firstTask); } } /** - * Kills the process. Also kills its subprocesses if the process(root of subtree - * of processes) is created using setsid. + * Kills the process. 
Also kills its subprocesses if the process(root of + * subtree of processes) is created using setsid. */ - synchronized void kill() { + synchronized void kill() throws IOException, InterruptedException { if (!killed) { TaskController controller = tracker.getTaskController(); // Check inital context before issuing a kill to prevent situations // where kill is issued before task is launched. - if (initalContext != null && initalContext.env != null) { - // Destroy the task jvm - controller.destroyTaskJVM(initalContext); + String pidStr = jvmIdToPid.get(jvmId); + if (pidStr != null) { + String user = env.conf.getUser(); + int pid = Integer.parseInt(pidStr); + // start a thread that will kill the process dead + if (sleeptimeBeforeSigkill > 0) { + controller.signalTask(user, pid, Signal.QUIT); + controller.signalTask(user, pid, Signal.TERM); + new DelayedProcessKiller(user, pid, sleeptimeBeforeSigkill, + Signal.KILL, tracker.getTaskController()).start(); + } else { + controller.signalTask(user, pid, Signal.KILL); + } } else { LOG.info(String.format("JVM Not killed %s but just removed", jvmId .toString())); @@ -528,46 +519,19 @@ class JvmManager { } } - /** Send a signal to the JVM requesting that it dump a stack trace, - * and wait for a timeout interval to give this signal time to be - * processed. - */ - void dumpChildStacks() { - if (!killed) { - TaskController controller = tracker.getTaskController(); - // Check inital context before issuing a signal to prevent situations - // where signal is issued before task is launched. - if (initalContext != null && initalContext.env != null) { - // signal the task jvm - controller.dumpTaskStack(initalContext); - - // We're going to kill the jvm with SIGKILL after this, - // so we should wait for a few seconds first to ensure that - // the SIGQUIT has time to be processed. 
- try { - Thread.sleep(initalContext.sleeptimeBeforeSigkill); - } catch (InterruptedException e) { - LOG.warn("Sleep interrupted : " + - StringUtils.stringifyException(e)); - } - } - } - } - - public synchronized void taskRan() { - initalContext.task = null; + public void taskRan() { + busy = false; numTasksRan++; } public boolean ranAll() { return(numTasksRan == numTasksToRun); } - public synchronized void setTaskRunner(TaskRunner runner) { - initalContext.task = runner.getTask(); - assert initalContext.task != null; + public void setBusy(boolean busy) { + this.busy = busy; } public synchronized boolean isBusy() { - return initalContext.task != null; + return busy; } } } @@ -577,19 +541,15 @@ class JvmManager { File stdout; File stderr; File workDir; - long logSize; JobConf conf; - Map env; - public JvmEnv(List setup, Vector vargs, File stdout, - File stderr, long logSize, File workDir, Map env, - JobConf conf) { + public JvmEnv(List setup, Vector vargs, File stdout, + File stderr, File workDir, JobConf conf) { this.setup = setup; this.vargs = vargs; this.stdout = stdout; this.stderr = stderr; this.workDir = workDir; - this.env = env; this.conf = conf; } } Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LinuxTaskController.java Tue Jun 5 02:33:44 2012 @@ -14,30 +14,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ +*/ package org.apache.hadoop.mapred; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.io.PrintWriter; +import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalFileSystem; -import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; -import org.apache.hadoop.mapred.JvmManager.JvmEnv; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Shell.ExitCodeException; import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.util.StringUtils; +import static org.apache.hadoop.mapred.TaskController.Signal; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * A {@link TaskController} that runs the task JVMs as the user @@ -48,8 +45,8 @@ import org.apache.hadoop.util.Shell.Shel * JVM and killing it when needed, and also initializing and * finalizing the task environment. *
The setuid executable is launched using the command line:
- * task-controller mapreduce.job.user.name command command-args, where
- * mapreduce.job.user.name is the name of the owner who submits the job
+ * task-controller user-name command command-args, where
+ * user-name is the name of the owner who submits the job
 * command is one of the cardinal value of the
 * {@link LinuxTaskController.TaskControllerCommands} enumeration
 * command-args depends on the command being launched.
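For context, the setuid binary described above is driven with a plain argv in the order: executable, user name, cardinal command value, then command-specific arguments, which is the same layout the new launchTask() builds below. The following minimal, standalone Java sketch mirrors that layout; the executable path, user, and job/attempt IDs are invented examples, not values from this commit.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class TaskControllerArgvSketch {
        public static void main(String[] args) {
            String taskControllerExe = "/opt/hadoop/bin/task-controller"; // assumed install path
            String user = "alice";             // job owner; the task runs as this user
            int launchTaskJvm = 1;             // cardinal value of the command (LAUNCH_TASK_JVM)
            List<String> argv = new ArrayList<>(Arrays.asList(
                taskControllerExe,
                user,
                Integer.toString(launchTaskJvm)));
            // command-args depend on the command; for launching a task JVM they
            // identify the job and attempt (illustrative values):
            argv.add("job_201206050233_0001");
            argv.add("attempt_201206050233_0001_m_000000_0");
            System.out.println(String.join(" ", argv));
        }
    }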
@@ -62,52 +59,78 @@ class LinuxTaskController extends TaskCo private static final Log LOG = LogFactory.getLog(LinuxTaskController.class); - - // Name of the executable script that will contain the child - // JVM command line. See writeCommand for details. - private static final String COMMAND_FILE = "taskjvm.sh"; // Path to the setuid executable. - private static String taskControllerExe; + private String taskControllerExe; + private static final String TASK_CONTROLLER_EXEC_KEY = + "mapreduce.tasktracker.task-controller.exe"; - static { - // the task-controller is expected to be under the $HADOOP_HOME/bin - // directory. - File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin"); - taskControllerExe = - new File(hadoopBin, "task-controller").getAbsolutePath(); + @Override + public void setConf(Configuration conf) { + super.setConf(conf); + taskControllerExe = getTaskControllerExecutablePath(conf); } - + public LinuxTaskController() { super(); } - + + protected String getTaskControllerExecutablePath(Configuration conf) { + File hadoopBin = new File(System.getenv("HADOOP_HOME"), "bin"); + String defaultTaskController = + new File(hadoopBin, "task-controller").getAbsolutePath(); + return null == conf + ? defaultTaskController + : conf.get(TASK_CONTROLLER_EXEC_KEY, defaultTaskController); + } + /** * List of commands that the setuid script will execute. */ - enum TaskControllerCommands { - INITIALIZE_USER, - INITIALIZE_JOB, - INITIALIZE_DISTRIBUTEDCACHE_FILE, - LAUNCH_TASK_JVM, - INITIALIZE_TASK, - TERMINATE_TASK_JVM, - KILL_TASK_JVM, - RUN_DEBUG_SCRIPT, - SIGQUIT_TASK_JVM, - ENABLE_TASK_FOR_CLEANUP, - ENABLE_JOB_FOR_CLEANUP + enum Commands { + INITIALIZE_JOB(0), + LAUNCH_TASK_JVM(1), + SIGNAL_TASK(2), + DELETE_AS_USER(3), + DELETE_LOG_AS_USER(4); + + private int value; + Commands(int value) { + this.value = value; + } + int getValue() { + return value; + } + } + + /** + * Result codes returned from the C task-controller. + * These must match the values in task-controller.h. + */ + enum ResultCode { + OK(0), + INVALID_USER_NAME(2), + INVALID_TASK_PID(9), + INVALID_TASKCONTROLLER_PERMISSIONS(22), + INVALID_CONFIG_FILE(24); + + private final int value; + ResultCode(int value) { + this.value = value; + } + int getValue() { + return value; + } } @Override - public void setup() throws IOException { - super.setup(); - + public void setup(LocalDirAllocator allocator) throws IOException { + // Check the permissions of the task-controller binary by running it plainly. - // If permissions are correct, it returns an error code 1, else it returns + // If permissions are correct, it returns an error code 1, else it returns // 24 or something else if some other bugs are also present. String[] taskControllerCmd = - new String[] { getTaskControllerExecutablePath() }; + new String[] { taskControllerExe }; ShellCommandExecutor shExec = new ShellCommandExecutor(taskControllerCmd); try { shExec.execute(); @@ -120,52 +143,96 @@ class LinuxTaskController extends TaskCo + "permissions/ownership with exit code " + exitCode, e); } } + this.allocator = allocator; } + - /** - * Launch a task JVM that will run as the owner of the job. - * - * This method launches a task JVM by executing a setuid executable that will - * switch to the user and run the task. Also does initialization of the first - * task in the same setuid process launch. - */ @Override - void launchTaskJVM(TaskController.TaskControllerContext context) - throws IOException { - JvmEnv env = context.env; - // get the JVM command line. 
- String cmdLine = - TaskLog.buildCommandLine(env.setup, env.vargs, env.stdout, env.stderr, - env.logSize, true); - - StringBuffer sb = new StringBuffer(); - //export out all the environment variable before child command as - //the setuid/setgid binaries would not be getting, any environmental - //variables which begin with LD_*. - for(Entry entry : env.env.entrySet()) { - sb.append("export "); - sb.append(entry.getKey()); - sb.append("="); - sb.append(entry.getValue()); - sb.append("\n"); - } - sb.append(cmdLine); - // write the command to a file in the - // task specific cache directory - writeCommand(sb.toString(), getTaskCacheDirectory(context, - context.env.workDir)); - - // Call the taskcontroller with the right parameters. - List launchTaskJVMArgs = buildLaunchTaskArgs(context, - context.env.workDir); - ShellCommandExecutor shExec = buildTaskControllerExecutor( - TaskControllerCommands.LAUNCH_TASK_JVM, - env.conf.getUser(), - launchTaskJVMArgs, env.workDir, env.env); - context.shExec = shExec; + public void initializeJob(String user, String jobid, Path credentials, + Path jobConf, TaskUmbilicalProtocol taskTracker, + InetSocketAddress ttAddr + ) throws IOException, InterruptedException { + List command = new ArrayList( + Arrays.asList(taskControllerExe, + user, + Integer.toString(Commands.INITIALIZE_JOB.getValue()), + jobid, + credentials.toUri().getPath().toString(), + jobConf.toUri().getPath().toString())); + File jvm = // use same jvm as parent + new File(new File(System.getProperty("java.home"), "bin"), "java"); + command.add(jvm.toString()); + command.add("-classpath"); + command.add(System.getProperty("java.class.path")); + command.add("-Dhadoop.log.dir=" + TaskLog.getBaseLogDir()); + command.add("-Dhadoop.root.logger=INFO,console"); + command.add(JobLocalizer.class.getName()); // main of JobLocalizer + command.add(user); + command.add(jobid); + // add the task tracker's reporting address + command.add(ttAddr.getHostName()); + command.add(Integer.toString(ttAddr.getPort())); + String[] commandArray = command.toArray(new String[0]); + ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray); + if (LOG.isDebugEnabled()) { + LOG.debug("initializeJob: " + Arrays.toString(commandArray)); + } try { shExec.execute(); + if (LOG.isDebugEnabled()) { + logOutput(shExec.getOutput()); + } + } catch (ExitCodeException e) { + int exitCode = shExec.getExitCode(); + logOutput(shExec.getOutput()); + throw new IOException("Job initialization failed (" + exitCode + ")", e); + } + } + + @Override + public int launchTask(String user, + String jobId, + String attemptId, + List setup, + List jvmArguments, + File currentWorkDirectory, + String stdout, + String stderr) throws IOException { + + ShellCommandExecutor shExec = null; + try { + FileSystem rawFs = FileSystem.getLocal(getConf()).getRaw(); + long logSize = 0; //TODO, Ref BUG:2854624 + // get the JVM command line. 
+ String cmdLine = + TaskLog.buildCommandLine(setup, jvmArguments, + new File(stdout), new File(stderr), logSize, true); + + // write the command to a file in the + // task specific cache directory + Path p = new Path(allocator.getLocalPathForWrite( + TaskTracker.getPrivateDirTaskScriptLocation(user, jobId, attemptId), + getConf()), COMMAND_FILE); + String commandFile = writeCommand(cmdLine, rawFs, p); + + String[] command = + new String[]{taskControllerExe, + user, + Integer.toString(Commands.LAUNCH_TASK_JVM.getValue()), + jobId, + attemptId, + currentWorkDirectory.toString(), + commandFile}; + shExec = new ShellCommandExecutor(command); + + if (LOG.isDebugEnabled()) { + LOG.debug("launchTask: " + Arrays.toString(command)); + } + shExec.execute(); } catch (Exception e) { + if (shExec == null) { + return -1; + } int exitCode = shExec.getExitCode(); LOG.warn("Exit code from task is : " + exitCode); // 143 (SIGTERM) and 137 (SIGKILL) exit codes means the task was @@ -177,481 +244,78 @@ class LinuxTaskController extends TaskCo LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); logOutput(shExec.getOutput()); } - throw new IOException(e); + return exitCode; } if (LOG.isDebugEnabled()) { - LOG.info("Output from LinuxTaskController's launchTaskJVM follows:"); + LOG.debug("Output from LinuxTaskController's launchTask follows:"); logOutput(shExec.getOutput()); } + return 0; } - - /** - * Launch the debug script process that will run as the owner of the job. - * - * This method launches the task debug script process by executing a setuid - * executable that will switch to the user and run the task. - */ + @Override - void runDebugScript(DebugScriptContext context) throws IOException { - String debugOut = FileUtil.makeShellPath(context.stdout); - String cmdLine = TaskLog.buildDebugScriptCommandLine(context.args, debugOut); - writeCommand(cmdLine, getTaskCacheDirectory(context, context.workDir)); - // Call the taskcontroller with the right parameters. - List launchTaskJVMArgs = buildLaunchTaskArgs(context, context.workDir); - runCommand(TaskControllerCommands.RUN_DEBUG_SCRIPT, context.task.getUser(), - launchTaskJVMArgs, context.workDir, null); - } - /** - * Helper method that runs a LinuxTaskController command - * - * @param taskControllerCommand - * @param user - * @param cmdArgs - * @param env - * @throws IOException - */ - private void runCommand(TaskControllerCommands taskControllerCommand, - String user, List cmdArgs, File workDir, Map env) - throws IOException { - - ShellCommandExecutor shExec = - buildTaskControllerExecutor(taskControllerCommand, user, cmdArgs, - workDir, env); - try { - shExec.execute(); - } catch (Exception e) { - LOG.warn("Exit code from " + taskControllerCommand.toString() + " is : " - + shExec.getExitCode()); - LOG.warn("Exception thrown by " + taskControllerCommand.toString() + " : " - + StringUtils.stringifyException(e)); - LOG.info("Output from LinuxTaskController's " - + taskControllerCommand.toString() + " follows:"); - logOutput(shExec.getOutput()); - throw new IOException(e); + public void deleteAsUser(String user, String subDir, String... 
baseDirs) + throws IOException { + List command = new ArrayList( + Arrays.asList( + taskControllerExe, + user, + Integer.toString(Commands.DELETE_AS_USER.getValue()), + subDir)); + for (String baseDir : baseDirs) { + command.add(baseDir); } + String[] commandArray = command.toArray(new String[0]); + ShellCommandExecutor shExec = new ShellCommandExecutor(commandArray); if (LOG.isDebugEnabled()) { - LOG.info("Output from LinuxTaskController's " - + taskControllerCommand.toString() + " follows:"); - logOutput(shExec.getOutput()); - } - } - - /** - * Returns list of arguments to be passed while initializing a new task. See - * {@code buildTaskControllerExecutor(TaskControllerCommands, String, - * List, JvmEnv)} documentation. - * - * @param context - * @return Argument to be used while launching Task VM - */ - private List buildInitializeTaskArgs(TaskExecContext context) { - List commandArgs = new ArrayList(3); - String taskId = context.task.getTaskID().toString(); - String jobId = getJobId(context); - commandArgs.add(jobId); - if (!context.task.isTaskCleanupTask()) { - commandArgs.add(taskId); - } else { - commandArgs.add(taskId + TaskTracker.TASK_CLEANUP_SUFFIX); + LOG.debug("deleteAsUser: " + Arrays.toString(commandArray)); } - return commandArgs; + shExec.execute(); } @Override - void initializeTask(TaskControllerContext context) - throws IOException { + public void deleteLogAsUser(String user, String subDir) throws IOException { + String[] command = + new String[]{taskControllerExe, + user, + Integer.toString(Commands.DELETE_LOG_AS_USER.getValue()), + subDir}; + ShellCommandExecutor shExec = new ShellCommandExecutor(command); if (LOG.isDebugEnabled()) { - LOG.debug("Going to do " - + TaskControllerCommands.INITIALIZE_TASK.toString() - + " for " + context.task.getTaskID().toString()); - } - runCommand(TaskControllerCommands.INITIALIZE_TASK, - context.env.conf.getUser(), - buildInitializeTaskArgs(context), context.env.workDir, context.env.env); - } - - /** - * Builds the args to be passed to task-controller for enabling of task for - * cleanup. Last arg in this List is either $attemptId or $attemptId/work - */ - private List buildTaskCleanupArgs( - TaskControllerTaskPathDeletionContext context) { - List commandArgs = new ArrayList(3); - commandArgs.add(context.mapredLocalDir.toUri().getPath()); - commandArgs.add(context.task.getJobID().toString()); - - String workDir = ""; - if (context.isWorkDir) { - workDir = "/work"; - } - if (context.task.isTaskCleanupTask()) { - commandArgs.add(context.task.getTaskID() + TaskTracker.TASK_CLEANUP_SUFFIX - + workDir); - } else { - commandArgs.add(context.task.getTaskID() + workDir); + LOG.debug("deleteLogAsUser: " + Arrays.toString(command)); } - - return commandArgs; + shExec.execute(); } - /** - * Builds the args to be passed to task-controller for enabling of job for - * cleanup. Last arg in this List is $jobid. 
- */ - private List buildJobCleanupArgs( - TaskControllerJobPathDeletionContext context) { - List commandArgs = new ArrayList(2); - commandArgs.add(context.mapredLocalDir.toUri().getPath()); - commandArgs.add(context.jobId.toString()); - - return commandArgs; - } - - /** - * Enables the task for cleanup by changing permissions of the specified path - * in the local filesystem - */ - @Override - void enableTaskForCleanup(PathDeletionContext context) - throws IOException { - if (context instanceof TaskControllerTaskPathDeletionContext) { - TaskControllerTaskPathDeletionContext tContext = - (TaskControllerTaskPathDeletionContext) context; - enablePathForCleanup(tContext, - TaskControllerCommands.ENABLE_TASK_FOR_CLEANUP, - buildTaskCleanupArgs(tContext)); - } - else { - throw new IllegalArgumentException("PathDeletionContext provided is not " - + "TaskControllerTaskPathDeletionContext."); - } - } - - /** - * Enables the job for cleanup by changing permissions of the specified path - * in the local filesystem - */ @Override - void enableJobForCleanup(PathDeletionContext context) - throws IOException { - if (context instanceof TaskControllerJobPathDeletionContext) { - TaskControllerJobPathDeletionContext tContext = - (TaskControllerJobPathDeletionContext) context; - enablePathForCleanup(tContext, - TaskControllerCommands.ENABLE_JOB_FOR_CLEANUP, - buildJobCleanupArgs(tContext)); - } else { - throw new IllegalArgumentException("PathDeletionContext provided is not " - + "TaskControllerJobPathDeletionContext."); - } - } - - /** - * Enable a path for cleanup - * @param c {@link TaskControllerPathDeletionContext} for the path to be - * cleaned up - * @param command {@link TaskControllerCommands} for task/job cleanup - * @param cleanupArgs arguments for the {@link LinuxTaskController} to enable - * path cleanup - */ - private void enablePathForCleanup(TaskControllerPathDeletionContext c, - TaskControllerCommands command, - List cleanupArgs) { + public boolean signalTask(String user, int taskPid, + Signal signal) throws IOException { + String[] command = + new String[]{taskControllerExe, + user, + Integer.toString(Commands.SIGNAL_TASK.getValue()), + Integer.toString(taskPid), + Integer.toString(signal.getValue())}; + ShellCommandExecutor shExec = new ShellCommandExecutor(command); if (LOG.isDebugEnabled()) { - LOG.debug("Going to do " + command.toString() + " for " + c.fullPath); - } - - if ( c.user != null && c.fs instanceof LocalFileSystem) { - try { - runCommand(command, c.user, cleanupArgs, null, null); - } catch(IOException e) { - LOG.warn("Unable to change permissions for " + c.fullPath); - } - } - else { - throw new IllegalArgumentException("Either user is null or the " - + "file system is not local file system."); - } - } - - private void logOutput(String output) { - String shExecOutput = output; - if (shExecOutput != null) { - for (String str : shExecOutput.split("\n")) { - LOG.info(str); - } + LOG.debug("signalTask: " + Arrays.toString(command)); } - } - - private String getJobId(TaskExecContext context) { - String taskId = context.task.getTaskID().toString(); - TaskAttemptID tId = TaskAttemptID.forName(taskId); - String jobId = tId.getJobID().toString(); - return jobId; - } - - /** - * Returns list of arguments to be passed while launching task VM. - * See {@code buildTaskControllerExecutor(TaskControllerCommands, - * String, List, JvmEnv)} documentation. 
- * @param context - * @return Argument to be used while launching Task VM - */ - private List buildLaunchTaskArgs(TaskExecContext context, - File workDir) { - List commandArgs = new ArrayList(3); - LOG.debug("getting the task directory as: " - + getTaskCacheDirectory(context, workDir)); - LOG.debug("getting the tt_root as " +getDirectoryChosenForTask( - new File(getTaskCacheDirectory(context, workDir)), - context) ); - commandArgs.add(getDirectoryChosenForTask( - new File(getTaskCacheDirectory(context, workDir)), - context)); - commandArgs.addAll(buildInitializeTaskArgs(context)); - return commandArgs; - } - - // Get the directory from the list of directories configured - // in Configs.LOCAL_DIR chosen for storing data pertaining to - // this task. - private String getDirectoryChosenForTask(File directory, - TaskExecContext context) { - String jobId = getJobId(context); - String taskId = context.task.getTaskID().toString(); - for (String dir : mapredLocalDirs) { - File mapredDir = new File(dir); - File taskDir = - new File(mapredDir, TaskTracker.getTaskWorkDir(context.task - .getUser(), jobId, taskId, context.task.isTaskCleanupTask())) - .getParentFile(); - if (directory.equals(taskDir)) { - return dir; - } - } - - LOG.error("Couldn't parse task cache directory correctly"); - throw new IllegalArgumentException("invalid task cache directory " - + directory.getAbsolutePath()); - } - - /** - * Builds the command line for launching/terminating/killing task JVM. - * Following is the format for launching/terminating/killing task JVM - *
- * For launching following is command line argument: - *
- * {@code mapreduce.job.user.name command tt-root job_id task_id} - *
- * For terminating/killing task jvm. - * {@code mapreduce.job.user.name command tt-root task-pid} - * - * @param command command to be executed. - * @param userName mapreduce.job.user.name - * @param cmdArgs list of extra arguments - * @param workDir working directory for the task-controller - * @param env JVM environment variables. - * @return {@link ShellCommandExecutor} - * @throws IOException - */ - private ShellCommandExecutor buildTaskControllerExecutor( - TaskControllerCommands command, String userName, List cmdArgs, - File workDir, Map env) - throws IOException { - String[] taskControllerCmd = new String[3 + cmdArgs.size()]; - taskControllerCmd[0] = getTaskControllerExecutablePath(); - taskControllerCmd[1] = userName; - taskControllerCmd[2] = String.valueOf(command.ordinal()); - int i = 3; - for (String cmdArg : cmdArgs) { - taskControllerCmd[i++] = cmdArg; - } - if (LOG.isDebugEnabled()) { - for (String cmd : taskControllerCmd) { - LOG.debug("taskctrl command = " + cmd); - } - } - ShellCommandExecutor shExec = null; - if(workDir != null && workDir.exists()) { - shExec = new ShellCommandExecutor(taskControllerCmd, - workDir, env); - } else { - shExec = new ShellCommandExecutor(taskControllerCmd); - } - - return shExec; - } - - // Return the task specific directory under the cache. - private String getTaskCacheDirectory(TaskExecContext context, - File workDir) { - // In the case of JVM reuse, the task specific directory - // is different from what is set with respect with - // env.workDir. Hence building this from the taskId everytime. - String taskId = context.task.getTaskID().toString(); - File cacheDirForJob = workDir.getParentFile().getParentFile(); - if(context.task.isTaskCleanupTask()) { - taskId = taskId + TaskTracker.TASK_CLEANUP_SUFFIX; - } - return new File(cacheDirForJob, taskId).getAbsolutePath(); - } - - // Write the JVM command line to a file under the specified directory - // Note that the JVM will be launched using a setuid executable, and - // could potentially contain strings defined by a user. Hence, to - // prevent special character attacks, we write the command line to - // a file and execute it. - private void writeCommand(String cmdLine, - String directory) throws IOException { - - PrintWriter pw = null; - String commandFile = directory + File.separator + COMMAND_FILE; - LOG.info("Writing commands to " + commandFile); - LOG.info("--------Commands Begin--------"); - LOG.info(cmdLine); - LOG.info("--------Commands End--------"); - try { - FileWriter fw = new FileWriter(commandFile); - BufferedWriter bw = new BufferedWriter(fw); - pw = new PrintWriter(bw); - pw.write(cmdLine); - } catch (IOException ioe) { - LOG.error("Caught IOException while writing JVM command line to file. " - + ioe.getMessage()); - } finally { - if (pw != null) { - pw.close(); - } - // set execute permissions for all on the file. 
- File f = new File(commandFile); - if (f.exists()) { - f.setReadable(true, false); - f.setExecutable(true, false); - } - } - } - - private List buildInitializeJobCommandArgs( - JobInitializationContext context) { - List initJobCmdArgs = new ArrayList(); - initJobCmdArgs.add(context.jobid.toString()); - return initJobCmdArgs; - } - - @Override - void initializeJob(JobInitializationContext context) - throws IOException { - LOG.debug("Going to initialize job " + context.jobid.toString() - + " on the TT"); - runCommand(TaskControllerCommands.INITIALIZE_JOB, context.user, - buildInitializeJobCommandArgs(context), context.workDir, null); - } - - @Override - public void initializeDistributedCacheFile(DistributedCacheFileContext context) - throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Going to initialize distributed cache for " + context.user - + " with localizedBaseDir " + context.localizedBaseDir + - " and uniqueString " + context.uniqueString); - } - List args = new ArrayList(); - // Here, uniqueString might start with '-'. Adding -- in front of the - // arguments indicates that they are non-option parameters. - args.add("--"); - args.add(context.localizedBaseDir.toString()); - args.add(context.uniqueString); - runCommand(TaskControllerCommands.INITIALIZE_DISTRIBUTEDCACHE_FILE, - context.user, args, context.workDir, null); - } - - @Override - public void initializeUser(InitializationContext context) - throws IOException { - LOG.debug("Going to initialize user directories for " + context.user - + " on the TT"); - runCommand(TaskControllerCommands.INITIALIZE_USER, context.user, - new ArrayList(), context.workDir, null); - } - - /** - * API which builds the command line to be pass to LinuxTaskController - * binary to terminate/kill the task. See - * {@code buildTaskControllerExecutor(TaskControllerCommands, - * String, List, JvmEnv)} documentation. - * - * - * @param context context of task which has to be passed kill signal. 
- * - */ - private List buildKillTaskCommandArgs(TaskControllerContext - context){ - List killTaskJVMArgs = new ArrayList(); - killTaskJVMArgs.add(context.pid); - return killTaskJVMArgs; - } - - /** - * Convenience method used to sending appropriate signal to the task - * VM - * @param context - * @param command - * @throws IOException - */ - protected void signalTask(TaskControllerContext context, - TaskControllerCommands command) throws IOException{ - if(context.task == null) { - LOG.info("Context task is null; not signaling the JVM"); - return; - } - ShellCommandExecutor shExec = buildTaskControllerExecutor( - command, context.env.conf.getUser(), - buildKillTaskCommandArgs(context), context.env.workDir, - context.env.env); try { shExec.execute(); - } catch (Exception e) { - LOG.warn("Output from task-contoller is : " + shExec.getOutput()); - throw new IOException(e); - } - } - - @Override - void terminateTask(TaskControllerContext context) { - try { - signalTask(context, TaskControllerCommands.TERMINATE_TASK_JVM); - } catch (Exception e) { - LOG.warn("Exception thrown while sending kill to the Task VM " + - StringUtils.stringifyException(e)); - } - } - - @Override - void killTask(TaskControllerContext context) { - try { - signalTask(context, TaskControllerCommands.KILL_TASK_JVM); - } catch (Exception e) { - LOG.warn("Exception thrown while sending destroy to the Task VM " + - StringUtils.stringifyException(e)); - } - } - - @Override - void dumpTaskStack(TaskControllerContext context) { - try { - signalTask(context, TaskControllerCommands.SIGQUIT_TASK_JVM); - } catch (Exception e) { - LOG.warn("Exception thrown while sending SIGQUIT to the Task VM " + - StringUtils.stringifyException(e)); + } catch (ExitCodeException e) { + int ret_code = shExec.getExitCode(); + if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) { + return false; + } + logOutput(shExec.getOutput()); + throw new IOException("Problem signalling task " + taskPid + " with " + + signal + "; exit = " + ret_code); } - } - - protected String getTaskControllerExecutablePath() { - return taskControllerExe; + return true; } @Override - String getRunAsUser(JobConf conf) { + public String getRunAsUser(JobConf conf) { return conf.getUser(); } -} \ No newline at end of file +} Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Jun 5 02:33:44 2012 @@ -38,7 +38,6 @@ import org.apache.hadoop.classification. 
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.ClusterMetrics; @@ -79,6 +78,7 @@ public class LocalJobRunner implements C private AtomicInteger map_tasks = new AtomicInteger(0); private int reduce_tasks = 0; final Random rand = new Random(); + private final TaskController taskController = new DefaultTaskController(); private JobTrackerInstrumentation myMetrics = null; @@ -116,7 +116,7 @@ public class LocalJobRunner implements C private FileSystem localFs; boolean killed = false; - private TrackerDistributedCacheManager trackerDistributerdCacheManager; + private TrackerDistributedCacheManager trackerDistributedCacheManager; private TaskDistributedCacheManager taskDistributedCacheManager; public long getProtocolVersion(String protocol, long clientVersion) { @@ -134,14 +134,12 @@ public class LocalJobRunner implements C // Manage the distributed cache. If there are files to be copied, // this will trigger localFile to be re-written again. - this.trackerDistributerdCacheManager = - new TrackerDistributedCacheManager(conf, new DefaultTaskController()); + this.trackerDistributedCacheManager = + new TrackerDistributedCacheManager(conf); this.taskDistributedCacheManager = - trackerDistributerdCacheManager.newTaskDistributedCacheManager(conf); - taskDistributedCacheManager.setup( - new LocalDirAllocator(MRConfig.LOCAL_DIR), - new File(systemJobDir.toString()), - "archive", "archive"); + trackerDistributedCacheManager.newTaskDistributedCacheManager( + jobid, conf); + taskDistributedCacheManager.setupCache(conf, "archive", "archive"); if (DistributedCache.getSymlink(conf)) { // This is not supported largely because, @@ -458,7 +456,7 @@ public class LocalJobRunner implements C localFs.delete(localJobFile, true); // delete local copy // Cleanup distributed cache taskDistributedCacheManager.release(); - trackerDistributerdCacheManager.purgeCache(); + trackerDistributedCacheManager.purgeCache(); } catch (IOException e) { LOG.warn("Error cleaning up "+id+": "+e); } @@ -532,6 +530,14 @@ public class LocalJobRunner implements C public boolean ping(TaskAttemptID taskid) throws IOException { return true; } + + @Override + public void updatePrivateDistributedCacheSizes( + org.apache.hadoop.mapreduce.JobID jobId, + long[] sizes) + throws IOException { + trackerDistributedCacheManager.setArchiveSizes(jobId, sizes); + } public boolean canCommit(TaskAttemptID taskid) throws IOException { @@ -578,6 +584,7 @@ public class LocalJobRunner implements C this.fs = FileSystem.getLocal(conf); this.conf = conf; myMetrics = new JobTrackerMetricsInst(null, new JobConf(conf)); + taskController.setConf(conf); } // JobSubmissionProtocol methods Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTask.java Tue Jun 5 02:33:44 2012 @@ -129,8 +129,10 @@ class MapTask extends Task { @Override public 
TaskRunner createRunner(TaskTracker tracker, - TaskTracker.TaskInProgress tip) { - return new MapTaskRunner(tip, tracker, this.conf); + TaskTracker.TaskInProgress tip, + TaskTracker.RunningJob rjob + ) throws IOException { + return new MapTaskRunner(tip, tracker, this.conf, rjob); } @Override Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/MapTaskRunner.java Tue Jun 5 02:33:44 2012 @@ -17,14 +17,17 @@ */ package org.apache.hadoop.mapred; +import java.io.IOException; + import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.log4j.Level; /** Runs a map task. */ class MapTaskRunner extends TaskRunner { - public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf) { - super(task, tracker, conf); + public MapTaskRunner(TaskInProgress task, TaskTracker tracker, JobConf conf, + TaskTracker.RunningJob rjob) throws IOException { + super(task, tracker, conf, rjob); } @Override Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jun 5 02:33:44 2012 @@ -141,9 +141,10 @@ public class ReduceTask extends Task { } @Override - public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip) - throws IOException { - return new ReduceTaskRunner(tip, tracker, this.conf); + public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip, + TaskTracker.RunningJob rjob + ) throws IOException { + return new ReduceTaskRunner(tip, tracker, this.conf, rjob); } @Override Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java Tue Jun 5 02:33:44 2012 @@ -26,9 +26,10 @@ import org.apache.log4j.Level; class ReduceTaskRunner extends TaskRunner { public ReduceTaskRunner(TaskInProgress task, TaskTracker tracker, - JobConf conf) throws IOException { + JobConf conf, TaskTracker.RunningJob rjob + ) throws IOException { - super(task, tracker, conf); + super(task, tracker, conf, rjob); } public void close() throws IOException { Modified: hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java?rev=1346214&r1=1346213&r2=1346214&view=diff ============================================================================== --- hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/common/branches/branch-0.22/mapreduce/src/java/org/apache/hadoop/mapred/Task.java Tue Jun 5 02:33:44 2012 @@ -468,7 +468,9 @@ abstract public class Task implements Wr /** Return an approprate thread runner for this task. * @param tip TODO*/ public abstract TaskRunner createRunner(TaskTracker tracker, - TaskTracker.TaskInProgress tip) throws IOException; + TaskTracker.TaskInProgress tip, + TaskTracker.RunningJob rjob + ) throws IOException; /** The number of milliseconds between progress reports. */ public static final int PROGRESS_INTERVAL = 3000;
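As background for the JvmManager.kill() change earlier in this patch, which signals the task process with QUIT and TERM and then schedules a KILL after sleeptimeBeforeSigkill milliseconds, here is a minimal standalone sketch of that escalation pattern. The class name and the use of the Unix kill command are illustrative assumptions, not the actual DelayedProcessKiller implementation.

    /**
     * Sketch of the TERM-then-KILL escalation: send SIGTERM, wait a grace
     * period, then send SIGKILL if the process is still around.
     * Unix-only; shells out to /bin/kill.
     */
    class EscalatingKillerSketch extends Thread {
        private final int pid;
        private final long graceMillis;

        EscalatingKillerSketch(int pid, long graceMillis) {
            this.pid = pid;
            this.graceMillis = graceMillis;
        }

        @Override
        public void run() {
            try {
                signal("TERM");              // polite request to exit
                Thread.sleep(graceMillis);   // grace period before escalating
                signal("KILL");              // force-kill if still alive
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }

        private void signal(String sig) {
            try {
                new ProcessBuilder("kill", "-" + sig, Integer.toString(pid))
                    .start().waitFor();
            } catch (Exception e) {
                // process may already be gone; nothing to do
            }
        }
    }

A caller would start it in place of an immediate kill, e.g. new EscalatingKillerSketch(pid, 250).start(), matching the patch's 250 ms DEFAULT_SLEEPTIME_BEFORE_SIGKILL.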