Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 67644 invoked from network); 26 Jan 2010 14:04:17 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 26 Jan 2010 14:04:17 -0000 Received: (qmail 76262 invoked by uid 500); 26 Jan 2010 14:04:17 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 76225 invoked by uid 500); 26 Jan 2010 14:04:16 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 76215 invoked by uid 99); 26 Jan 2010 14:04:16 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jan 2010 14:04:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Jan 2010 14:04:06 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 839E52388ABA; Tue, 26 Jan 2010 14:03:15 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r903227 [11/16] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ conf/ ivy/ src/benchmarks/gridmix/ src/benchmarks/gridmix/javasort/ src/benchmarks/gridmix/maxent/ src/benchmarks/gridmix/monsterQuery/ src/benchmarks/gri... Date: Tue, 26 Jan 2010 14:03:09 -0000 To: mapreduce-commits@hadoop.apache.org From: stevel@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100126140315.839E52388ABA@eris.apache.org> Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Tue Jan 26 14:02:53 2010 @@ -19,6 +19,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.Queue.QueueOperation; import org.apache.hadoop.mapreduce.QueueState; import org.apache.hadoop.security.SecurityUtil.AccessControlList; @@ -87,9 +88,30 @@ } - QueueConfigurationParser(String file) { + QueueConfigurationParser(String confFile) { + File file = new File(confFile).getAbsoluteFile(); + if (!file.exists()) { + throw new RuntimeException("Configuration file not found at " + + confFile); + } + InputStream in = null; + try { + in = new BufferedInputStream(new FileInputStream(file)); + loadFrom(in); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } finally { + IOUtils.closeStream(in); + } + } + + QueueConfigurationParser(InputStream xmlInput) { + loadFrom(xmlInput); + } + + private void loadFrom(InputStream xmlInput) { try { - this.root = loadResource(file); + this.root = loadResource(xmlInput); } catch (ParserConfigurationException e) { throw new 
RuntimeException(e); } catch (SAXException e) { @@ -120,13 +142,13 @@ * Method to load the resource file. * generates the root. * - * @param confFile + * @param resourceInput InputStream that provides the XML to parse * @return * @throws ParserConfigurationException * @throws SAXException * @throws IOException */ - protected Queue loadResource(String confFile) + protected Queue loadResource(InputStream resourceInput) throws ParserConfigurationException, SAXException, IOException { DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); @@ -146,19 +168,8 @@ DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); Document doc = null; Element queuesNode = null; - File file = new File(confFile).getAbsoluteFile(); - if (file.exists()) { - InputStream in = new BufferedInputStream(new FileInputStream(file)); - try { - doc = builder.parse(in); - } finally { - in.close(); - } - } - if (doc == null) { - throw new RuntimeException(file.getAbsolutePath() + " not found"); - } + doc = builder.parse(resourceInput); queuesNode = doc.getDocumentElement(); return this.parseResource(queuesNode); } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/QueueManager.java Tue Jan 26 14:02:53 2010 @@ -22,6 +22,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.TaskScheduler.QueueRefresher; import org.apache.hadoop.mapreduce.QueueState; import org.apache.hadoop.security.SecurityUtil.AccessControlList; @@ -31,6 +32,8 @@ import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; +import java.io.BufferedInputStream; +import java.io.InputStream; import java.io.IOException; import java.io.Writer; import java.util.ArrayList; @@ -87,6 +90,7 @@ private Map leafQueues = new HashMap(); private Map allQueues = new HashMap(); static final String QUEUE_CONF_FILE_NAME = "mapred-queues.xml"; + static final String QUEUE_CONF_DEFAULT_FILE_NAME = "mapred-queues-default.xml"; // Prefix in configuration for queue related keys static final String QUEUE_CONF_PROPERTY_NAME_PREFIX @@ -120,10 +124,24 @@ } return new DeprecatedQueueConfigurationParser(conf); } else { - URL filePath = + URL xmlInUrl = Thread.currentThread().getContextClassLoader() .getResource(QUEUE_CONF_FILE_NAME); - return new QueueConfigurationParser(filePath.getPath()); + if (xmlInUrl == null) { + xmlInUrl = Thread.currentThread().getContextClassLoader() + .getResource(QUEUE_CONF_DEFAULT_FILE_NAME); + assert xmlInUrl != null; // this should be in our jar + } + InputStream stream = null; + try { + stream = xmlInUrl.openStream(); + return new QueueConfigurationParser(new BufferedInputStream(stream)); + } catch (IOException ioe) { + throw new RuntimeException("Couldn't open queue configuration at " + + xmlInUrl, ioe); + } finally { + IOUtils.closeStream(stream); + } } } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: 
http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Jan 26 14:02:53 2010 @@ -45,6 +45,7 @@ import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator; import org.apache.hadoop.mapred.TaskTracker.TaskInProgress; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.mapreduce.task.reduce.Shuffle; import org.apache.hadoop.util.Progress; @@ -77,19 +78,19 @@ private Progress sortPhase; private Progress reducePhase; private Counters.Counter shuffledMapsCounter = - getCounters().findCounter(Counter.SHUFFLED_MAPS); + getCounters().findCounter(TaskCounter.SHUFFLED_MAPS); private Counters.Counter reduceShuffleBytes = - getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES); + getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES); private Counters.Counter reduceInputKeyCounter = - getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS); + getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS); private Counters.Counter reduceInputValueCounter = - getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS); + getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS); private Counters.Counter reduceOutputCounter = - getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS); + getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS); private Counters.Counter reduceCombineInputCounter = - getCounters().findCounter(Counter.COMBINE_INPUT_RECORDS); + getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS); private Counters.Counter reduceCombineOutputCounter = - getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS); + getCounters().findCounter(TaskCounter.COMBINE_OUTPUT_RECORDS); // A custom comparator for map output files. Here the ordering is determined // by the file's size and path. 
In case of files with same size and different @@ -237,9 +238,9 @@ super(in, comparator, keyClass, valClass, conf, reporter); this.umbilical = umbilical; this.skipGroupCounter = - reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS); + reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS); this.skipRecCounter = - reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS); + reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS); this.toWriteSkipRecs = toWriteSkipRecs() && SkipBadRecords.getSkipOutputPath(conf)!=null; this.keyClass = keyClass; Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/Task.java Tue Jan 26 14:02:53 2010 @@ -28,6 +28,8 @@ import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicBoolean; +import javax.crypto.SecretKey; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; @@ -44,13 +46,12 @@ import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.mapred.IFile.Writer; +import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.TaskCounter; import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer; import org.apache.hadoop.mapreduce.task.ReduceContextImpl; -import org.apache.hadoop.mapreduce.security.JobTokens; -import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progress; @@ -68,27 +69,6 @@ private static final Log LOG = LogFactory.getLog(Task.class); - // Counters used by Task subclasses - protected static enum Counter { - MAP_INPUT_RECORDS, - MAP_OUTPUT_RECORDS, - MAP_SKIPPED_RECORDS, - MAP_INPUT_BYTES, - MAP_OUTPUT_BYTES, - COMBINE_INPUT_RECORDS, - COMBINE_OUTPUT_RECORDS, - REDUCE_INPUT_GROUPS, - REDUCE_SHUFFLE_BYTES, - REDUCE_INPUT_RECORDS, - REDUCE_OUTPUT_RECORDS, - REDUCE_SKIPPED_GROUPS, - REDUCE_SKIPPED_RECORDS, - SPILLED_RECORDS, - FAILED_SHUFFLE, - SHUFFLED_MAPS, - MERGED_MAP_OUTPUTS, - } - public static String MERGED_OUTPUT_PREFIX = ".merged"; @@ -160,7 +140,7 @@ protected final Counters.Counter mergedMapOutputsCounter; private int numSlotsRequired; protected TaskUmbilicalProtocol umbilical; - protected JobTokens jobTokens=null; // storage of the secret keys + protected SecretKey tokenSecret; //////////////////////////////////////////// // Constructors @@ -169,9 +149,12 @@ public Task() { taskStatus = TaskStatus.createTaskStatus(isMapTask()); taskId = new TaskAttemptID(); - spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); - failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE); - mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS); + spilledRecordsCounter = + counters.findCounter(TaskCounter.SPILLED_RECORDS); + failedShuffleCounter = + counters.findCounter(TaskCounter.FAILED_SHUFFLE); + mergedMapOutputsCounter = + 
counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS); } public Task(String jobFile, TaskAttemptID taskId, int partition, @@ -190,8 +173,9 @@ TaskStatus.Phase.SHUFFLE, counters); spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); - failedShuffleCounter = counters.findCounter(Counter.FAILED_SHUFFLE); - mergedMapOutputsCounter = counters.findCounter(Counter.MERGED_MAP_OUTPUTS); + failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE); + mergedMapOutputsCounter = + counters.findCounter(TaskCounter.MERGED_MAP_OUTPUTS); } //////////////////////////////////////////// @@ -215,19 +199,19 @@ } /** - * set JobToken storage - * @param jt + * Set the job token secret + * @param tokenSecret the secret */ - public void setJobTokens(JobTokens jt) { - this.jobTokens = jt; + public void setJobTokenSecret(SecretKey tokenSecret) { + this.tokenSecret = tokenSecret; } /** - * get JobToken storage - * @return storage object + * Get the job token secret + * @return the token secret */ - public JobTokens getJobTokens() { - return this.jobTokens; + public SecretKey getJobTokenSecret() { + return this.tokenSecret; } @@ -938,8 +922,22 @@ + JobStatus.State.FAILED + " or " + JobStatus.State.KILLED); } + + // delete the staging area for the job + JobConf conf = new JobConf(jobContext.getConfiguration()); + if (!supportIsolationRunner(conf)) { + String jobTempDir = conf.get("mapreduce.job.dir"); + Path jobTempDirPath = new Path(jobTempDir); + FileSystem fs = jobTempDirPath.getFileSystem(conf); + fs.delete(jobTempDirPath, true); + } done(umbilical, reporter); } + + protected boolean supportIsolationRunner(JobConf conf) { + return (conf.getKeepTaskFilesPattern() != null || conf + .getKeepFailedTaskFiles()); + } protected void runJobSetupTask(TaskUmbilicalProtocol umbilical, TaskReporter reporter Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskController.java Tue Jan 26 14:02:53 2010 @@ -25,6 +25,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; import org.apache.hadoop.mapred.JvmManager.JvmEnv; import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.mapreduce.MRConfig; @@ -69,12 +72,10 @@ * disks: *
 * <ul>
 * <li>mapreduce.cluster.local.directories</li>
- * <li>Job cache directories</li>
- * <li>Archive directories</li>
 * <li>Hadoop log directories</li>
 * </ul>
*/ - void setup() { + public void setup() { for (String localDir : this.mapredLocalDirs) { // Set up the mapreduce.cluster.local.directories. File mapredlocalDir = new File(localDir); @@ -108,13 +109,13 @@ /** * Take task-controller specific actions to initialize the distributed cache - * files. This involves setting appropriate permissions for these files so as + * file. This involves setting appropriate permissions for these files so as * to secure them to be accessible only their owners. * * @param context * @throws IOException */ - public abstract void initializeDistributedCache(InitializationContext context) + public abstract void initializeDistributedCacheFile(DistributedCacheFileContext context) throws IOException; /** @@ -134,26 +135,26 @@ /** * Top level cleanup a task JVM method. - * - * The current implementation does the following. *
 * <ol>
- * <li>Sends a graceful terminate signal to task JVM allowing its sub-process
+ * <li>Sends a graceful termiante signal to task JVM to allow subprocesses
 * to cleanup.</li>
- * <li>Waits for stipulated period</li>
 * <li>Sends a forceful kill signal to task JVM, terminating all its
- * sub-process forcefully.</li>
+ * sub-processes forcefully.</li>
 * </ol>
- * + * * @param context the task for which kill signal has to be sent. */ final void destroyTaskJVM(TaskControllerContext context) { + // Send SIGTERM to try to ask for a polite exit. terminateTask(context); + try { Thread.sleep(context.sleeptimeBeforeSigkill); } catch (InterruptedException e) { - LOG.warn("Sleep interrupted : " + + LOG.warn("Sleep interrupted : " + StringUtils.stringifyException(e)); } + killTask(context); } @@ -191,12 +192,104 @@ } /** + * Contains info related to the path of the file/dir to be deleted. This info + * is needed by task-controller to build the full path of the file/dir + */ + static class TaskControllerPathDeletionContext extends PathDeletionContext { + Task task; + boolean isWorkDir; + TaskController taskController; + + /** + * mapredLocalDir is the base dir under which to-be-deleted taskWorkDir or + * taskAttemptDir exists. fullPath of taskAttemptDir or taskWorkDir + * is built using mapredLocalDir, jobId, taskId, etc. + */ + Path mapredLocalDir; + + public TaskControllerPathDeletionContext(FileSystem fs, Path mapredLocalDir, + Task task, boolean isWorkDir, TaskController taskController) { + super(fs, null); + this.task = task; + this.isWorkDir = isWorkDir; + this.taskController = taskController; + this.mapredLocalDir = mapredLocalDir; + } + + @Override + protected String getPathForCleanup() { + if (fullPath == null) { + fullPath = buildPathForDeletion(); + } + return fullPath; + } + + /** + * Builds the path of taskAttemptDir OR taskWorkDir based on + * mapredLocalDir, jobId, taskId, etc + */ + String buildPathForDeletion() { + String subDir = (isWorkDir) ? TaskTracker.getTaskWorkDir(task.getUser(), + task.getJobID().toString(), task.getTaskID().toString(), + task.isTaskCleanupTask()) + : TaskTracker.getLocalTaskDir(task.getUser(), + task.getJobID().toString(), task.getTaskID().toString(), + task.isTaskCleanupTask()); + + return mapredLocalDir.toUri().getPath() + Path.SEPARATOR + subDir; + } + + /** + * Makes the path(and its subdirectories recursively) fully deletable by + * setting proper permissions(770) by task-controller + */ + @Override + protected void enablePathForCleanup() throws IOException { + getPathForCleanup();// allow init of fullPath, if not inited already + if (fs.exists(new Path(fullPath))) { + taskController.enableTaskForCleanup(this); + } + } + } + + /** * NOTE: This class is internal only class and not intended for users!! * */ public static class InitializationContext { public File workDir; public String user; + + public InitializationContext() { + } + + public InitializationContext(String user, File workDir) { + this.user = user; + this.workDir = workDir; + } + } + + /** + * This is used for initializing the private localized files in distributed + * cache. Initialization would involve changing permission, ownership and etc. 
+ */ + public static class DistributedCacheFileContext extends InitializationContext { + // base directory under which file has been localized + Path localizedBaseDir; + // the unique string used to construct the localized path + String uniqueString; + + public DistributedCacheFileContext(String user, File workDir, + Path localizedBaseDir, String uniqueString) { + super(user, workDir); + this.localizedBaseDir = localizedBaseDir; + this.uniqueString = uniqueString; + } + + public Path getLocalizedUniqueDir() { + return new Path(localizedBaseDir, new Path(TaskTracker + .getPrivateDistributedCacheDir(user), uniqueString)); + } } static class JobInitializationContext extends InitializationContext { @@ -224,6 +317,15 @@ */ abstract void killTask(TaskControllerContext context); + + /** + * Sends a QUIT signal to direct the task JVM (and sub-processes) to + * dump their stack to stdout. + * + * @param context task context. + */ + abstract void dumpTaskStack(TaskControllerContext context); + /** * Initialize user on this TaskTracer in a TaskController specific manner. * @@ -242,4 +344,11 @@ abstract void runDebugScript(DebugScriptContext context) throws IOException; + /** + * Enable the task for cleanup by changing permissions of the path + * @param context path deletion context + * @throws IOException + */ + abstract void enableTaskForCleanup(PathDeletionContext context) + throws IOException; } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jan 26 14:02:53 2010 @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -31,13 +32,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.JobInProgress.DataStatistics; import org.apache.hadoop.mapred.SortedRanges.Range; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.jobhistory.JobHistory; import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.net.Node; @@ -65,7 +65,7 @@ // Defines the TIP private String jobFile = null; - private Job.RawSplit rawSplit; + private TaskSplitMetaInfo splitInfo; private int numMaps; private int partition; private JobTracker jobtracker; @@ -140,12 +140,12 @@ * Constructor for MapTask */ public TaskInProgress(JobID jobid, String jobFile, - Job.RawSplit rawSplit, + TaskSplitMetaInfo split, JobTracker jobtracker, JobConf conf, JobInProgress job, int partition, int numSlotsRequired) { this.jobFile = jobFile; - this.rawSplit = rawSplit; + this.splitInfo = split; this.jobtracker = jobtracker; this.job = job; this.conf = conf; @@ -316,10 +316,36 @@ * Whether this is a map task */ public boolean isMapTask() { - return rawSplit != null; + return splitInfo != null; } /** + * Returns the {@link 
TaskType} of the {@link TaskAttemptID} passed. + * The type of an attempt is determined by the nature of the task and not its + * id. + * For example, + * - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it + * has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP + * instead of MAP. + * - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is + * now supposed to do the task-level cleanup. In such a case this attempt + * will be of type TASK_CLEANUP instead of REDUCE. + */ + TaskType getAttemptType (TaskAttemptID id) { + if (isCleanupAttempt(id)) { + return TaskType.TASK_CLEANUP; + } else if (isJobSetupTask()) { + return TaskType.JOB_SETUP; + } else if (isJobCleanupTask()) { + return TaskType.JOB_CLEANUP; + } else if (isMapTask()) { + return TaskType.MAP; + } else { + return TaskType.REDUCE; + } + } + + /** * Is the Task associated with taskid is the first attempt of the tip? * @param taskId * @return Returns true if the Task is the first attempt of the tip @@ -335,6 +361,15 @@ public boolean isRunning() { return !activeTasks.isEmpty(); } + + /** + * Is this TaskAttemptid running + * @param taskId + * @return true if taskId attempt is running. + */ + boolean isAttemptRunning(TaskAttemptID taskId) { + return activeTasks.containsKey(taskId); + } TaskAttemptID getSuccessfulTaskid() { return successfulTaskId; @@ -534,15 +569,16 @@ * A status message from a client has arrived. * It updates the status of a single component-thread-task, * which might result in an overall TaskInProgress status update. - * @return has the task changed its state noticably? + * @return has the task changed its state noticeably? */ synchronized boolean updateStatus(TaskStatus status) { TaskAttemptID taskid = status.getTaskID(); + String tracker = status.getTaskTracker(); String diagInfo = status.getDiagnosticInfo(); TaskStatus oldStatus = taskStatuses.get(taskid); boolean changed = true; if (diagInfo != null && diagInfo.length() > 0) { - LOG.info("Error from "+taskid+": "+diagInfo); + LOG.info("Error from " + taskid + " on " + tracker + ": "+ diagInfo); addDiagnosticInfo(taskid, diagInfo); } @@ -697,6 +733,12 @@ if (tasks.contains(taskid)) { if (taskState == TaskStatus.State.FAILED) { numTaskFailures++; + if (isMapTask()) { + jobtracker.getInstrumentation().failedMap(taskid); + } else { + jobtracker.getInstrumentation().failedReduce(taskid); + } + machinesWhereFailed.add(trackerHostName); if(maxSkipRecords>0) { //skipping feature enabled @@ -707,6 +749,11 @@ } else if (taskState == TaskStatus.State.KILLED) { numKilledTasks++; + if (isMapTask()) { + jobtracker.getInstrumentation().killedMap(taskid); + } else { + jobtracker.getInstrumentation().killedReduce(taskid); + } } } @@ -787,7 +834,7 @@ */ public String[] getSplitLocations() { if (isMapTask() && !jobSetup && !jobCleanup) { - return rawSplit.getLocations(); + return splitInfo.getLocations(); } return new String[0]; } @@ -800,6 +847,13 @@ } /** + * Get all the {@link TaskAttemptID}s in this {@link TaskInProgress} + */ + TaskAttemptID[] getAllTaskAttemptIDs() { + return tasks.toArray(new TaskAttemptID[tasks.size()]); + } + + /** * Get the status of the specified task * @param taskid * @return @@ -992,16 +1046,8 @@ if (isMapTask()) { LOG.debug("attempt " + numTaskFailures + " sending skippedRecords " + failedRanges.getIndicesCount()); - String splitClass = null; - BytesWritable split; - if (!jobSetup && !jobCleanup) { - splitClass = rawSplit.getClassName(); - split = rawSplit.getBytes(); - } else { - split = 
new BytesWritable(); - } - t = new MapTask(jobFile, taskid, partition, splitClass, split, - numSlotsNeeded); + t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(), + numSlotsNeeded); } else { t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded); } @@ -1114,7 +1160,7 @@ if (!isMapTask() || jobSetup || jobCleanup) { return ""; } - String[] splits = rawSplit.getLocations(); + String[] splits = splitInfo.getLocations(); Node[] nodes = new Node[splits.length]; for (int i = 0; i < splits.length; i++) { nodes[i] = jobtracker.getNode(splits[i]); @@ -1144,16 +1190,12 @@ public long getMapInputSize() { if(isMapTask() && !jobSetup && !jobCleanup) { - return rawSplit.getDataLength(); + return splitInfo.getInputDataLength(); } else { return 0; } } - public void clearSplit() { - rawSplit.clearBytes(); - } - /** * Compare most recent task attempts dispatch time to current system time so * that task progress rate will slow down as time proceeds even if no progress Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskRunner.java Tue Jan 26 14:02:53 2010 @@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.TaskController.InitializationContext; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.log4j.Level; @@ -93,6 +92,8 @@ public TaskTracker.TaskInProgress getTaskInProgress() { return tip; } public TaskTracker getTracker() { return tracker; } + public JvmManager getJvmManager() { return jvmManager; } + /** Called to assemble this task's input. This method is run in the parent * process before the child is spawned. It should not execute user code, * only system code. */ @@ -172,18 +173,14 @@ taskDistributedCacheManager = tracker.getTrackerDistributedCacheManager() .newTaskDistributedCacheManager(conf); taskDistributedCacheManager.setup(lDirAlloc, workDir, TaskTracker - .getDistributedCacheDir(conf.getUser())); + .getPrivateDistributedCacheDir(conf.getUser()), + TaskTracker.getPublicDistributedCacheDir()); // Set up the child task's configuration. After this call, no localization // of files should happen in the TaskTracker's process space. Any changes to // the conf object after this will NOT be reflected to the child. 
setupChildTaskConfiguration(lDirAlloc); - InitializationContext context = new InitializationContext(); - context.user = conf.getUser(); - context.workDir = new File(conf.get(TaskTracker.JOB_LOCAL_DIR)); - tracker.getTaskController().initializeDistributedCache(context); - if (!prepare()) { return; } @@ -521,7 +518,7 @@ } hadoopClientOpts = hadoopClientOpts + "-Dhadoop.tasklog.taskid=" + taskid + " -Dhadoop.tasklog.totalLogFileSize=" + logSize; - env.put("HADOOP_CLIENT_OPTS", "\"" + hadoopClientOpts + "\""); + env.put("HADOOP_CLIENT_OPTS", hadoopClientOpts); // add the env variables passed by the user String mapredChildEnv = getChildEnv(conf); @@ -647,7 +644,40 @@ } } classPaths.add(new File(jobCacheDir, "classes").toString()); - classPaths.add(jobCacheDir.toString()); + classPaths.add(new File(jobCacheDir, "job.jar").toString()); + } + + /** + * Sets permissions recursively and then deletes the contents of dir. + * Makes dir empty directory(does not delete dir itself). + */ + static void deleteDirContents(JobConf conf, File dir) throws IOException { + FileSystem fs = FileSystem.getLocal(conf); + if (fs.exists(new Path(dir.getAbsolutePath()))) { + File contents[] = dir.listFiles(); + if (contents != null) { + for (int i = 0; i < contents.length; i++) { + try { + int ret = 0; + if ((ret = FileUtil.chmod(contents[i].getAbsolutePath(), + "ug+rwx", true)) != 0) { + LOG.warn("Unable to chmod for " + contents[i] + + "; chmod exit status = " + ret); + } + } catch(InterruptedException e) { + LOG.warn("Interrupted while setting permissions for contents of " + + "workDir. Not deleting the remaining contents of workDir."); + return; + } + if (!fs.delete(new Path(contents[i].getAbsolutePath()), true)) { + LOG.warn("Unable to delete "+ contents[i]); + } + } + } + } + else { + LOG.warn(dir + " does not exist."); + } } /** @@ -660,11 +690,14 @@ * @param workDir Working directory, which is completely deleted. */ public static void setupWorkDir(JobConf conf, File workDir) throws IOException { - LOG.debug("Fully deleting and re-creating" + workDir); - FileUtil.fullyDelete(workDir); - if (!workDir.mkdir()) { - LOG.debug("Did not recreate " + workDir); + if (LOG.isDebugEnabled()) { + LOG.debug("Fully deleting contents of " + workDir); } + + /** delete only the contents of workDir leaving the directory empty. We + * can't delete the workDir as it is the current working directory. 
+ */ + deleteDirContents(conf, workDir); if (DistributedCache.getSymlink(conf)) { URI[] archives = DistributedCache.getCacheArchives(conf); Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Jan 26 14:02:53 2010 @@ -38,11 +38,11 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.TreeMap; -import java.util.Vector; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.regex.Pattern; +import javax.crypto.SecretKey; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; @@ -67,14 +67,19 @@ import org.apache.hadoop.ipc.Server; import org.apache.hadoop.mapred.TaskController.DebugScriptContext; import org.apache.hadoop.mapred.TaskController.JobInitializationContext; +import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext; +import org.apache.hadoop.mapred.TaskController.TaskControllerPathDeletionContext; import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus; import org.apache.hadoop.mapred.pipes.Submitter; import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager; -import org.apache.hadoop.mapreduce.security.JobTokens; import org.apache.hadoop.mapreduce.security.SecureShuffleUtils; +import org.apache.hadoop.mapreduce.security.TokenCache; +import org.apache.hadoop.mapreduce.security.TokenStorage; +import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; +import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig; import org.apache.hadoop.mapreduce.server.tasktracker.Localizer; import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader; @@ -86,13 +91,16 @@ import org.apache.hadoop.net.DNS; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UnixUserGroupInformation; import org.apache.hadoop.security.authorize.ConfiguredPolicy; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.mapreduce.util.ConfigUtil; import org.apache.hadoop.mapreduce.util.MemoryCalculatorPlugin; +import org.apache.hadoop.mapreduce.util.ResourceCalculatorPlugin; import org.apache.hadoop.mapreduce.util.ProcfsBasedProcessTree; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.RunJar; import org.apache.hadoop.util.Service; @@ -100,7 +108,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.DiskChecker.DiskErrorException; -import org.apache.hadoop.util.Shell.ShellCommandExecutor; +import org.apache.hadoop.mapreduce.util.MRAsyncDiskService; 
/******************************************************* * TaskTracker is a process that starts and tracks MR Tasks @@ -123,6 +131,7 @@ static final String MAPRED_TASKTRACKER_PMEM_RESERVED_PROPERTY = "mapred.tasktracker.pmem.reserved"; + static final long WAIT_FOR_DONE = 3 * 1000; int httpPort; @@ -194,6 +203,8 @@ */ Map runningTasks = null; Map runningJobs = new TreeMap(); + private final JobTokenSecretManager jobTokenSecretManager + = new JobTokenSecretManager(); volatile int mapTotal = 0; volatile int reduceTotal = 0; @@ -221,13 +232,14 @@ static final String OUTPUT = "output"; private static final String JARSDIR = "jars"; static final String LOCAL_SPLIT_FILE = "split.dta"; + static final String LOCAL_SPLIT_META_FILE = "split.info"; static final String JOBFILE = "job.xml"; static final String JOB_TOKEN_FILE="jobToken"; //localized file static final String JOB_LOCAL_DIR = JobContext.JOB_LOCAL_DIR; private JobConf fConf; - FileSystem localFs; + private FileSystem localFs; private Localizer localizer; @@ -254,9 +266,7 @@ private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT; private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT; - - static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY = - TT_MEMORY_CALCULATOR_PLUGIN; + private ResourceCalculatorPlugin resourceCalculatorPlugin = null; /** * the minimum interval between jobtracker polls @@ -269,6 +279,8 @@ private IndexCache indexCache; + private MRAsyncDiskService asyncDiskService; + /** * Handle to the specific instance of the {@link TaskController} class */ @@ -365,6 +377,11 @@ return taskController; } + // Currently this is used only by tests + void setTaskController(TaskController t) { + taskController = t; + } + private RunningJob addTaskToJob(JobID jobId, TaskInProgress tip) { synchronized (runningJobs) { @@ -398,6 +415,10 @@ } } + JobTokenSecretManager getJobTokenSecretManager() { + return jobTokenSecretManager; + } + Localizer getLocalizer() { return localizer; } @@ -410,9 +431,13 @@ return TaskTracker.SUBDIR + Path.SEPARATOR + user; } - public static String getDistributedCacheDir(String user) { + public static String getPrivateDistributedCacheDir(String user) { return getUserDir(user) + Path.SEPARATOR + TaskTracker.DISTCACHEDIR; } + + public static String getPublicDistributedCacheDir() { + return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.DISTCACHEDIR; + } public static String getJobCacheSubdir(String user) { return getUserDir(user) + Path.SEPARATOR + TaskTracker.JOBCACHE; @@ -449,11 +474,16 @@ return getLocalJobDir(user, jobid) + Path.SEPARATOR + MRConstants.WORKDIR; } - static String getLocalSplitFile(String user, String jobid, String taskid) { + static String getLocalSplitMetaFile(String user, String jobid, String taskid){ return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR - + TaskTracker.LOCAL_SPLIT_FILE; + + TaskTracker.LOCAL_SPLIT_META_FILE; } + static String getLocalSplitFile(String user, String jobid, String taskid) { + return TaskTracker.getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR + + TaskTracker.LOCAL_SPLIT_FILE; + } + static String getIntermediateOutputDir(String user, String jobid, String taskid) { return getLocalTaskDir(user, jobid, taskid) + Path.SEPARATOR @@ -475,10 +505,7 @@ static String getTaskWorkDir(String user, String jobid, String taskid, boolean isCleanupAttempt) { - String dir = getLocalJobDir(user, jobid) + Path.SEPARATOR + taskid; - if 
(isCleanupAttempt) { - dir = dir + TASK_CLEANUP_SUFFIX; - } + String dir = getLocalTaskDir(user, jobid, taskid, isCleanupAttempt); return dir + Path.SEPARATOR + MRConstants.WORKDIR; } @@ -529,9 +556,11 @@ fConf.get(TT_DNS_NAMESERVER,"default")); } - //check local disk + // Check local disk, start async disk service, and clean up all + // local directories. checkLocalDirs(this.fConf.getLocalDirs()); - fConf.deleteLocalFiles(SUBDIR); + asyncDiskService = new MRAsyncDiskService(fConf); + asyncDiskService.cleanupAllVolumes(); // Clear out state tables this.tasks.clear(); @@ -597,12 +626,19 @@ this.taskTrackerName = "tracker_" + localHostname + ":" + taskReportAddress; LOG.info("Starting tracker " + taskTrackerName); - // Initialize DistributedCache and - // clear out temporary files that might be lying around + Class taskControllerClass = fConf.getClass( + TT_TASK_CONTROLLER, DefaultTaskController.class, TaskController.class); + taskController = (TaskController) ReflectionUtils.newInstance( + taskControllerClass, fConf); + + + // setup and create jobcache directory with appropriate permissions + taskController.setup(); + + // Initialize DistributedCache this.distributedCacheManager = - new TrackerDistributedCacheManager(this.fConf); - this.distributedCacheManager.purgeCache(); - cleanupStorage(); + new TrackerDistributedCacheManager(this.fConf, taskController, + asyncDiskService); //mark as just started; this is used in heartbeats this.justStarted = true; @@ -625,6 +661,12 @@ taskTrackerName); mapEventsFetcher.start(); + Class clazz = + fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN, + null, ResourceCalculatorPlugin.class); + resourceCalculatorPlugin = ResourceCalculatorPlugin + .getResourceCalculatorPlugin(clazz, fConf); + LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculatorPlugin); initializeMemoryManagement(); this.indexCache = new IndexCache(this.fConf); @@ -633,15 +675,6 @@ reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots); mapLauncher.start(); reduceLauncher.start(); - Class taskControllerClass - = fConf.getClass(TT_TASK_CONTROLLER, - DefaultTaskController.class, - TaskController.class); - taskController = (TaskController)ReflectionUtils.newInstance( - taskControllerClass, fConf); - - //setup and create jobcache directory with appropriate permissions - taskController.setup(); // create a localizer instance setLocalizer(new Localizer(localFs, fConf.getLocalDirs(), taskController)); @@ -667,10 +700,14 @@ t, TaskTrackerInstrumentation.class); } - /** - * Removes all contents of temporary storage. Called upon + /** + * Removes all contents of temporary storage. Called upon * startup, to remove any leftovers from previous run. + * + * Use MRAsyncDiskService.moveAndDeleteAllVolumes instead. 
+ * @see org.apache.hadoop.mapreduce.util.MRAsyncDiskService#cleanupAllVolumes() */ + @Deprecated public void cleanupStorage() throws IOException { if (fConf != null) { fConf.deleteLocalFiles(); @@ -880,18 +917,22 @@ rjob.jobConf = localJobConf; rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) || localJobConf.getKeepFailedTaskFiles()); - FSDataInputStream in = localFs.open(new Path( - rjob.jobConf.get(JobContext.JOB_TOKEN_FILE))); - JobTokens jt = new JobTokens(); - jt.readFields(in); - rjob.jobTokens = jt; // store JobToken object per job - + TokenStorage ts = TokenCache.loadTokens(rjob.jobConf); + Token jt = (Token)ts.getJobToken(); + getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt); rjob.localized = true; } } launchTaskForJob(tip, new JobConf(rjob.jobConf)); } + private void setUgi(String user, Configuration conf) { + //The dummy-group used here will not be required once we have UGI + //object creation with just the user name. + conf.set(UnixUserGroupInformation.UGI_PROPERTY_NAME, + user+","+UnixUserGroupInformation.DEFAULT_GROUP); + } + /** * Localize the job on this tasktracker. Specifically *
    @@ -934,6 +975,7 @@ } System.setProperty(JOB_LOCAL_DIR, workDir.toUri().getPath()); localJobConf.set(JOB_LOCAL_DIR, workDir.toUri().getPath()); + setUgi(userName, localJobConf); // Download the job.jar for this job from the system FS localizeJobJarFile(userName, jobId, localFs, localJobConf); @@ -952,12 +994,17 @@ */ private Path localizeJobConfFile(Path jobFile, String user, JobID jobId) throws IOException { - // Get sizes of JobFile and JarFile + JobConf conf = new JobConf(getJobConf()); + setUgi(user, conf); + + FileSystem userFs = jobFile.getFileSystem(conf); + // Get sizes of JobFile // sizes are -1 if they are not present. FileStatus status = null; long jobFileSize = -1; try { - status = systemFS.getFileStatus(jobFile); + + status = userFs.getFileStatus(jobFile); jobFileSize = status.getLen(); } catch(FileNotFoundException fe) { jobFileSize = -1; @@ -968,7 +1015,7 @@ jobFileSize, fConf); // Download job.xml - systemFS.copyToLocalFile(jobFile, localJobFile); + userFs.copyToLocalFile(jobFile, localJobFile); return localJobFile; } @@ -990,8 +1037,9 @@ long jarFileSize = -1; if (jarFile != null) { Path jarFilePath = new Path(jarFile); + FileSystem fs = jarFilePath.getFileSystem(localJobConf); try { - status = systemFS.getFileStatus(jarFilePath); + status = fs.getFileStatus(jarFilePath); jarFileSize = status.getLen(); } catch (FileNotFoundException fe) { jarFileSize = -1; @@ -1003,14 +1051,15 @@ getJobJarFile(user, jobId.toString()), 5 * jarFileSize, fConf); // Download job.jar - systemFS.copyToLocalFile(jarFilePath, localJarFile); + fs.copyToLocalFile(jarFilePath, localJarFile); localJobConf.setJar(localJarFile.toString()); - // Also un-jar the job.jar files. We un-jar it so that classes inside - // sub-directories, for e.g., lib/, classes/ are available on class-path - RunJar.unJar(new File(localJarFile.toString()), new File(localJarFile - .getParent().toString())); + // Un-jar the parts of the job.jar that need to be added to the classpath + RunJar.unJar( + new File(localJarFile.toString()), + new File(localJarFile.getParent().toString()), + localJobConf.getJarUnpackPattern()); } } @@ -1081,9 +1130,23 @@ this.running = false; - // Clear local storage - cleanupStorage(); - + if (asyncDiskService != null) { + // Clear local storage + asyncDiskService.cleanupAllVolumes(); + + // Shutdown all async deletion threads with up to 10 seconds of delay + asyncDiskService.shutdown(); + try { + if (!asyncDiskService.awaitTermination(10000)) { + asyncDiskService.shutdownNow(); + asyncDiskService = null; + } + } catch (InterruptedException e) { + asyncDiskService.shutdownNow(); + asyncDiskService = null; + } + } + // Shutdown the fetcher thread if (mapEventsFetcher != null) { mapEventsFetcher.interrupt(); @@ -1229,6 +1292,16 @@ directoryCleanupThread = new CleanupQueue(); } + + // only used by tests + void setCleanupThread(CleanupQueue c) { + directoryCleanupThread = c; + } + + CleanupQueue getCleanupThread() { + return directoryCleanupThread; + } + /** * Tell the cleanup threads that they should end themselves */ @@ -1429,7 +1502,7 @@ * @return false if the tracker was unknown * @throws IOException */ - private HeartbeatResponse transmitHeartBeat(long now) throws IOException { + HeartbeatResponse transmitHeartBeat(long now) throws IOException { // Send Counters in the status once every COUNTER_UPDATE_INTERVAL boolean sendCounters; if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) { @@ -1478,6 +1551,12 @@ long freeDiskSpace = getFreeSpace(); long totVmem = getTotalVirtualMemoryOnTT(); long 
totPmem = getTotalPhysicalMemoryOnTT(); + long availableVmem = getAvailableVirtualMemoryOnTT(); + long availablePmem = getAvailablePhysicalMemoryOnTT(); + long cumuCpuTime = getCumulativeCpuTimeOnTT(); + long cpuFreq = getCpuFrequencyOnTT(); + int numCpu = getNumProcessorsOnTT(); + float cpuUsage = getCpuUsageOnTT(); status.getResourceStatus().setAvailableSpace(freeDiskSpace); status.getResourceStatus().setTotalVirtualMemory(totVmem); @@ -1486,6 +1565,12 @@ mapSlotMemorySizeOnTT); status.getResourceStatus().setReduceSlotMemorySizeOnTT( reduceSlotSizeMemoryOnTT); + status.getResourceStatus().setAvailableVirtualMemory(availableVmem); + status.getResourceStatus().setAvailablePhysicalMemory(availablePmem); + status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime); + status.getResourceStatus().setCpuFrequency(cpuFreq); + status.getResourceStatus().setNumProcessors(numCpu); + status.getResourceStatus().setCpuUsage(cpuUsage); } //add node health information @@ -1562,6 +1647,80 @@ return totalPhysicalMemoryOnTT; } + /** + * Return the free virtual memory available on this TaskTracker. + * @return total size of free virtual memory. + */ + long getAvailableVirtualMemoryOnTT() { + long availableVirtualMemoryOnTT = TaskTrackerStatus.UNAVAILABLE; + if (resourceCalculatorPlugin != null) { + availableVirtualMemoryOnTT = + resourceCalculatorPlugin.getAvailableVirtualMemorySize(); + } + return availableVirtualMemoryOnTT; + } + + /** + * Return the free physical memory available on this TaskTracker. + * @return total size of free physical memory in bytes + */ + long getAvailablePhysicalMemoryOnTT() { + long availablePhysicalMemoryOnTT = TaskTrackerStatus.UNAVAILABLE; + if (resourceCalculatorPlugin != null) { + availablePhysicalMemoryOnTT = + resourceCalculatorPlugin.getAvailablePhysicalMemorySize(); + } + return availablePhysicalMemoryOnTT; + } + + /** + * Return the cumulative CPU used time on this TaskTracker since system is on + * @return cumulative CPU used time in millisecond + */ + long getCumulativeCpuTimeOnTT() { + long cumulativeCpuTime = TaskTrackerStatus.UNAVAILABLE; + if (resourceCalculatorPlugin != null) { + cumulativeCpuTime = resourceCalculatorPlugin.getCumulativeCpuTime(); + } + return cumulativeCpuTime; + } + + /** + * Return the number of Processors on this TaskTracker + * @return number of processors + */ + int getNumProcessorsOnTT() { + int numProcessors = TaskTrackerStatus.UNAVAILABLE; + if (resourceCalculatorPlugin != null) { + numProcessors = resourceCalculatorPlugin.getNumProcessors(); + } + return numProcessors; + } + + /** + * Return the CPU frequency of this TaskTracker + * @return CPU frequency in kHz + */ + long getCpuFrequencyOnTT() { + long cpuFrequency = TaskTrackerStatus.UNAVAILABLE; + if (resourceCalculatorPlugin != null) { + cpuFrequency = resourceCalculatorPlugin.getCpuFrequency(); + } + return cpuFrequency; + } + + /** + * Return the CPU usage in % of this TaskTracker + * @return CPU usage in % + */ + float getCpuUsageOnTT() { + float cpuUsage = TaskTrackerStatus.UNAVAILABLE; + if (resourceCalculatorPlugin != null) { + cpuUsage = resourceCalculatorPlugin.getCpuUsage(); + } + return cpuUsage; + } + long getTotalMemoryAllottedForTasksOnTT() { return totalMemoryAllottedForTasks; } @@ -1613,6 +1772,7 @@ ReflectionUtils.logThreadInfo(LOG, "lost task", 30); tip.reportDiagnosticInfo(msg); myInstrumentation.timedoutTask(tip.getTask().getTaskID()); + dumpTaskStack(tip); purgeTask(tip, true); } } @@ -1620,6 +1780,60 @@ } /** + * Builds list of PathDeletionContext objects for 
the given paths + */ + private static PathDeletionContext[] buildPathDeletionContexts(FileSystem fs, + Path[] paths) { + int i = 0; + PathDeletionContext[] contexts = new PathDeletionContext[paths.length]; + + for (Path p : paths) { + contexts[i++] = new PathDeletionContext(fs, p.toUri().getPath()); + } + return contexts; + } + + /** + * Builds list of TaskControllerPathDeletionContext objects for a task + * @param fs : FileSystem in which the dirs to be deleted + * @param paths : mapred-local-dirs + * @param task : the task whose taskDir or taskWorkDir is going to be deleted + * @param isWorkDir : the dir to be deleted is workDir or taskDir + * @param taskController : the task-controller to be used for deletion of + * taskDir or taskWorkDir + */ + static PathDeletionContext[] buildTaskControllerPathDeletionContexts( + FileSystem fs, Path[] paths, Task task, boolean isWorkDir, + TaskController taskController) + throws IOException { + int i = 0; + PathDeletionContext[] contexts = + new TaskControllerPathDeletionContext[paths.length]; + + for (Path p : paths) { + contexts[i++] = new TaskControllerPathDeletionContext(fs, p, task, + isWorkDir, taskController); + } + return contexts; + } + + /** + * Send a signal to a stuck task commanding it to dump stack traces + * to stderr before we kill it with purgeTask(). + * + * @param tip {@link TaskInProgress} to dump stack traces. + */ + private void dumpTaskStack(TaskInProgress tip) { + TaskRunner runner = tip.getTaskRunner(); + if (null == runner) { + return; // tip is already abandoned. + } + + JvmManager jvmMgr = runner.getJvmManager(); + jvmMgr.dumpStack(runner); + } + + /** * The task tracker is done with this job, so we need to clean up. * @param action The action with the job * @throws IOException @@ -1657,6 +1871,7 @@ synchronized(runningJobs) { runningJobs.remove(jobId); } + getJobTokenSecretManager().removeTokenForJob(jobId.toString()); } /** @@ -1667,8 +1882,9 @@ */ void removeJobFiles(String user, String jobId) throws IOException { - directoryCleanupThread.addToQueue(localFs, getLocalFiles(fConf, - getLocalJobDir(user, jobId))); + PathDeletionContext[] contexts = buildPathDeletionContexts(localFs, + getLocalFiles(fConf, getLocalJobDir(user, jobId))); + directoryCleanupThread.addToQueue(contexts); } /** @@ -2739,29 +2955,33 @@ runner.close(); } - String localTaskDir = - getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId - .toString(), task.isTaskCleanupTask()); if (localJobConf.getNumTasksToExecutePerJvm() == 1) { // No jvm reuse, remove everything - directoryCleanupThread.addToQueue(localFs, getLocalFiles( - defaultJobConf, localTaskDir)); + PathDeletionContext[] contexts = + buildTaskControllerPathDeletionContexts(localFs, + getLocalFiles(fConf, ""), task, false/* not workDir */, + taskController); + directoryCleanupThread.addToQueue(contexts); } else { // Jvm reuse. We don't delete the workdir since some other task // (running in the same JVM) might be using the dir. The JVM // running the tasks would clean the workdir per a task in the // task process itself. 
- directoryCleanupThread.addToQueue(localFs, getLocalFiles( - defaultJobConf, localTaskDir + Path.SEPARATOR - + TaskTracker.JOBFILE)); + String localTaskDir = + getLocalTaskDir(task.getUser(), task.getJobID().toString(), taskId + .toString(), task.isTaskCleanupTask()); + PathDeletionContext[] contexts = buildPathDeletionContexts( + localFs, getLocalFiles(defaultJobConf, localTaskDir + + Path.SEPARATOR + TaskTracker.JOBFILE)); + directoryCleanupThread.addToQueue(contexts); } } else { if (localJobConf.getNumTasksToExecutePerJvm() == 1) { - String taskWorkDir = - getTaskWorkDir(task.getUser(), task.getJobID().toString(), - taskId.toString(), task.isTaskCleanupTask()); - directoryCleanupThread.addToQueue(localFs, getLocalFiles( - defaultJobConf, taskWorkDir)); + PathDeletionContext[] contexts = + buildTaskControllerPathDeletionContexts(localFs, + getLocalFiles(fConf, ""), task, true /* workDir */, + taskController); + directoryCleanupThread.addToQueue(contexts); } } } @@ -3001,7 +3221,6 @@ boolean localized; boolean keepJobFiles; FetchStatus f; - JobTokens jobTokens; RunningJob(JobID jobid) { this.jobid = jobid; localized = false; @@ -3379,14 +3598,8 @@ private void verifyRequest(HttpServletRequest request, HttpServletResponse response, TaskTracker tracker, String jobId) throws IOException { - JobTokens jt = null; - synchronized (tracker.runningJobs) { - RunningJob rjob = tracker.runningJobs.get(JobID.forName(jobId)); - if (rjob == null) { - throw new IOException("Unknown job " + jobId + "!!"); - } - jt = rjob.jobTokens; - } + SecretKey tokenSecret = tracker.getJobTokenSecretManager() + .retrieveTokenSecret(jobId); // string to encrypt String enc_str = SecureShuffleUtils.buildMsgFrom(request); @@ -3400,17 +3613,16 @@ LOG.debug("verifying request. enc_str="+enc_str+"; hash=..."+ urlHashStr.substring(len-len/2, len-1)); // half of the hash for debug - SecureShuffleUtils ssutil = new SecureShuffleUtils(jt.getShuffleJobToken()); // verify - throws exception try { - ssutil.verifyReply(urlHashStr, enc_str); + SecureShuffleUtils.verifyReply(urlHashStr, enc_str, tokenSecret); } catch (IOException ioe) { response.sendError(HttpServletResponse.SC_UNAUTHORIZED); throw ioe; } // verification passed - encode the reply - String reply = ssutil.generateHash(urlHashStr.getBytes()); + String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(), tokenSecret); response.addHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply); len = reply.length(); @@ -3419,19 +3631,29 @@ } } - // get the full paths of the directory in all the local disks. - private Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{ + Path[] getLocalFiles(JobConf conf, String subdir) throws IOException{ String[] localDirs = conf.getLocalDirs(); Path[] paths = new Path[localDirs.length]; FileSystem localFs = FileSystem.getLocal(conf); + boolean subdirNeeded = (subdir != null) && (subdir.length() > 0); for (int i = 0; i < localDirs.length; i++) { - paths[i] = new Path(localDirs[i], subdir); + paths[i] = (subdirNeeded) ? 
new Path(localDirs[i], subdir) + : new Path(localDirs[i]); paths[i] = paths[i].makeQualified(localFs); } return paths; } + FileSystem getLocalFileSystem(){ + return localFs; + } + + // only used by tests + void setLocalFileSystem(FileSystem fs){ + localFs = fs; + } + int getMaxCurrentMapTasks() { return maxMapSlots; } @@ -3515,22 +3737,24 @@ JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)); } - Class clazz = - fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN, - null, MemoryCalculatorPlugin.class); - MemoryCalculatorPlugin memoryCalculatorPlugin = - MemoryCalculatorPlugin - .getMemoryCalculatorPlugin(clazz, fConf); - LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin); - - if (memoryCalculatorPlugin != null) { - totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize(); + // Use TT_MEMORY_CALCULATOR_PLUGIN if it is configured. + Class clazz = + fConf.getClass(TT_MEMORY_CALCULATOR_PLUGIN, + null, MemoryCalculatorPlugin.class); + MemoryCalculatorPlugin memoryCalculatorPlugin = (clazz == null ? + null : MemoryCalculatorPlugin.getMemoryCalculatorPlugin(clazz, fConf)); + if (memoryCalculatorPlugin != null || resourceCalculatorPlugin != null) { + totalVirtualMemoryOnTT = (memoryCalculatorPlugin == null ? + resourceCalculatorPlugin.getVirtualMemorySize() : + memoryCalculatorPlugin.getVirtualMemorySize()); if (totalVirtualMemoryOnTT <= 0) { LOG.warn("TaskTracker's totalVmem could not be calculated. " + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT); totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT; } - totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize(); + totalPhysicalMemoryOnTT = (memoryCalculatorPlugin == null ? + resourceCalculatorPlugin.getPhysicalMemorySize() : + memoryCalculatorPlugin.getPhysicalMemorySize()); if (totalPhysicalMemoryOnTT <= 0) { LOG.warn("TaskTracker's totalPmem could not be calculated. " + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT); @@ -3662,7 +3886,7 @@ throws IOException { // check if the tokenJob file is there.. Path skPath = new Path(systemDirectory, - jobId.toString()+"/"+JobTokens.JOB_TOKEN_FILENAME); + jobId.toString()+"/"+SecureShuffleUtils.JOB_TOKEN_FILENAME); FileStatus status = null; long jobTokenSize = -1; Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Jan 26 14:02:53 2010 @@ -55,6 +55,7 @@ private int maxReduceTasks; private TaskTrackerHealthStatus healthStatus; + public static final int UNAVAILABLE = -1; /** * Class representing a collection of resources on this tasktracker. 
*/ @@ -65,7 +66,14 @@ private long mapSlotMemorySizeOnTT; private long reduceSlotMemorySizeOnTT; private long availableSpace; - + + private long availableVirtualMemory = UNAVAILABLE; // in bytes + private long availablePhysicalMemory = UNAVAILABLE; // in bytes + private int numProcessors = UNAVAILABLE; + private long cumulativeCpuTime = UNAVAILABLE; // in milliseconds + private long cpuFrequency = UNAVAILABLE; // in kHz + private float cpuUsage = UNAVAILABLE; // in % + ResourceStatus() { totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT; totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT; @@ -172,21 +180,160 @@ long getAvailableSpace() { return availableSpace; } + + /** + * Set the amount of available virtual memory on the tasktracker. + * If the input is not a valid number, it will be set to UNAVAILABLE + * + * @param availableMem amount of available virtual memory on the tasktracker + * in bytes. + */ + void setAvailableVirtualMemory(long availableMem) { + availableVirtualMemory = availableMem > 0 ? + availableMem : UNAVAILABLE; + } + + /** + * Get the amount of available virtual memory on the tasktracker. + * Will return UNAVAILABLE if it cannot be obtained + * + * @return the amount of available virtual memory on the tasktracker + * in bytes. + */ + long getAvailabelVirtualMemory() { + return availableVirtualMemory; + } + + /** + * Set the amount of available physical memory on the tasktracker. + * If the input is not a valid number, it will be set to UNAVAILABLE + * + * @param availableRAM amount of available physical memory on the + * tasktracker in bytes. + */ + void setAvailablePhysicalMemory(long availableRAM) { + availablePhysicalMemory = availableRAM > 0 ? + availableRAM : UNAVAILABLE; + } + + /** + * Get the amount of available physical memory on the tasktracker. + * Will return UNAVAILABLE if it cannot be obtained + * + * @return amount of available physical memory on the tasktracker in bytes. + */ + long getAvailablePhysicalMemory() { + return availablePhysicalMemory; + } + + /** + * Set the CPU frequency of this TaskTracker + * If the input is not a valid number, it will be set to UNAVAILABLE + * + * @param cpuFrequency CPU frequency in kHz + */ + public void setCpuFrequency(long cpuFrequency) { + this.cpuFrequency = cpuFrequency > 0 ? + cpuFrequency : UNAVAILABLE; + } + + /** + * Get the CPU frequency of this TaskTracker + * Will return UNAVAILABLE if it cannot be obtained + * + * @return CPU frequency in kHz + */ + public long getCpuFrequency() { + return cpuFrequency; + } + + /** + * Set the number of processors on this TaskTracker + * If the input is not a valid number, it will be set to UNAVAILABLE + * + * @param numProcessors number of processors + */ + public void setNumProcessors(int numProcessors) { + this.numProcessors = numProcessors > 0 ? + numProcessors : UNAVAILABLE; + } + + /** + * Get the number of processors on this TaskTracker + * Will return UNAVAILABLE if it cannot be obtained + * + * @return number of processors + */ + public int getNumProcessors() { + return numProcessors; + } + + /** + * Set the cumulative CPU time on this TaskTracker since it is up + * It can be set to UNAVAILABLE if it is currently unavailable. + * + * @param cumulativeCpuTime Used CPU time in milliseconds + */ + public void setCumulativeCpuTime(long cumulativeCpuTime) { + this.cumulativeCpuTime = cumulativeCpuTime > 0 ?
+ cumulativeCpuTime : UNAVAILABLE; + } + + /** + * Get the cumulative CPU time on this TaskTracker since it is up + * Will return UNAVAILABLE if it cannot be obtained + * + * @return used CPU time in milliseconds + */ + public long getCumulativeCpuTime() { + return cumulativeCpuTime; + } + + /** + * Set the CPU usage on this TaskTracker + * + * @param cpuUsage CPU usage in % + */ + public void setCpuUsage(float cpuUsage) { + this.cpuUsage = cpuUsage; + } + + /** + * Get the CPU usage on this TaskTracker + * Will return UNAVAILABLE if it cannot be obtained + * + * @return CPU usage in % + */ + public float getCpuUsage() { + return cpuUsage; + } public void write(DataOutput out) throws IOException { WritableUtils.writeVLong(out, totalVirtualMemory); WritableUtils.writeVLong(out, totalPhysicalMemory); + WritableUtils.writeVLong(out, availableVirtualMemory); + WritableUtils.writeVLong(out, availablePhysicalMemory); WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT); WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT); WritableUtils.writeVLong(out, availableSpace); + WritableUtils.writeVLong(out, cumulativeCpuTime); + WritableUtils.writeVLong(out, cpuFrequency); + WritableUtils.writeVInt(out, numProcessors); + out.writeFloat(getCpuUsage()); } public void readFields(DataInput in) throws IOException { totalVirtualMemory = WritableUtils.readVLong(in); totalPhysicalMemory = WritableUtils.readVLong(in); + availableVirtualMemory = WritableUtils.readVLong(in); + availablePhysicalMemory = WritableUtils.readVLong(in); mapSlotMemorySizeOnTT = WritableUtils.readVLong(in); reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in); availableSpace = WritableUtils.readVLong(in); + cumulativeCpuTime = WritableUtils.readVLong(in); + cpuFrequency = WritableUtils.readVLong(in); + numProcessors = WritableUtils.readVInt(in); + setCpuUsage(in.readFloat()); } } Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/Chain.java Tue Jan 26 14:02:53 2010 @@ -18,8 +18,6 @@ package org.apache.hadoop.mapred.lib; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Stringifier; -import org.apache.hadoop.io.DefaultStringifier; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.Serialization; import org.apache.hadoop.io.serializer.SerializationFactory; @@ -32,45 +30,19 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; /** * The Chain class provides all the common functionality for the * {@link ChainMapper} and the {@link ChainReducer} classes. 
+ * @deprecated Use {@link org.apache.hadoop.mapreduce.lib.chain.Chain} instead */ -class Chain { - private static final String CHAIN_MAPPER = "chain.mapper"; - private static final String CHAIN_REDUCER = "chain.reducer"; - - private static final String CHAIN_MAPPER_SIZE = ".size"; - private static final String CHAIN_MAPPER_CLASS = ".mapper.class."; - private static final String CHAIN_MAPPER_CONFIG = ".mapper.config."; - private static final String CHAIN_REDUCER_CLASS = ".reducer.class"; - private static final String CHAIN_REDUCER_CONFIG = ".reducer.config"; +@Deprecated +class Chain extends org.apache.hadoop.mapreduce.lib.chain.Chain { private static final String MAPPER_BY_VALUE = "chain.mapper.byValue"; private static final String REDUCER_BY_VALUE = "chain.reducer.byValue"; - private static final String MAPPER_INPUT_KEY_CLASS = - "chain.mapper.input.key.class"; - private static final String MAPPER_INPUT_VALUE_CLASS = - "chain.mapper.input.value.class"; - private static final String MAPPER_OUTPUT_KEY_CLASS = - "chain.mapper.output.key.class"; - private static final String MAPPER_OUTPUT_VALUE_CLASS = - "chain.mapper.output.value.class"; - private static final String REDUCER_INPUT_KEY_CLASS = - "chain.reducer.input.key.class"; - private static final String REDUCER_INPUT_VALUE_CLASS = - "chain.reducer.input.value.class"; - private static final String REDUCER_OUTPUT_KEY_CLASS = - "chain.reducer.output.key.class"; - private static final String REDUCER_OUTPUT_VALUE_CLASS = - "chain.reducer.output.value.class"; - - private boolean isMap; - private JobConf chainJobConf; private List mappers = new ArrayList(); @@ -92,51 +64,7 @@ * Reducer. */ Chain(boolean isMap) { - this.isMap = isMap; - } - - /** - * Returns the prefix to use for the configuration of the chain depending - * if it is for a Mapper or a Reducer. - * - * @param isMap TRUE for Mapper, FALSE for Reducer. - * @return the prefix to use. - */ - private static String getPrefix(boolean isMap) { - return (isMap) ? CHAIN_MAPPER : CHAIN_REDUCER; - } - - /** - * Creates a {@link JobConf} for one of the Maps or Reduce in the chain. - *

    - * It creates a new JobConf using the chain job's JobConf as base and adds to - * it the configuration properties for the chain element. The keys of the - * chain element jobConf have precedence over the given JobConf. - * - * @param jobConf the chain job's JobConf. - * @param confKey the key for chain element configuration serialized in the - * chain job's JobConf. - * @return a new JobConf aggregating the chain job's JobConf with the chain - * element configuration properties. - */ - private static JobConf getChainElementConf(JobConf jobConf, String confKey) { - JobConf conf; - try { - Stringifier stringifier = - new DefaultStringifier(jobConf, JobConf.class); - conf = stringifier.fromString(jobConf.get(confKey, null)); - } catch (IOException ioex) { - throw new RuntimeException(ioex); - } - // we have to do this because the Writable desearialization clears all - // values set in the conf making not possible do do a new JobConf(jobConf) - // in the creation of the conf above - jobConf = new JobConf(jobConf); - - for(Map.Entry entry : conf) { - jobConf.set(entry.getKey(), entry.getValue()); - } - return jobConf; + super(isMap); } /** @@ -169,82 +97,27 @@ String prefix = getPrefix(isMap); // if a reducer chain check the Reducer has been already set - if (!isMap) { - if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, - Reducer.class) == null) { - throw new IllegalStateException( - "A Mapper can be added to the chain only after the Reducer has " + - "been set"); - } - } - int index = jobConf.getInt(prefix + CHAIN_MAPPER_SIZE, 0); + checkReducerAlreadySet(isMap, jobConf, prefix, true); + + // set the mapper class + int index = getIndex(jobConf, prefix); jobConf.setClass(prefix + CHAIN_MAPPER_CLASS + index, klass, Mapper.class); - - // if it is a reducer chain and the first Mapper is being added check the - // key and value input classes of the mapper match those of the reducer - // output. - if (!isMap && index == 0) { - JobConf reducerConf = - getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG); - if (! inputKeyClass.isAssignableFrom( - reducerConf.getClass(REDUCER_OUTPUT_KEY_CLASS, null))) { - throw new IllegalArgumentException("The Reducer output key class does" + - " not match the Mapper input key class"); - } - if (! inputValueClass.isAssignableFrom( - reducerConf.getClass(REDUCER_OUTPUT_VALUE_CLASS, null))) { - throw new IllegalArgumentException("The Reducer output value class" + - " does not match the Mapper input value class"); - } - } else if (index > 0) { - // check the that the new Mapper in the chain key and value input classes - // match those of the previous Mapper output. - JobConf previousMapperConf = - getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + - (index - 1)); - if (! inputKeyClass.isAssignableFrom( - previousMapperConf.getClass(MAPPER_OUTPUT_KEY_CLASS, null))) { - throw new IllegalArgumentException("The Mapper output key class does" + - " not match the previous Mapper input key class"); - } - if (! inputValueClass.isAssignableFrom( - previousMapperConf.getClass(MAPPER_OUTPUT_VALUE_CLASS, null))) { - throw new IllegalArgumentException("The Mapper output value class" + - " does not match the previous Mapper input value class"); - } - } - + + validateKeyValueTypes(isMap, jobConf, inputKeyClass, inputValueClass, + outputKeyClass, outputValueClass, index, prefix); + // if the Mapper does not have a private JobConf create an empty one if (mapperConf == null) { - // using a JobConf without defaults to make it lightweight. 
- // still the chain JobConf may have all defaults and this conf is - // overlapped to the chain JobConf one. + // using a JobConf without defaults to make it lightweight. + // still the chain JobConf may have all defaults and this conf is + // overlapped to the chain JobConf one. mapperConf = new JobConf(true); } - - // store in the private mapper conf the input/output classes of the mapper - // and if it works by value or by reference + // store in the private mapper conf if it works by value or by reference mapperConf.setBoolean(MAPPER_BY_VALUE, byValue); - mapperConf.setClass(MAPPER_INPUT_KEY_CLASS, inputKeyClass, Object.class); - mapperConf.setClass(MAPPER_INPUT_VALUE_CLASS, inputValueClass, - Object.class); - mapperConf.setClass(MAPPER_OUTPUT_KEY_CLASS, outputKeyClass, Object.class); - mapperConf.setClass(MAPPER_OUTPUT_VALUE_CLASS, outputValueClass, - Object.class); - - // serialize the private mapper jobconf in the chain jobconf. - Stringifier stringifier = - new DefaultStringifier(jobConf, JobConf.class); - try { - jobConf.set(prefix + CHAIN_MAPPER_CONFIG + index, - stringifier.toString(new JobConf(mapperConf))); - } - catch (IOException ioEx) { - throw new RuntimeException(ioEx); - } - - // increment the chain counter - jobConf.setInt(prefix + CHAIN_MAPPER_SIZE, index + 1); + + setMapperConf(isMap, jobConf, inputKeyClass, inputValueClass, + outputKeyClass, outputValueClass, mapperConf, index, prefix); } /** @@ -273,13 +146,10 @@ Class outputValueClass, boolean byValue, JobConf reducerConf) { String prefix = getPrefix(false); - - if (jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null) != null) { - throw new IllegalStateException("Reducer has been already set"); - } + checkReducerAlreadySet(false, jobConf, prefix, false); jobConf.setClass(prefix + CHAIN_REDUCER_CLASS, klass, Reducer.class); - + // if the Reducer does not have a private JobConf create an empty one if (reducerConf == null) { // using a JobConf without defaults to make it lightweight. @@ -291,24 +161,9 @@ // store in the private reducer conf the input/output classes of the reducer // and if it works by value or by reference reducerConf.setBoolean(MAPPER_BY_VALUE, byValue); - reducerConf.setClass(REDUCER_INPUT_KEY_CLASS, inputKeyClass, Object.class); - reducerConf.setClass(REDUCER_INPUT_VALUE_CLASS, inputValueClass, - Object.class); - reducerConf.setClass(REDUCER_OUTPUT_KEY_CLASS, outputKeyClass, - Object.class); - reducerConf.setClass(REDUCER_OUTPUT_VALUE_CLASS, outputValueClass, - Object.class); - - // serialize the private mapper jobconf in the chain jobconf. 
- Stringifier stringifier = - new DefaultStringifier(jobConf, JobConf.class); - try { - jobConf.set(prefix + CHAIN_REDUCER_CONFIG, - stringifier.toString(new JobConf(reducerConf))); - } - catch (IOException ioEx) { - throw new RuntimeException(ioEx); - } + + setReducerConf(jobConf, inputKeyClass, inputValueClass, outputKeyClass, + outputValueClass, reducerConf, prefix); } /** @@ -325,8 +180,8 @@ for (int i = 0; i < index; i++) { Class klass = jobConf.getClass(prefix + CHAIN_MAPPER_CLASS + i, null, Mapper.class); - JobConf mConf = - getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i); + JobConf mConf = new JobConf( + getChainElementConf(jobConf, prefix + CHAIN_MAPPER_CONFIG + i)); Mapper mapper = ReflectionUtils.newInstance(klass, mConf); mappers.add(mapper); @@ -343,8 +198,8 @@ Class klass = jobConf.getClass(prefix + CHAIN_REDUCER_CLASS, null, Reducer.class); if (klass != null) { - JobConf rConf = - getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG); + JobConf rConf = new JobConf( + getChainElementConf(jobConf, prefix + CHAIN_REDUCER_CONFIG)); reducer = ReflectionUtils.newInstance(klass, rConf); if (rConf.getBoolean(REDUCER_BY_VALUE, true)) { reducerKeySerialization = serializationFactory Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainMapper.java Tue Jan 26 14:02:53 2010 @@ -86,7 +86,10 @@ * RunningJob job = jc.submitJob(conf); * ... * + * @deprecated + * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainMapper} instead */ +@Deprecated public class ChainMapper implements Mapper { /** Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/ChainReducer.java Tue Jan 26 14:02:53 2010 @@ -86,7 +86,10 @@ * RunningJob job = jc.submitJob(conf); * ... 
* + * @deprecated + * Use {@link org.apache.hadoop.mapreduce.lib.chain.ChainReducer} instead */ +@Deprecated public class ChainReducer implements Reducer { /** Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java?rev=903227&r1=903226&r2=903227&view=diff ============================================================================== --- hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java (original) +++ hadoop/mapreduce/branches/MAPREDUCE-233/src/java/org/apache/hadoop/mapred/lib/FieldSelectionMapReduce.java Tue Jan 26 14:02:53 2010 @@ -46,7 +46,8 @@ * * The field separator is under attribute "mapreduce.fieldsel.data.field.separator" * - * The map output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec". + * The map output field list spec is under attribute + * "mapreduce.fieldsel.map.output.key.value.fields.spec". * The value is expected to be like "keyFieldsSpec:valueFieldsSpec" * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ... * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range @@ -57,7 +58,8 @@ * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys, * and use fields 6,5,1,2,3,7 and above for values. * - * The reduce output field list spec is under attribute "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec". + * The reduce output field list spec is under attribute + * "mapreduce.fieldsel.reduce.output.key.value.fields.spec". * * The reducer extracts output key/value pairs in a similar manner, except that * the key is never ignored. @@ -156,13 +158,14 @@ } public void configure(JobConf job) { - this.fieldSeparator = job.get("mapreduce.fieldsel.data.field.separator", "\t"); - this.mapOutputKeyValueSpec = job.get("mapreduce.fieldsel.mapreduce.fieldsel.map.output.key.value.fields.spec", - "0-:"); + this.fieldSeparator = job.get(FieldSelectionHelper.DATA_FIELD_SEPERATOR, + "\t"); + this.mapOutputKeyValueSpec = job.get( + FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "0-:"); this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals( job.getInputFormat().getClass().getCanonicalName()); this.reduceOutputKeyValueSpec = job.get( - "mapreduce.fieldsel.mapreduce.fieldsel.reduce.output.key.value.fields.spec", "0-:"); + FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:"); parseOutputKeyValueSpec(); LOG.info(specToString()); }
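
For reference, the corrected field-selection property names above are the ones exposed as constants on FieldSelectionHelper, so an old-API driver can set them through the constants instead of spelling the keys out. The following is only a rough sketch of such a driver, not part of this patch: the class name FieldSelExample, the job name, and the input/output arguments are placeholders, and the "4,3,0,1:6,5,1-3,7-" spec is simply the example quoted in the class javadoc.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.TextInputFormat;
    import org.apache.hadoop.mapred.lib.FieldSelectionMapReduce;
    import org.apache.hadoop.mapreduce.lib.fieldsel.FieldSelectionHelper;

    public class FieldSelExample {
      public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(FieldSelExample.class);
        conf.setJobName("field-selection-example");

        // FieldSelectionMapReduce serves as both the mapper and the reducer.
        conf.setMapperClass(FieldSelectionMapReduce.class);
        conf.setReducerClass(FieldSelectionMapReduce.class);
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        // Records are split on the configured separator (tab by default).
        conf.set(FieldSelectionHelper.DATA_FIELD_SEPERATOR, "\t");
        // Map output: fields 4,3,0,1 form the key; fields 6,5,1-3 and 7 onwards form the value.
        conf.set(FieldSelectionHelper.MAP_OUTPUT_KEY_VALUE_SPEC, "4,3,0,1:6,5,1-3,7-");
        // Reduce output: keep the key, emit all value fields.
        conf.set(FieldSelectionHelper.REDUCE_OUTPUT_KEY_VALUE_SPEC, "0-:");

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        JobClient.runJob(conf);
      }
    }

The driver takes an input and an output path as its two arguments; everything else is carried in the JobConf, which is why only the three FieldSelectionHelper keys change when migrating off the old doubled-up property names.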