Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 45780 invoked from network); 5 Sep 2008 10:50:31 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Sep 2008 10:50:31 -0000 Received: (qmail 63291 invoked by uid 500); 5 Sep 2008 10:50:29 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 63138 invoked by uid 500); 5 Sep 2008 10:50:29 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 63128 invoked by uid 99); 5 Sep 2008 10:50:29 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Sep 2008 03:50:29 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Sep 2008 10:49:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 1EED5238899E; Fri, 5 Sep 2008 03:50:09 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r692408 [2/2] - in /hadoop/core/trunk: ./ src/docs/src/documentation/content/xdocs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ src/webapps/job/ Date: Fri, 05 Sep 2008 10:50:06 -0000 To: core-commits@hadoop.apache.org From: ddas@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080905105009.1EED5238899E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Task.java Fri Sep 5 03:50:04 2008 @@ -21,7 +21,6 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.net.URI; import java.text.NumberFormat; import java.util.ArrayList; import java.util.Iterator; @@ -34,8 +33,6 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.hadoop.fs.ContentSummary; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -48,7 +45,6 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.serializer.Deserializer; import org.apache.hadoop.io.serializer.SerializationFactory; -import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.IFile.Writer; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Progress; @@ -110,8 +106,8 @@ private String jobFile; // job configuration file private TaskAttemptID taskId; // unique, includes job id private int partition; // id within job - TaskStatus taskStatus; // current status of the task - private Path taskOutputPath; // task-specific output dir + TaskStatus taskStatus; // current status of the task + protected boolean cleanupJob = false; //failed ranges from previous attempts private SortedRanges failedRanges = new SortedRanges(); @@ -125,6 +121,10 @@ protected JobConf conf; protected MapOutputFile mapOutputFile = new MapOutputFile(); protected LocalDirAllocator lDirAlloc; + private final static int MAX_RETRIES = 10; + protected JobContext jobContext; + protected TaskAttemptContext taskContext; + private volatile boolean commitPending = false; //////////////////////////////////////////// // Constructors @@ -220,6 +220,13 @@ this.skipping = skipping; } + /** + * Sets whether the task is cleanup task + */ + public void setCleanupTask() { + cleanupJob = true; + } + //////////////////////////////////////////// // Writable methods //////////////////////////////////////////// @@ -228,48 +235,27 @@ Text.writeString(out, jobFile); taskId.write(out); out.writeInt(partition); - if (taskOutputPath != null) { - Text.writeString(out, taskOutputPath.toString()); - } else { - Text.writeString(out, ""); - } taskStatus.write(out); failedRanges.write(out); out.writeBoolean(skipping); + out.writeBoolean(cleanupJob); } public void readFields(DataInput in) throws IOException { jobFile = Text.readString(in); taskId = TaskAttemptID.read(in); partition = in.readInt(); - String outPath = Text.readString(in); - if (outPath.length() != 0) { - taskOutputPath = new Path(outPath); - } else { - taskOutputPath = null; - } taskStatus.readFields(in); this.mapOutputFile.setJobId(taskId.getJobID()); failedRanges.readFields(in); currentRecIndexIterator = failedRanges.skipRangeIterator(); currentRecStartIndex = currentRecIndexIterator.next(); skipping = in.readBoolean(); + cleanupJob = in.readBoolean(); } @Override public String toString() { return taskId.toString(); } - private Path getTaskOutputPath(JobConf conf) { - Path p = new Path(FileOutputFormat.getOutputPath(conf), - (MRConstants.TEMP_DIR_NAME + Path.SEPARATOR + "_" + taskId)); - try { - FileSystem fs = p.getFileSystem(conf); - return p.makeQualified(fs); - } catch (IOException ie) { - LOG.warn(StringUtils.stringifyException(ie)); - return p; - } - } - /** * Localize the given JobConf to be specific for this task. */ @@ -279,12 +265,18 @@ conf.setBoolean("mapred.task.is.map", isMapTask()); conf.setInt("mapred.task.partition", partition); conf.set("mapred.job.id", taskId.getJobID().toString()); - - // The task-specific output path - if (FileOutputFormat.getOutputPath(conf) != null) { - taskOutputPath = getTaskOutputPath(conf); - FileOutputFormat.setWorkOutputPath(conf, taskOutputPath); + Path outputPath = FileOutputFormat.getOutputPath(conf); + if (outputPath != null) { + OutputCommitter committer = conf.getOutputCommitter(); + if ((committer instanceof FileOutputCommitter)) { + TaskAttemptContext context = new TaskAttemptContext(conf, taskId); + FileOutputFormat.setWorkOutputPath(conf, + ((FileOutputCommitter)committer).getTempTaskOutputPath(context)); + } else { + FileOutputFormat.setWorkOutputPath(conf, outputPath); + } } + } /** Run this task as a part of the named job. This method is executed in the @@ -359,8 +351,17 @@ if (sendProgress) { // we need to send progress update updateCounters(); - taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), - counters); + if (commitPending) { + taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING, + taskProgress.get(), + taskProgress.toString(), + counters); + } else { + taskStatus.statusUpdate(TaskStatus.State.RUNNING, + taskProgress.get(), + taskProgress.toString(), + counters); + } taskFound = umbilical.statusUpdate(taskId, taskStatus); taskStatus.clearStatus(); } @@ -396,6 +397,13 @@ LOG.debug(getTaskID() + " Progress/ping thread started"); } + public void initialize(JobConf job, Reporter reporter) + throws IOException { + jobContext = new JobContext(job, reporter); + taskContext = new TaskAttemptContext(job, taskId, reporter); + OutputCommitter committer = conf.getOutputCommitter(); + committer.setupTask(taskContext); + } protected Reporter getReporter(final TaskUmbilicalProtocol umbilical) throws IOException @@ -543,54 +551,86 @@ } public void done(TaskUmbilicalProtocol umbilical) throws IOException { - int retries = 10; - boolean needProgress = true; + LOG.info("Task:" + taskId + " is done." + + " And is in the process of commiting"); updateCounters(); + + OutputCommitter outputCommitter = conf.getOutputCommitter(); + // check whether the commit is required. + boolean commitRequired = outputCommitter.needsTaskCommit(taskContext); + if (commitRequired) { + int retries = MAX_RETRIES; + taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING); + commitPending = true; + // say the task tracker that task is commit pending + while (true) { + try { + umbilical.commitPending(taskId, taskStatus); + break; + } catch (InterruptedException ie) { + // ignore + } catch (IOException ie) { + LOG.warn("Failure sending commit pending: " + + StringUtils.stringifyException(ie)); + if (--retries == 0) { + System.exit(67); + } + } + } + //wait for commit approval and commit + commit(umbilical, outputCommitter); + } taskDone.set(true); + sendLastUpdate(umbilical); + //signal the tasktracker that we are done + sendDone(umbilical); + } + + private void sendLastUpdate(TaskUmbilicalProtocol umbilical) + throws IOException { + //first wait for the COMMIT approval from the tasktracker + int retries = MAX_RETRIES; while (true) { try { - if (needProgress) { - // send a final status report - taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), + // send a final status report + if (commitPending) { + taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING, + taskProgress.get(), + taskProgress.toString(), + counters); + } else { + taskStatus.statusUpdate(TaskStatus.State.RUNNING, + taskProgress.get(), + taskProgress.toString(), counters); - try { - if (!umbilical.statusUpdate(getTaskID(), taskStatus)) { - LOG.warn("Parent died. Exiting "+taskId); - System.exit(66); - } - taskStatus.clearStatus(); - needProgress = false; - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); // interrupt ourself - } } - // Check whether there is any task output - boolean shouldBePromoted = false; + try { - if (taskOutputPath != null) { - // Get the file-system for the task output directory - FileSystem fs = taskOutputPath.getFileSystem(conf); - if (fs.exists(taskOutputPath)) { - // Get the summary for the folder - ContentSummary summary = fs.getContentSummary(taskOutputPath); - // Check if the directory contains data to be promoted - // i.e total-files + total-folders - 1(itself) - if (summary != null - && (summary.getFileCount() + summary.getDirectoryCount() - 1) - > 0) { - shouldBePromoted = true; - } - } else { - LOG.info(getTaskID() + ": No outputs to promote from " + - taskOutputPath); - } + if (!umbilical.statusUpdate(getTaskID(), taskStatus)) { + LOG.warn("Parent died. Exiting "+taskId); + System.exit(66); } - } catch (IOException ioe) { - // To be safe in case of an exception - shouldBePromoted = true; + taskStatus.clearStatus(); + return; + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // interrupt ourself } - umbilical.done(taskId, shouldBePromoted); - LOG.info("Task '" + getTaskID() + "' done."); + } catch (IOException ie) { + LOG.warn("Failure sending last status update: " + + StringUtils.stringifyException(ie)); + if (--retries == 0) { + throw ie; + } + } + } + } + + private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException { + int retries = MAX_RETRIES; + while (true) { + try { + umbilical.done(getTaskID()); + LOG.info("Task '" + taskId + "' done."); return; } catch (IOException ie) { LOG.warn("Failure signalling completion: " + @@ -601,15 +641,66 @@ } } } + + private void commit(TaskUmbilicalProtocol umbilical, + OutputCommitter committer) throws IOException { + int retries = MAX_RETRIES; + while (true) { + try { + while (!umbilical.canCommit(taskId)) { + try { + Thread.sleep(1000); + } catch(InterruptedException ie) { + //ignore + } + setProgressFlag(); + } + // task can Commit now + try { + LOG.info("Task " + taskId + " is allowed to commit now"); + committer.commitTask(taskContext); + return; + } catch (IOException iee) { + LOG.warn("Failure committing: " + + StringUtils.stringifyException(iee)); + discardOutput(taskContext, committer); + throw iee; + } + } catch (IOException ie) { + LOG.warn("Failure asking whether task can commit: " + + StringUtils.stringifyException(ie)); + if (--retries == 0) { + //if it couldn't commit a successfully then delete the output + discardOutput(taskContext, committer); + System.exit(68); + } + } + } + } + + private void discardOutput(TaskAttemptContext taskContext, + OutputCommitter committer) { + try { + committer.abortTask(taskContext); + } catch (IOException ioe) { + LOG.warn("Failure cleaning up: " + + StringUtils.stringifyException(ioe)); + } + } + + protected void runCleanup(TaskUmbilicalProtocol umbilical) + throws IOException { + // set phase for this task + setPhase(TaskStatus.Phase.CLEANUP); + getProgress().setStatus("cleanup"); + // do the cleanup + conf.getOutputCommitter().cleanupJob(jobContext); + done(umbilical); + } public void setConf(Configuration conf) { if (conf instanceof JobConf) { this.conf = (JobConf) conf; - - if (taskId != null && taskOutputPath == null && - FileOutputFormat.getOutputPath(this.conf) != null) { - taskOutputPath = getTaskOutputPath(this.conf); - } } else { this.conf = new JobConf(conf); } @@ -633,68 +724,6 @@ } /** - * Save the task's output on successful completion. - * - * @throws IOException - */ - void saveTaskOutput() throws IOException { - - if (taskOutputPath != null) { - FileSystem fs = taskOutputPath.getFileSystem(conf); - if (fs.exists(taskOutputPath)) { - Path jobOutputPath = taskOutputPath.getParent().getParent(); - - // Move the task outputs to their final place - moveTaskOutputs(fs, jobOutputPath, taskOutputPath); - - // Delete the temporary task-specific output directory - if (!fs.delete(taskOutputPath, true)) { - LOG.info("Failed to delete the temporary output directory of task: " + - getTaskID() + " - " + taskOutputPath); - } - - LOG.info("Saved output of task '" + getTaskID() + "' to " + jobOutputPath); - } - } - } - - private Path getFinalPath(Path jobOutputDir, Path taskOutput) { - URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri()); - if (relativePath.getPath().length() > 0) { - return new Path(jobOutputDir, relativePath.getPath()); - } else { - return jobOutputDir; - } - } - - private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput) - throws IOException { - if (fs.isFile(taskOutput)) { - Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput); - if (!fs.rename(taskOutput, finalOutputPath)) { - if (!fs.delete(finalOutputPath, true)) { - throw new IOException("Failed to delete earlier output of task: " + - getTaskID()); - } - if (!fs.rename(taskOutput, finalOutputPath)) { - throw new IOException("Failed to save output of task: " + - getTaskID()); - } - } - LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); - } else if(fs.isDirectory(taskOutput)) { - FileStatus[] paths = fs.listStatus(taskOutput); - Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput); - fs.mkdirs(finalOutputPath); - if (paths != null) { - for (FileStatus path : paths) { - moveTaskOutputs(fs, jobOutputDir, path.getPath()); - } - } - } - } - - /** * OutputCollector for the combiner. */ protected static class CombineOutputCollector Added: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java?rev=692408&view=auto ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java (added) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskAttemptContext.java Fri Sep 5 03:50:04 2008 @@ -0,0 +1,56 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.util.Progressable; + +public class TaskAttemptContext extends JobContext { + + private JobConf conf; + private TaskAttemptID taskid; + + TaskAttemptContext(JobConf conf, TaskAttemptID taskid) { + this(conf, taskid, Reporter.NULL); + } + + TaskAttemptContext(JobConf conf, TaskAttemptID taskid, + Progressable progress) { + super(conf, progress); + this.conf = conf; + this.taskid = taskid; + } + + /** + * Get the taskAttemptID. + * + * @return TaskAttemptID + */ + public TaskAttemptID getTaskAttemptID() { + return taskid; + } + + /** + * Get the job Configuration. + * + * @return JobConf + */ + public JobConf getJobConf() { + return conf; + } + +} Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Sep 5 03:50:04 2008 @@ -79,6 +79,7 @@ private boolean killed = false; private volatile SortedRanges failedRanges = new SortedRanges(); private volatile boolean skipping = false; + private boolean cleanup = false; // The 'next' usable taskid of this tip int nextTaskId = 0; @@ -107,6 +108,9 @@ //list of tasks to kill, -> private TreeMap tasksToKill = new TreeMap(); + //task to commit, + private TaskAttemptID taskToCommit; + private Counters counters = new Counters(); @@ -164,6 +168,14 @@ return partition; } + public boolean isCleanupTask() { + return cleanup; + } + + public void setCleanupTask() { + cleanup = true; + } + public boolean isOnlyCommitPending() { for (TaskStatus t : taskStatuses.values()) { if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) { @@ -172,6 +184,14 @@ } return false; } + + public boolean isCommitPending(TaskAttemptID taskId) { + TaskStatus t = taskStatuses.get(taskId); + if (t == null) { + return false; + } + return t.getRunState() == TaskStatus.State.COMMIT_PENDING; + } /** * Initialization common to Map and Reduce @@ -324,6 +344,10 @@ !tasksReportedClosed.contains(taskid)) { tasksReportedClosed.add(taskid); close = true; + } else if (isCommitPending(taskid) && !shouldCommit(taskid) && + !tasksReportedClosed.contains(taskid)) { + tasksReportedClosed.add(taskid); + close = true; } else { close = tasksToKill.keySet().contains(taskid); } @@ -331,6 +355,21 @@ } /** + * Commit this task attempt for the tip. + * @param taskid + */ + public void doCommit(TaskAttemptID taskid) { + taskToCommit = taskid; + } + + /** + * Returns whether the task attempt should be committed or not + */ + public boolean shouldCommit(TaskAttemptID taskid) { + return taskToCommit.equals(taskid); + } + + /** * Creates a "status report" for this task. Includes the * task ID and overall status, plus reports for all the * component task-threads that have ever been started. @@ -401,7 +440,8 @@ // status update for the same taskid! This is a safety check, // and is addressed better at the TaskTracker to ensure this. // @see {@link TaskTracker.transmitHeartbeat()} - if ((newState != TaskStatus.State.RUNNING) && + if ((newState != TaskStatus.State.RUNNING && + newState != TaskStatus.State.COMMIT_PENDING ) && (oldState == newState)) { LOG.warn("Recieved duplicate status update of '" + newState + "' for '" + taskid + "' of TIP '" + getTIPId() + "'"); @@ -733,6 +773,9 @@ } else { t = new ReduceTask(jobFile, taskid, partition, numMaps); } + if (cleanup) { + t.setCleanupTask(); + } t.setConf(conf); t.setFailedRanges(failedRanges); t.setSkipping(skipping); Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Fri Sep 5 03:50:04 2008 @@ -436,7 +436,7 @@ }catch(IOException ie){ LOG.warn("Error releasing caches : Cache files might not have been cleaned up"); } - tracker.reportTaskFinished(t.getTaskID()); + tracker.reportTaskFinished(t.getTaskID(), false); } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Sep 5 03:50:04 2008 @@ -37,7 +37,7 @@ LogFactory.getLog(TaskStatus.class.getName()); //enumeration for reporting current phase of a task. - public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE} + public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP} // what state is the task in? public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, @@ -264,9 +264,11 @@ * @param phase * @param counters */ - synchronized void statusUpdate(float progress, String state, + synchronized void statusUpdate(State runState, + float progress, + String state, Counters counters) { - setRunState(TaskStatus.State.RUNNING); + setRunState(runState); setProgress(progress); setStateString(state); setCounters(counters); Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Sep 5 03:50:04 2008 @@ -28,6 +28,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -191,6 +192,12 @@ */ private int probe_sample_size = 500; + /* + * A list of commitTaskActions for whom commit response has been received + */ + private List commitResponses = + Collections.synchronizedList(new ArrayList()); + private ShuffleServerMetrics shuffleServerMetrics; /** This class contains the methods that should be used for metrics-reporting * the specific metrics for shuffle. The TaskTracker is actually a server for @@ -407,8 +414,10 @@ // RPC initialization int max = maxCurrentMapTasks > maxCurrentReduceTasks ? maxCurrentMapTasks : maxCurrentReduceTasks; + //set the num handlers to max*2 since canCommit may wait for the duration + //of a heartbeat RPC this.taskReportServer = - RPC.getServer(this, bindAddress, tmpPort, max, false, this.fConf); + RPC.getServer(this, bindAddress, tmpPort, 2 * max, false, this.fConf); this.taskReportServer.start(); // get the assigned address @@ -957,6 +966,13 @@ for(TaskTrackerAction action: actions) { if (action instanceof LaunchTaskAction) { startNewTask((LaunchTaskAction) action); + } else if (action instanceof CommitTaskAction) { + CommitTaskAction commitAction = (CommitTaskAction)action; + if (!commitResponses.contains(commitAction.getTaskID())) { + LOG.info("Received commit task action for " + + commitAction.getTaskID()); + commitResponses.add(commitAction.getTaskID()); + } } else { tasksToCleanup.put(action); } @@ -1072,7 +1088,8 @@ synchronized (this) { for (TaskStatus taskStatus : status.getTaskReports()) { - if (taskStatus.getRunState() != TaskStatus.State.RUNNING) { + if (taskStatus.getRunState() != TaskStatus.State.RUNNING && + taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) { if (taskStatus.getIsMap()) { mapTotal--; } else { @@ -1188,7 +1205,8 @@ private synchronized void markUnresponsiveTasks() throws IOException { long now = System.currentTimeMillis(); for (TaskInProgress tip: runningTasks.values()) { - if (tip.getRunState() == TaskStatus.State.RUNNING) { + if (tip.getRunState() == TaskStatus.State.RUNNING || + tip.getRunState() == TaskStatus.State.COMMIT_PENDING) { // Check the per-job timeout interval for tasks; // an interval of '0' implies it is never timed-out long jobTaskTimeout = tip.getTaskTimeout(); @@ -1308,7 +1326,8 @@ TaskInProgress killMe = null; for (Iterator it = runningTasks.values().iterator(); it.hasNext();) { TaskInProgress tip = (TaskInProgress) it.next(); - if ((tip.getRunState() == TaskStatus.State.RUNNING) && + if ((tip.getRunState() == TaskStatus.State.RUNNING || + tip.getRunState() == TaskStatus.State.COMMIT_PENDING) && !tip.wasKilled) { if (killMe == null) { @@ -1517,7 +1536,6 @@ private TaskStatus taskStatus; private long taskTimeout; private String debugCommand; - private boolean shouldPromoteOutput = false; /** */ @@ -1667,7 +1685,8 @@ "% " + taskStatus.getStateString()); if (this.done || - this.taskStatus.getRunState() != TaskStatus.State.RUNNING) { + (this.taskStatus.getRunState() != TaskStatus.State.RUNNING && + this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) { //make sure we ignore progress messages after a task has //invoked TaskUmbilicalProtocol.done() or if the task has been //KILLED/FAILED @@ -1717,15 +1736,8 @@ /** * The task is reporting that it's done running */ - public synchronized void reportDone(boolean shouldPromote) { - TaskStatus.State state = null; - this.shouldPromoteOutput = shouldPromote; - if (shouldPromote) { - state = TaskStatus.State.COMMIT_PENDING; - } else { - state = TaskStatus.State.SUCCEEDED; - } - this.taskStatus.setRunState(state); + public synchronized void reportDone() { + this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED); this.taskStatus.setProgress(1.0f); this.taskStatus.setFinishTime(System.currentTimeMillis()); this.done = true; @@ -1758,13 +1770,7 @@ // boolean needCleanup = false; synchronized (this) { - if (done) { - if (shouldPromoteOutput) { - taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING); - } else { - taskStatus.setRunState(TaskStatus.State.SUCCEEDED); - } - } else { + if (!done) { if (!wasKilled) { failures += 1; taskStatus.setRunState(TaskStatus.State.FAILED); @@ -1960,7 +1966,8 @@ public void jobHasFinished(boolean wasFailure) throws IOException { // Kill the task if it is still running synchronized(this){ - if (getRunState() == TaskStatus.State.RUNNING) { + if (getRunState() == TaskStatus.State.RUNNING || + getRunState() == TaskStatus.State.COMMIT_PENDING) { kill(wasFailure); } } @@ -1974,7 +1981,8 @@ * @param wasFailure was it a failure (versus a kill request)? */ public synchronized void kill(boolean wasFailure) throws IOException { - if (taskStatus.getRunState() == TaskStatus.State.RUNNING) { + if (taskStatus.getRunState() == TaskStatus.State.RUNNING || + taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) { wasKilled = true; if (wasFailure) { failures += 1; @@ -2136,13 +2144,33 @@ } /** + * Task is reporting that it is in commit_pending + * and it is waiting for the commit Response + */ + public synchronized void commitPending(TaskAttemptID taskid, + TaskStatus taskStatus) + throws IOException { + LOG.info("Task " + taskid + " is in COMMIT_PENDING"); + statusUpdate(taskid, taskStatus); + reportTaskFinished(taskid, true); + } + + /** + * Child checking whether it can commit + */ + public synchronized boolean canCommit(TaskAttemptID taskid) { + return commitResponses.contains(taskid); //don't remove it now + } + + /** * The task is done. */ - public synchronized void done(TaskAttemptID taskid, boolean shouldPromote) + public synchronized void done(TaskAttemptID taskid) throws IOException { TaskInProgress tip = tasks.get(taskid); + commitResponses.remove(taskid); if (tip != null) { - tip.reportDone(shouldPromote); + tip.reportDone(); } else { LOG.warn("Unknown child task done: "+taskid+". Ignored."); } @@ -2196,13 +2224,15 @@ /** * The task is no longer running. It may not have completed successfully */ - void reportTaskFinished(TaskAttemptID taskid) { + void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) { TaskInProgress tip; synchronized (this) { tip = tasks.get(taskid); } if (tip != null) { - tip.taskFinished(); + if (!commitPending) { + tip.taskFinished(); + } synchronized(finishedCount) { finishedCount[0]++; finishedCount.notify(); @@ -2331,7 +2361,7 @@ TaskStatus status = tip.getStatus(); status.setIncludeCounters(sendCounters); status.setOutputSize(tryToGetOutputSize(status.getTaskID(), fConf)); - // send counters for finished or failed tasks. + // send counters for finished or failed tasks and commit pending tasks if (status.getRunState() != TaskStatus.State.RUNNING) { status.setIncludeCounters(true); } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerAction.java Fri Sep 5 03:50:04 2008 @@ -48,7 +48,10 @@ KILL_JOB, /** Reinitialize the tasktracker. */ - REINIT_TRACKER + REINIT_TRACKER, + + /** Ask a task to save its output. */ + COMMIT_TASK }; /** @@ -80,6 +83,11 @@ action = new ReinitTrackerAction(); } break; + case COMMIT_TASK: + { + action = new CommitTaskAction(); + } + break; } return action; Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Sep 5 03:50:04 2008 @@ -207,7 +207,8 @@ TaskStatus.State state = ts.getRunState(); if (ts.getIsMap() && ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED))) { + (state == TaskStatus.State.UNASSIGNED) || + (state == TaskStatus.State.COMMIT_PENDING))) { mapCount++; } } @@ -224,7 +225,8 @@ TaskStatus.State state = ts.getRunState(); if ((!ts.getIsMap()) && ((state == TaskStatus.State.RUNNING) || - (state == TaskStatus.State.UNASSIGNED))) { + (state == TaskStatus.State.UNASSIGNED) || + (state == TaskStatus.State.COMMIT_PENDING))) { reduceCount++; } } Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original) +++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Sep 5 03:50:04 2008 @@ -45,9 +45,10 @@ * Version 9 changes the counter representation for HADOOP-1915 * Version 10 changed the TaskStatus format and added reportNextRecordRange * for HADOOP-153 + * Version 11 Adds RPCs for task commit as part of HADOOP-3150 * */ - public static final long versionID = 10L; + public static final long versionID = 11L; /** Called when a child task process starts, to get its task.*/ Task getTask(TaskAttemptID taskid) throws IOException; @@ -88,9 +89,26 @@ /** Report that the task is successfully completed. Failure is assumed if * the task process exits without calling this. * @param taskid task's id - * @param shouldBePromoted whether to promote the task's output or not */ - void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException; + void done(TaskAttemptID taskid) throws IOException; + + /** + * Report that the task is complete, but its commit is pending. + * + * @param taskId task's id + * @param taskStatus status of the child + * @throws IOException + */ + void commitPending(TaskAttemptID taskId, TaskStatus taskStatus) + throws IOException, InterruptedException; + + /** + * Polling to know whether the task can go-ahead with commit + * @param taskid + * @return true/false + * @throws IOException + */ + boolean canCommit(TaskAttemptID taskid) throws IOException; /** Report that a reduce-task couldn't shuffle map-outputs.*/ void shuffleError(TaskAttemptID taskId, String message) throws IOException; Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java?rev=692408&view=auto ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java (added) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestFileOutputCommitter.java Fri Sep 5 03:50:04 2008 @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.*; +import junit.framework.TestCase; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.*; + +public class TestFileOutputCommitter extends TestCase { + private static Path outDir = new Path( + System.getProperty("test.build.data", "."), "output"); + + // A random task attempt id for testing. + private static String attempt = "attempt_200707121733_0001_m_000000_0"; + private static TaskAttemptID taskID = TaskAttemptID.forName(attempt); + + @SuppressWarnings("unchecked") + public void testCommitter() throws Exception { + JobConf job = new JobConf(); + job.set("mapred.task.id", attempt); + job.setOutputCommitter(FileOutputCommitter.class); + JobContext jContext = new JobContext(job); + TaskAttemptContext tContext = new TaskAttemptContext(job, taskID); + FileOutputFormat.setOutputPath(job, outDir); + FileOutputCommitter committer = new FileOutputCommitter(); + FileOutputFormat.setWorkOutputPath(job, + committer.getTempTaskOutputPath(tContext)); + + committer.setupJob(jContext); + committer.setupTask(tContext); + String file = "test.txt"; + + // A reporter that does nothing + Reporter reporter = Reporter.NULL; + FileSystem localFs = FileSystem.getLocal(job); + TextOutputFormat theOutputFormat = new TextOutputFormat(); + RecordWriter theRecordWriter = + theOutputFormat.getRecordWriter(localFs, job, file, reporter); + Text key1 = new Text("key1"); + Text key2 = new Text("key2"); + Text val1 = new Text("val1"); + Text val2 = new Text("val2"); + NullWritable nullWritable = NullWritable.get(); + + try { + theRecordWriter.write(key1, val1); + theRecordWriter.write(null, nullWritable); + theRecordWriter.write(null, val1); + theRecordWriter.write(nullWritable, val2); + theRecordWriter.write(key2, nullWritable); + theRecordWriter.write(key1, null); + theRecordWriter.write(null, null); + theRecordWriter.write(key2, val2); + } finally { + theRecordWriter.close(reporter); + } + committer.commitTask(tContext); + committer.cleanupJob(jContext); + + File expectedFile = new File(new Path(outDir, file).toString()); + StringBuffer expectedOutput = new StringBuffer(); + expectedOutput.append(key1).append('\t').append(val1).append("\n"); + expectedOutput.append(val1).append("\n"); + expectedOutput.append(val2).append("\n"); + expectedOutput.append(key2).append("\n"); + expectedOutput.append(key1).append("\n"); + expectedOutput.append(key2).append('\t').append(val2).append("\n"); + String output = UtilsForTests.slurp(expectedFile); + assertEquals(output, expectedOutput.toString()); + } + + public static void main(String[] args) throws Exception { + new TestFileOutputCommitter().testCommitter(); + } +} Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Fri Sep 5 03:50:04 2008 @@ -74,6 +74,8 @@ assertEquals("number of maps", 1, reports.length); reports = client.getReduceTaskReports(jobid); assertEquals("number of reduces", 1, reports.length); + reports = client.getCleanupTaskReports(jobid); + assertEquals("number of cleanups", 2, reports.length); Counters counters = ret.job.getCounters(); assertEquals("number of map inputs", 3, counters.getCounter(Task.Counter.MAP_INPUT_RECORDS)); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMultipleTextOutputFormat.java Fri Sep 5 03:50:04 2008 @@ -38,12 +38,14 @@ } } + // A random task attempt id for testing. + private static String attempt = "attempt_200707121733_0001_m_000000_0"; + private static Path workDir = new Path(new Path( new Path(System.getProperty("test.build.data", "."), "data"), - MRConstants.TEMP_DIR_NAME), - "TestMultipleTextOutputFormat"); + FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt); private static void writeData(RecordWriter rw) throws IOException { for (int i = 10; i < 40; i++) { @@ -84,6 +86,7 @@ public void testFormat() throws Exception { JobConf job = new JobConf(); + job.set("mapred.task.id", attempt); FileOutputFormat.setOutputPath(job, workDir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, workDir); FileSystem fs = workDir.getFileSystem(job); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Sep 5 03:50:04 2008 @@ -230,6 +230,13 @@ } } rjob.killJob(); + while(rjob.cleanupProgress() == 0.0f) { + try { + Thread.sleep(10); + } catch (InterruptedException ie) { + break; + } + } if (shouldSucceed) { assertTrue(rjob.isComplete()); } else { Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestSequenceFileAsBinaryOutputFormat.java Fri Sep 5 03:50:04 2008 @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; -import org.apache.hadoop.mapred.*; import org.apache.hadoop.io.SequenceFile.CompressionType; import junit.framework.TestCase; @@ -34,6 +33,8 @@ LogFactory.getLog(TestSequenceFileAsBinaryOutputFormat.class.getName()); private static final int RECORDS = 10000; + // A random task attempt id for testing. + private static final String attempt = "attempt_200707121733_0001_m_000000_0"; public void testBinary() throws IOException { JobConf job = new JobConf(); @@ -41,8 +42,7 @@ Path dir = new Path(new Path(new Path(System.getProperty("test.build.data",".")), - MRConstants.TEMP_DIR_NAME), - "mapred"); + FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt); Path file = new Path(dir, "testbinary.seq"); Random r = new Random(); long seed = r.nextLong(); @@ -53,6 +53,7 @@ fail("Failed to create output directory"); } + job.set("mapred.task.id", attempt); FileOutputFormat.setOutputPath(job, dir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, dir); Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java (original) +++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTextOutputFormat.java Fri Sep 5 03:50:04 2008 @@ -35,17 +35,19 @@ throw new RuntimeException("init failure", e); } } + // A random task attempt id for testing. + private static String attempt = "attempt_200707121733_0001_m_000000_0"; private static Path workDir = new Path(new Path( new Path(System.getProperty("test.build.data", "."), "data"), - MRConstants.TEMP_DIR_NAME), - "TestTextOutputFormat"); + FileOutputCommitter.TEMP_DIR_NAME), "_" + attempt); @SuppressWarnings("unchecked") public void testFormat() throws Exception { JobConf job = new JobConf(); + job.set("mapred.task.id", attempt); FileOutputFormat.setOutputPath(job, workDir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, workDir); FileSystem fs = workDir.getFileSystem(job); @@ -98,6 +100,7 @@ JobConf job = new JobConf(); String separator = "\u0001"; job.set("mapred.textoutputformat.separator", separator); + job.set("mapred.task.id", attempt); FileOutputFormat.setOutputPath(job, workDir.getParent().getParent()); FileOutputFormat.setWorkOutputPath(job, workDir); FileSystem fs = workDir.getFileSystem(job); Modified: hadoop/core/trunk/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobdetails.jsp?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/webapps/job/jobdetails.jsp (original) +++ hadoop/core/trunk/src/webapps/job/jobdetails.jsp Fri Sep 5 03:50:04 2008 @@ -88,6 +88,41 @@ ) + "\n"); } + + private void printCleanupTaskSummary(JspWriter out, + String jobId, + TaskInProgress[] tasks + ) throws IOException { + int totalTasks = tasks.length; + int runningTasks = 0; + int finishedTasks = 0; + int killedTasks = 0; + String kind = "cleanup"; + for(int i=0; i < totalTasks; ++i) { + TaskInProgress task = tasks[i]; + if (task.isComplete()) { + finishedTasks += 1; + } else if (task.isRunning()) { + runningTasks += 1; + } else if (task.isFailed()) { + killedTasks += 1; + } + } + int pendingTasks = totalTasks - runningTasks - killedTasks - finishedTasks; + out.print(((runningTasks > 0) + ? "" + " Running" + + "" + : ((pendingTasks > 0) ? " Pending" : + ((finishedTasks > 0) + ?"" + " Successful" + + "" + : ((killedTasks > 0) + ?"" + " Failed" + + "" : "None"))))); + } private void printConfirm(JspWriter out, String jobId) throws IOException{ String url = "jobdetails.jsp?jobid=" + jobId; @@ -194,6 +229,9 @@ job.getFinishTime(), job.getStartTime()) + "
\n"); } } + out.print("Job Cleanup:"); + printCleanupTaskSummary(out, jobId, job.getCleanupTasks()); + out.print("
\n"); if (flakyTaskTrackers > 0) { out.print("Black-listed TaskTrackers: " + "" + Modified: hadoop/core/trunk/src/webapps/job/jobtasks.jsp URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtasks.jsp?rev=692408&r1=692407&r2=692408&view=diff ============================================================================== --- hadoop/core/trunk/src/webapps/job/jobtasks.jsp (original) +++ hadoop/core/trunk/src/webapps/job/jobtasks.jsp Fri Sep 5 03:50:04 2008 @@ -39,9 +39,12 @@ reports = (job != null) ? tracker.getMapTaskReports(jobidObj) : null; tasks = (job != null) ? job.getMapTasks() : null; } - else{ + else if ("reduce".equals(type)) { reports = (job != null) ? tracker.getReduceTaskReports(jobidObj) : null; tasks = (job != null) ? job.getReduceTasks() : null; + } else { + reports = (job != null) ? tracker.getCleanupTaskReports(jobidObj) : null; + tasks = (job != null) ? job.getCleanupTasks() : null; } %>