Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Mar 8 05:54:02 2011 @@ -72,8 +72,10 @@ public class ReduceTask extends Task { private CompressionCodec codec; + { getProgress().setStatus("reduce"); + setPhase(TaskStatus.Phase.SHUFFLE); // phase to start with } private Progress copyPhase; @@ -119,21 +121,14 @@ public class ReduceTask extends Task { public ReduceTask() { super(); - this.taskStatus = new ReduceTaskStatus(); } public ReduceTask(String jobFile, TaskAttemptID taskId, int partition, int numMaps, int numSlotsRequired) { super(jobFile, taskId, partition, numSlotsRequired); this.numMaps = numMaps; -/* - */ - this.taskStatus = new ReduceTaskStatus(getTaskID(), 0.0f, numSlotsRequired, - TaskStatus.State.UNASSIGNED, - "", "", "", TaskStatus.Phase.SHUFFLE, - getCounters()); } - + private CompressionCodec initCodec() { // check if map-outputs are to be compressed if (conf.getCompressMapOutput()) { @@ -156,45 +151,6 @@ public class ReduceTask extends Task { return false; } - /** - * Is this really a combo-task masquerading as a plain MapTask? Decidedly - * not. - */ - @Override - public boolean isUberTask() { - return false; - } - - /** - * Allow UberTask (or, potentially, JobInProgress or others) to set up a - * deeper Progress hierarchy even if run() is skipped. If setProgress() - * is also needed, it should be called before createPhase() or else - * the sub-phases created here will be wiped out. - */ - void createPhase(TaskStatus.Phase phaseType, String status) { - if (phaseType == TaskStatus.Phase.SHUFFLE) { - copyPhase = getProgress().addPhase(status); - } else if (phaseType == TaskStatus.Phase.SORT) { - sortPhase = getProgress().addPhase(status); - } else /* TaskStatus.Phase.REDUCE */ { - reducePhase = getProgress().addPhase(status); - } - } - - /** - * Allow UberTask to traverse the deeper Progress hierarchy in case run() is - * skipped. - */ - void completePhase(TaskStatus.Phase phaseType) { - if (phaseType == TaskStatus.Phase.SHUFFLE) { - copyPhase.complete(); - } else if (phaseType == TaskStatus.Phase.SORT) { - sortPhase.complete(); - } else /* TaskStatus.Phase.REDUCE */ { - reducePhase.complete(); - } - } - public int getNumMaps() { return numMaps; } /** @@ -221,39 +177,37 @@ public class ReduceTask extends Task { } // Get the input files for the reducer. - static Path[] getMapFiles(ReduceTask reduce, FileSystem fs, boolean isLocal) + private Path[] getMapFiles(FileSystem fs, boolean isLocal) throws IOException { List fileList = new ArrayList(); if (isLocal) { // for local jobs - for (int i = 0; i < reduce.numMaps; ++i) { - fileList.add(reduce.mapOutputFile.getInputFile(i)); + for(int i = 0; i < numMaps; ++i) { + fileList.add(mapOutputFile.getInputFile(i)); } } else { // for non local jobs - for (FileStatus filestatus : reduce.mapOutputFilesOnDisk) { + for (FileStatus filestatus : mapOutputFilesOnDisk) { fileList.add(filestatus.getPath()); } } return fileList.toArray(new Path[0]); } - private static class ReduceValuesIterator + private class ReduceValuesIterator extends ValuesIterator { - ReduceTask reduce; - public ReduceValuesIterator (ReduceTask reduce, RawKeyValueIterator in, + public ReduceValuesIterator (RawKeyValueIterator in, RawComparator comparator, Class keyClass, Class valClass, Configuration conf, Progressable reporter) throws IOException { super(in, comparator, keyClass, valClass, conf, reporter); - this.reduce = reduce; } @Override public VALUE next() { - reduce.reduceInputValueCounter.increment(1); + reduceInputValueCounter.increment(1); return moveToNext(); } @@ -262,13 +216,12 @@ public class ReduceTask extends Task { } public void informReduceProgress() { - // update progress: - reduce.reducePhase.set(super.in.getProgress().getProgress()); + reducePhase.set(super.in.getProgress().getProgress()); // update progress reporter.progress(); } } - private static class SkippingReduceValuesIterator + private class SkippingReduceValuesIterator extends ReduceValuesIterator { private SkipRangeIterator skipIt; private TaskUmbilicalProtocol umbilical; @@ -281,27 +234,26 @@ public class ReduceTask extends Task { private boolean toWriteSkipRecs; private boolean hasNext; private TaskReporter reporter; - - public SkippingReduceValuesIterator(ReduceTask reduce, - RawKeyValueIterator in, + + public SkippingReduceValuesIterator(RawKeyValueIterator in, RawComparator comparator, Class keyClass, Class valClass, Configuration conf, TaskReporter reporter, TaskUmbilicalProtocol umbilical) throws IOException { - super(reduce, in, comparator, keyClass, valClass, conf, reporter); + super(in, comparator, keyClass, valClass, conf, reporter); this.umbilical = umbilical; - this.skipGroupCounter = + this.skipGroupCounter = reporter.getCounter(TaskCounter.REDUCE_SKIPPED_GROUPS); - this.skipRecCounter = + this.skipRecCounter = reporter.getCounter(TaskCounter.REDUCE_SKIPPED_RECORDS); - this.toWriteSkipRecs = reduce.toWriteSkipRecs() && + this.toWriteSkipRecs = toWriteSkipRecs() && SkipBadRecords.getSkipOutputPath(conf)!=null; this.keyClass = keyClass; this.valClass = valClass; this.reporter = reporter; - skipIt = reduce.getSkipRanges().skipRangeIterator(); + skipIt = getSkipRanges().skipRangeIterator(); mayBeSkip(); } - + public void nextKey() throws IOException { super.nextKey(); mayBeSkip(); @@ -340,16 +292,16 @@ public class ReduceTask extends Task { } skipGroupCounter.increment(skip); skipRecCounter.increment(skipRec); - reduce.reportNextRecordRange(umbilical, grpIndex); + reportNextRecordRange(umbilical, grpIndex); } @SuppressWarnings("unchecked") private void writeSkippedRec(KEY key, VALUE value) throws IOException{ if(skipWriter==null) { - Path skipDir = SkipBadRecords.getSkipOutputPath(reduce.conf); - Path skipFile = new Path(skipDir, reduce.getTaskID().toString()); + Path skipDir = SkipBadRecords.getSkipOutputPath(conf); + Path skipFile = new Path(skipDir, getTaskID().toString()); skipWriter = SequenceFile.createWriter( - skipFile.getFileSystem(reduce.conf), reduce.conf, skipFile, + skipFile.getFileSystem(conf), conf, skipFile, keyClass, valClass, CompressionType.BLOCK, reporter); } @@ -411,8 +363,8 @@ public class ReduceTask extends Task { } else { final FileSystem rfs = FileSystem.getLocal(job).getRaw(); rIter = Merger.merge(job, rfs, job.getMapOutputKeyClass(), - job.getMapOutputValueClass(), codec, - getMapFiles(this, rfs, true), + job.getMapOutputValueClass(), codec, + getMapFiles(rfs, true), !conf.getKeepFailedTaskFiles(), job.getInt(JobContext.IO_SORT_FACTOR, 100), new Path(getTaskID().toString()), @@ -430,40 +382,18 @@ public class ReduceTask extends Task { RawComparator comparator = job.getOutputValueGroupingComparator(); if (useNewApi) { - runNewReducer(this, job, umbilical, reporter, rIter, comparator, + runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } else { - runOldReducer(this, job, umbilical, reporter, rIter, comparator, + runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass); } done(umbilical, reporter); } - private static class WrappedOutputCollector - implements OutputCollector { - RecordWriter out; - TaskReporter reporter; - Counters.Counter reduceOutputCounter; - public WrappedOutputCollector(ReduceTask reduce, - RecordWriter out, - TaskReporter reporter) { - this.out = out; - this.reporter = reporter; - this.reduceOutputCounter = reduce.reduceOutputCounter; - } - - public void collect(OUTKEY key, OUTVALUE value) - throws IOException { - out.write(key, value); - reduceOutputCounter.increment(1); - // indicate that progress update needs to be sent - reporter.progress(); - } - } - @SuppressWarnings("unchecked") - static - void runOldReducer(ReduceTask reduce, JobConf job, + private + void runOldReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, @@ -473,7 +403,7 @@ public class ReduceTask extends Task { Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass(), job); // make output collector - String finalName = getOutputName(reduce.getPartition()); + String finalName = getOutputName(getPartition()); FileSystem fs = FileSystem.get(job); @@ -481,7 +411,15 @@ public class ReduceTask extends Task { job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter); OutputCollector collector = - new WrappedOutputCollector(reduce, out, reporter); + new OutputCollector() { + public void collect(OUTKEY key, OUTVALUE value) + throws IOException { + out.write(key, value); + reduceOutputCounter.increment(1); + // indicate that progress update needs to be sent + reporter.progress(); + } + }; // apply reduce function try { @@ -489,16 +427,16 @@ public class ReduceTask extends Task { boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 && SkipBadRecords.getAutoIncrReducerProcCount(job); - ReduceValuesIterator values = reduce.isSkipping() ? - new SkippingReduceValuesIterator(reduce, rIter, + ReduceValuesIterator values = isSkipping() ? + new SkippingReduceValuesIterator(rIter, comparator, keyClass, valueClass, job, reporter, umbilical) : - new ReduceValuesIterator(reduce, rIter, + new ReduceValuesIterator(rIter, job.getOutputValueGroupingComparator(), keyClass, valueClass, job, reporter); values.informReduceProgress(); while (values.more()) { - reduce.reduceInputKeyCounter.increment(1); + reduceInputKeyCounter.increment(1); reducer.reduce(values.getKey(), values, collector, reporter); if(incrProcCount) { reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, @@ -511,7 +449,7 @@ public class ReduceTask extends Task { //Clean up: repeated in catch block below reducer.close(); out.close(reporter); - //End of cleanup. + //End of clean up. } catch (IOException ioe) { try { reducer.close(); @@ -549,38 +487,9 @@ public class ReduceTask extends Task { } } - private static class WrappedRawKeyValueIterator implements RawKeyValueIterator { - ReduceTask reduce; - TaskReporter reporter; - RawKeyValueIterator rawIter; - public WrappedRawKeyValueIterator(ReduceTask reduce, TaskReporter reporter, - RawKeyValueIterator rawIter) { - this.reduce = reduce; - this.rawIter = rawIter; - this.reporter = reporter; - } - public void close() throws IOException { - rawIter.close(); - } - public DataInputBuffer getKey() throws IOException { - return rawIter.getKey(); - } - public Progress getProgress() { - return rawIter.getProgress(); - } - public DataInputBuffer getValue() throws IOException { - return rawIter.getValue(); - } - public boolean next() throws IOException { - boolean ret = rawIter.next(); - reporter.setProgress(rawIter.getProgress().getProgress()); - return ret; - } - } - @SuppressWarnings("unchecked") - static - void runNewReducer(final ReduceTask reduce, JobConf job, + private + void runNewReducer(JobConf job, final TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, @@ -589,29 +498,49 @@ public class ReduceTask extends Task { Class valueClass ) throws IOException,InterruptedException, ClassNotFoundException { - org.apache.hadoop.mapreduce.TaskAttemptID reduceId = reduce.getTaskID(); // wrap value iterator to report progress. final RawKeyValueIterator rawIter = rIter; - rIter = new WrappedRawKeyValueIterator(reduce, reporter, rawIter); + rIter = new RawKeyValueIterator() { + public void close() throws IOException { + rawIter.close(); + } + public DataInputBuffer getKey() throws IOException { + return rawIter.getKey(); + } + public Progress getProgress() { + return rawIter.getProgress(); + } + public DataInputBuffer getValue() throws IOException { + return rawIter.getValue(); + } + public boolean next() throws IOException { + boolean ret = rawIter.next(); + reporter.setProgress(rawIter.getProgress().getProgress()); + return ret; + } + }; // make a task context so we can get the classes org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = - new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, reduceId); + new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, getTaskID()); // make a reducer org.apache.hadoop.mapreduce.Reducer reducer = (org.apache.hadoop.mapreduce.Reducer) ReflectionUtils.newInstance(taskContext.getReducerClass(), job); org.apache.hadoop.mapreduce.RecordWriter output = (org.apache.hadoop.mapreduce.RecordWriter) - reduce.outputFormat.getRecordWriter(taskContext); + outputFormat.getRecordWriter(taskContext); org.apache.hadoop.mapreduce.RecordWriter trackedRW = - new NewTrackingRecordWriter(output, reduce.reduceOutputCounter); - job.setBoolean(JobContext.SKIP_RECORDS, reduce.isSkipping()); - org.apache.hadoop.mapreduce.Reducer.Context reducerContext = - createReduceContext(reducer, job, reduceId, rIter, - reduce.reduceInputKeyCounter, - reduce.reduceInputValueCounter, - trackedRW, reduce.committer, reporter, - comparator, keyClass, valueClass); + new NewTrackingRecordWriter(output, reduceOutputCounter); + job.setBoolean("mapred.skip.on", isSkipping()); + job.setBoolean(JobContext.SKIP_RECORDS, isSkipping()); + org.apache.hadoop.mapreduce.Reducer.Context + reducerContext = createReduceContext(reducer, job, getTaskID(), + rIter, reduceInputKeyCounter, + reduceInputValueCounter, + trackedRW, + committer, + reporter, comparator, keyClass, + valueClass); reducer.run(reducerContext); output.close(reducerContext); } Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java Tue Mar 8 05:54:02 2011 @@ -54,11 +54,6 @@ class ReduceTaskStatus extends TaskStatu } @Override - public boolean getIsUber() { - return false; - } - - @Override void setFinishTime(long finishTime) { if (shuffleFinishTime == 0) { this.shuffleFinishTime = finishTime; Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/Task.java Tue Mar 8 05:54:02 2011 @@ -118,7 +118,6 @@ abstract public class Task implements Wr private String jobFile; // job configuration file private String user; // user running the job private TaskAttemptID taskId; // unique, includes job id - private TaskAttemptID taskIdForUmbilical; // same, or uber's if subtask private int partition; // id within job TaskStatus taskStatus; // current status of the task protected JobStatus.State jobRunStateForCleanup; @@ -160,8 +159,8 @@ abstract public class Task implements Wr //////////////////////////////////////////// public Task() { + taskStatus = TaskStatus.createTaskStatus(isMapTask()); taskId = new TaskAttemptID(); - taskIdForUmbilical = taskId; spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); failedShuffleCounter = @@ -175,10 +174,17 @@ abstract public class Task implements Wr int numSlotsRequired) { this.jobFile = jobFile; this.taskId = taskId; - this.taskIdForUmbilical = taskId; this.partition = partition; this.numSlotsRequired = numSlotsRequired; + this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, + 0.0f, numSlotsRequired, + TaskStatus.State.UNASSIGNED, + "", "", "", + isMapTask() ? + TaskStatus.Phase.MAP : + TaskStatus.Phase.SHUFFLE, + counters); spilledRecordsCounter = counters.findCounter(TaskCounter.SPILLED_RECORDS); failedShuffleCounter = counters.findCounter(TaskCounter.FAILED_SHUFFLE); mergedMapOutputsCounter = @@ -191,12 +197,7 @@ abstract public class Task implements Wr //////////////////////////////////////////// public void setJobFile(String jobFile) { this.jobFile = jobFile; } public String getJobFile() { return jobFile; } - public TaskAttemptID getTaskID() { return taskId; } - public void setTaskIdForUmbilical(TaskAttemptID taskIdForUmbilical) { - this.taskIdForUmbilical = taskIdForUmbilical; - } - public int getNumSlotsRequired() { return numSlotsRequired; } @@ -268,7 +269,7 @@ abstract public class Task implements Wr /** * Report a fatal error to the parent (task) tracker. */ - protected void reportFatalError(Throwable throwable, + protected void reportFatalError(TaskAttemptID id, Throwable throwable, String logMsg) { LOG.fatal(logMsg); Throwable tCause = throwable.getCause(); @@ -276,7 +277,7 @@ abstract public class Task implements Wr ? StringUtils.stringifyException(throwable) : StringUtils.stringifyException(tCause); try { - umbilical.fatalError(taskIdForUmbilical, cause); + umbilical.fatalError(id, cause); } catch (IOException ioe) { LOG.fatal("Failed to contact the tasktracker", ioe); System.exit(-1); @@ -388,14 +389,6 @@ abstract public class Task implements Wr this.user = user; } - /** - * Return the task's MapOutputFile instance. - * @return the task's MapOutputFile instance - */ - MapOutputFile getMapOutputFile() { - return mapOutputFile; - } - //////////////////////////////////////////// // Writable methods //////////////////////////////////////////// @@ -403,7 +396,6 @@ abstract public class Task implements Wr public void write(DataOutput out) throws IOException { Text.writeString(out, jobFile); taskId.write(out); - taskIdForUmbilical.write(out); out.writeInt(partition); out.writeInt(numSlotsRequired); taskStatus.write(out); @@ -422,7 +414,6 @@ abstract public class Task implements Wr public void readFields(DataInput in) throws IOException { jobFile = Text.readString(in); taskId = TaskAttemptID.read(in); - taskIdForUmbilical = TaskAttemptID.read(in); partition = in.readInt(); numSlotsRequired = in.readInt(); taskStatus.readFields(in); @@ -474,7 +465,7 @@ abstract public class Task implements Wr /** The number of milliseconds between progress reports. */ public static final int PROGRESS_INTERVAL = 3000; - private transient Progress taskProgress = new Progress(); //GRR Q: why transient? lose entire tree every time serialize?? + private transient Progress taskProgress = new Progress(); // Current counters private transient Counters counters = new Counters(); @@ -484,21 +475,6 @@ abstract public class Task implements Wr public abstract boolean isMapTask(); - /** - * Is this really a combo-task masquerading as a plain MapTask? - */ - public abstract boolean isUberTask(); - - /** - * This setter allows one to incorporate Tasks into multiple levels of - * a Progress addPhase()-generated hierarchy (i.e., not always the root - * node), which in turn allows Progress to handle all details of progress - * aggregation for an UberTask or even a whole job. - */ - protected void setProgress(Progress progress) { - taskProgress = progress; - } - public Progress getProgress() { return taskProgress; } public void initialize(JobConf job, JobID id, @@ -542,7 +518,7 @@ abstract public class Task implements Wr resourceCalculator.getProcResourceValues().getCumulativeCpuTime(); } } - + @InterfaceAudience.Private @InterfaceStability.Unstable protected class TaskReporter @@ -585,7 +561,6 @@ abstract public class Task implements Wr // indicate that progress update needs to be sent setProgressFlag(); } - // FIXME? why isn't this deprecated in favor of public setProgressFlag()? public void progress() { // indicate that progress update needs to be sent setProgressFlag(); @@ -628,16 +603,15 @@ abstract public class Task implements Wr } public InputSplit getInputSplit() throws UnsupportedOperationException { if (split == null) { - throw new UnsupportedOperationException("Input available only on map"); + throw new UnsupportedOperationException("Input only available on map"); } else { return split; } } /** - * The communication thread handles communication with the parent - * (TaskTracker). It sends progress updates if progress has been made or - * if the task needs to let the parent know that it's alive. It also pings - * the parent to see if it's alive. + * The communication thread handles communication with the parent (Task Tracker). + * It sends progress updates if progress has been made or if the task needs to + * let the parent know that it's alive. It also pings the parent to see if it's alive. */ public void run() { final int MAX_RETRIES = 3; @@ -673,8 +647,8 @@ abstract public class Task implements Wr taskFound = umbilical.ping(taskId); } - // if TaskTracker is not aware of our task ID (probably because it - // died and came back up), kill ourselves + // if Task Tracker is not aware of our task ID (probably because it died and + // came back up), kill ourselves if (!taskFound) { LOG.warn("Parent died. Exiting "+taskId); System.exit(66); @@ -708,7 +682,7 @@ abstract public class Task implements Wr } } } - + /** * Reports the next executing record range to TaskTracker. * @@ -727,7 +701,7 @@ abstract public class Task implements Wr if (LOG.isDebugEnabled()) { LOG.debug("sending reportNextRecordRange " + range); } - umbilical.reportNextRecordRange(taskIdForUmbilical, range); + umbilical.reportNextRecordRange(taskId, range); } /** @@ -863,13 +837,8 @@ abstract public class Task implements Wr public void done(TaskUmbilicalProtocol umbilical, TaskReporter reporter ) throws IOException, InterruptedException { - if (isUberTask()) { - LOG.info("UberTask:" + taskIdForUmbilical + " subtask:" + taskId - + "is done and is in the process of committing."); - } else { - LOG.info("Task:" + taskId - + "is done and is in the process of committing."); - } + LOG.info("Task:" + taskId + " is done." + + " And is in the process of commiting"); updateCounters(); boolean commitRequired = isCommitRequired(); @@ -879,7 +848,7 @@ abstract public class Task implements Wr // say the task tracker that task is commit pending while (true) { try { - umbilical.commitPending(taskIdForUmbilical, taskStatus); + umbilical.commitPending(taskId, taskStatus); break; } catch (InterruptedException ie) { // ignore @@ -930,14 +899,8 @@ abstract public class Task implements Wr int retries = MAX_RETRIES; while (true) { try { - //GRR FIXME (later): alternatives to taskIdForUmbilical would be - // (1) include taskId as part of umbilical object and protocol; - // (2) include taskId as part of taskStatus - // (3) extend TaskAttemptID (or create related Task inner class?) to - // include taskAttemptId() and taskAtteptIdForUmbilical() method - // that's overridden in uber context [Dick] - if (!umbilical.statusUpdate(taskIdForUmbilical, taskStatus)) { - LOG.warn("Parent died. Exiting " + taskId); + if (!umbilical.statusUpdate(getTaskID(), taskStatus)) { + LOG.warn("Parent died. Exiting "+taskId); System.exit(66); } taskStatus.clearStatus(); @@ -973,7 +936,7 @@ abstract public class Task implements Wr * @return -1 if it can't be found. */ private long calculateOutputSize() throws IOException { - if (!isMapOrReduce() || isUberTask()) { + if (!isMapOrReduce()) { return -1; } @@ -993,13 +956,8 @@ abstract public class Task implements Wr int retries = MAX_RETRIES; while (true) { try { - umbilical.done(taskIdForUmbilical); - if (isUberTask()) { - LOG.info("UberTask '" + taskIdForUmbilical + "' subtask '" + taskId - + "' done."); - } else { - LOG.info("Task '" + taskId + "' done."); - } + umbilical.done(getTaskID()); + LOG.info("Task '" + taskId + "' done."); return; } catch (IOException ie) { LOG.warn("Failure signalling completion: " + @@ -1018,7 +976,7 @@ abstract public class Task implements Wr int retries = MAX_RETRIES; while (true) { try { - while (!umbilical.canCommit(taskIdForUmbilical)) { + while (!umbilical.canCommit(taskId)) { try { Thread.sleep(1000); } catch(InterruptedException ie) { @@ -1075,7 +1033,7 @@ abstract public class Task implements Wr setPhase(TaskStatus.Phase.CLEANUP); getProgress().setStatus("cleanup"); statusUpdate(umbilical); - LOG.info("Running cleanup for the task"); + LOG.info("Runnning cleanup for the task"); // do the cleanup committer.abortTask(taskContext); } @@ -1100,10 +1058,8 @@ abstract public class Task implements Wr oldCommitter.abortJob(jobContext, jobRunStateForCleanup); } } else if (jobRunStateForCleanup == JobStatus.State.SUCCEEDED){ - // delete /_temporary and optionally create _SUCCESS file - if (!isUberTask()) { // defer since output files have not yet been saved - commitJob(); - } + LOG.info("Committing job"); + committer.commitJob(jobContext); } else { throw new IOException("Invalid state of the job for cleanup. State found " + jobRunStateForCleanup + " expecting " @@ -1112,31 +1068,20 @@ abstract public class Task implements Wr + JobStatus.State.KILLED); } - // delete the staging area for the job (e.g., - // "hdfs://localhost:9000/tmp/hadoop-/mapred/staging//.staging/ - // job_YYYYMMDDhhmm_nnnn"--NOT same as "_temporary" subdir of output dir) + // delete the staging area for the job JobConf conf = new JobConf(jobContext.getConfiguration()); if (!supportIsolationRunner(conf)) { - String jobStagingDir = conf.get("mapreduce.job.dir"); - Path jobStagingDirPath = new Path(jobStagingDir); - FileSystem fs = jobStagingDirPath.getFileSystem(conf); - fs.delete(jobStagingDirPath, true); + String jobTempDir = conf.get("mapreduce.job.dir"); + Path jobTempDirPath = new Path(jobTempDir); + FileSystem fs = jobTempDirPath.getFileSystem(conf); + fs.delete(jobTempDirPath, true); } - // update counters, save any pending output files, shut down the progress- - // reporter communication thread and the umbilical, and mark the task done - if (!isUberTask()) { // defer so UberTask can send TT final update(s) - done(umbilical, reporter); - } - } - - protected void commitJob() throws IOException { - LOG.info("Committing job"); - committer.commitJob(jobContext); + done(umbilical, reporter); } - + protected boolean supportIsolationRunner(JobConf conf) { - return (conf.getKeepTaskFilesPattern() != null || - conf.getKeepFailedTaskFiles()); + return (conf.getKeepTaskFilesPattern() != null || conf + .getKeepFailedTaskFiles()); } protected void runJobSetupTask(TaskUmbilicalProtocol umbilical, @@ -1145,12 +1090,9 @@ abstract public class Task implements Wr // do the setup getProgress().setStatus("setup"); committer.setupJob(jobContext); - if (!isUberTask()) { - // UberTask calls done() directly; don't shut down umbilical prematurely - done(umbilical, reporter); - } + done(umbilical, reporter); } - + public void setConf(Configuration conf) { if (conf instanceof JobConf) { this.conf = (JobConf) conf; Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Mar 8 05:54:02 2011 @@ -40,7 +40,7 @@ import org.apache.hadoop.mapreduce.TaskT import org.apache.hadoop.mapreduce.jobhistory.JobHistory; import org.apache.hadoop.mapreduce.jobhistory.TaskUpdatedEvent; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; -import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex; + import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; import org.apache.hadoop.net.Node; @@ -62,7 +62,7 @@ import org.apache.hadoop.net.Node; * ************************************************************** */ class TaskInProgress { - static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently + static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently int maxTaskAttempts = 4; static final long SPECULATIVE_LAG = 60 * 1000; private static final int NUM_ATTEMPTS_PER_RESTART = 1000; @@ -75,19 +75,14 @@ class TaskInProgress { // Defines the TIP private String jobFile = null; - private TaskSplitMetaInfo[] splitInfo; + private TaskSplitMetaInfo splitInfo; private int numMaps; - private int numReduces; private int partition; private JobTracker jobtracker; private JobHistory jobHistory; private TaskID id; private JobInProgress job; private final int numSlotsRequired; - private boolean jobCleanup = false; - private boolean jobSetup = false; - private boolean isUber = false; - private boolean jobSetupCleanupNeeded = false; // UberTasks only // Status of the TIP private int successEventNumber = -1; @@ -105,6 +100,8 @@ class TaskInProgress { private long maxSkipRecords = 0; private FailedRanges failedRanges = new FailedRanges(); private volatile boolean skipping = false; + private boolean jobCleanup = false; + private boolean jobSetup = false; private static Enum CPU_COUNTER_KEY = TaskCounter.CPU_MILLISECONDS; private static Enum VM_BYTES_KEY = TaskCounter.VIRTUAL_MEMORY_BYTES; @@ -172,8 +169,7 @@ class TaskInProgress { JobInProgress job, int partition, int numSlotsRequired) { this.jobFile = jobFile; - this.splitInfo = new TaskSplitMetaInfo[1]; - this.splitInfo[0] = split; + this.splitInfo = split; this.jobtracker = jobtracker; this.job = job; this.conf = conf; @@ -187,7 +183,7 @@ class TaskInProgress { } this.user = job.getUser(); } - + /** * Constructor for ReduceTask */ @@ -269,36 +265,7 @@ class TaskInProgress { } } } - - /** - * Constructor for UberTask - */ - public TaskInProgress(JobID jobid, String jobFile, - TaskSplitMetaInfo[] splits, - int numMaps, int numReduces, - int partition, JobTracker jobtracker, JobConf conf, - JobInProgress job, int numSlotsRequired, - boolean jobSetupCleanupNeeded) { - this.isUber = true; - this.jobFile = jobFile; - this.splitInfo = splits; - this.numMaps = numMaps; - this.numReduces = numReduces; - this.jobSetupCleanupNeeded = jobSetupCleanupNeeded; - this.jobtracker = jobtracker; - this.job = job; - this.conf = conf; - this.partition = partition; - this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf); - this.numSlotsRequired = numSlotsRequired; - setMaxTaskAttempts(); - init(jobid); - if (jobtracker != null) { - this.jobHistory = jobtracker.getJobHistory(); - } - this.user = job.getUser(); - } - + /** * Set the max number of attempts before we declare a TIP as "failed" */ @@ -309,6 +276,15 @@ class TaskInProgress { this.maxTaskAttempts = conf.getMaxReduceAttempts(); } } + + /** + * Return the index of the tip within the job, so + * "task_200707121733_1313_0002_m_012345" would return 12345; + * @return int the tip index + */ + public int idWithinJob() { + return partition; + } public boolean isJobCleanupTask() { return jobCleanup; @@ -416,32 +392,19 @@ class TaskInProgress { public JobInProgress getJob() { return job; } - /** * Return an ID for this task, not its component taskid-threads */ public TaskID getTIPId() { return this.id; } - /** - * Whether this is a map task. Note that ubertasks return false here so - * they can run in a reduce slot (larger). (Setup and cleanup tasks may - * return either true or false.) + * Whether this is a map task */ public boolean isMapTask() { - return splitInfo != null && !isUber; + return splitInfo != null; } - - /** - * Whether this is an ubertask, i.e., a meta-task that contains a handful - * of map tasks and (at most) a single reduce task. Note that ubertasks - * are seen as reduce tasks in most contexts. - */ - public boolean isUberTask() { - return isUber; - } - + /** * Returns the {@link TaskType} of the {@link TaskAttemptID} passed. * The type of an attempt is determined by the nature of the task and not its @@ -966,16 +929,15 @@ class TaskInProgress { } /** - * Get the split locations + * Get the split locations */ public String[] getSplitLocations() { -//GRR FIXME? may need to add "( .. || isUberTask())" if ever called for uber (but locations for which split? all of them?) if (isMapTask() && !jobSetup && !jobCleanup) { - return splitInfo[0].getLocations(); + return splitInfo.getLocations(); } return new String[0]; } - + /** * Get the Status of the tasks managed by this TIP */ @@ -1158,7 +1120,7 @@ class TaskInProgress { //set this the first time we run a taskAttempt in this TIP //each Task attempt has its own TaskStatus, which tracks that - //attempt's execStartTime, thus this startTime is TIP wide. + //attempts execStartTime, thus this startTime is TIP wide. if (0 == execStartTime){ setExecStartTime(lastDispatchTime); } @@ -1170,9 +1132,8 @@ class TaskInProgress { } /** - * Creates a task or recreates a previously running one and adds it to this - * tip. The latter is used in case of jobtracker restarts. This is the - * ultimate source of Task objects in a normal Hadoop setup. + * Adds a previously running task to this tip. This is used in case of + * jobtracker restarts. */ public Task addRunningTask(TaskAttemptID taskid, String taskTracker, @@ -1186,20 +1147,10 @@ class TaskInProgress { LOG.debug("attempt " + numTaskFailures + " sending skippedRecords " + failedRanges.getIndicesCount()); } - t = new MapTask(jobFile, taskid, partition, - splitInfo[0].getSplitIndex(), numSlotsNeeded); + t = new MapTask(jobFile, taskid, partition, splitInfo.getSplitIndex(), + numSlotsNeeded); } else { - if (isUberTask()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Launching actual UberTask (" + numMaps + " maps, " - + numReduces + " reduces)"); - } - // numMaps is implicit in size of splitIndex array: - t = new UberTask(jobFile, taskid, partition, getSplitIndexArray(), - numReduces, numSlotsNeeded, jobSetupCleanupNeeded); - } else { - t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded); - } + t = new ReduceTask(jobFile, taskid, partition, numMaps, numSlotsNeeded); } if (jobCleanup) { t.setJobCleanupTask(); @@ -1236,17 +1187,6 @@ class TaskInProgress { return t; } - // GRR FIXME? more efficient just to pass splitInfo directly...any need for - // rest of it in UberTask? - TaskSplitIndex[] getSplitIndexArray() { - int numSplits = splitInfo.length; - TaskSplitIndex[] splitIndex = new TaskSplitIndex[numSplits]; - for (int i = 0; i < numSplits; ++i) { - splitIndex[i] = splitInfo[i].getSplitIndex(); - } - return splitIndex; - } - boolean isRunningTask(TaskAttemptID taskid) { TaskStatus status = taskStatuses.get(taskid); return status != null && status.getRunState() == TaskStatus.State.RUNNING; @@ -1296,8 +1236,7 @@ class TaskInProgress { } /** - * Get the task index of this map or reduce task. For example, - * "task_201011230308_87259_r_000240" would return 240. + * Get the id of this map or reduce task. * @return The index of this tip in the maps/reduces lists. */ public int getIdWithinJob() { @@ -1317,7 +1256,7 @@ class TaskInProgress { public int getSuccessEventNumber() { return successEventNumber; } - + /** * Gets the Node list of input split locations sorted in rack order. */ @@ -1325,7 +1264,7 @@ class TaskInProgress { if (!isMapTask() || jobSetup || jobCleanup) { return ""; } - String[] splits = splitInfo[0].getLocations(); // actually replicas + String[] splits = splitInfo.getLocations(); Node[] nodes = new Node[splits.length]; for (int i = 0; i < splits.length; i++) { nodes[i] = jobtracker.getNode(splits[i]); @@ -1354,20 +1293,13 @@ class TaskInProgress { } public long getMapInputSize() { - if (isUberTask()) { - int numSplits = splitInfo.length; - long sumInputDataLength = 0; - for (int i = 0; i < numSplits; ++i) { - sumInputDataLength += splitInfo[i].getInputDataLength(); - } - return sumInputDataLength; - } else if (isMapTask() && !jobSetup && !jobCleanup) { - return splitInfo[0].getInputDataLength(); + if(isMapTask() && !jobSetup && !jobCleanup) { + return splitInfo.getInputDataLength(); } else { return 0; } } - + /** * Compare most recent task attempts dispatch time to current system time so * that task progress rate will slow down as time proceeds even if no progress Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Mar 8 05:54:02 2011 @@ -41,15 +41,10 @@ public abstract class TaskStatus impleme static final Log LOG = LogFactory.getLog(TaskStatus.class.getName()); - // what kind of TaskStatus is it? + //enumeration for reporting current phase of a task. @InterfaceAudience.Private @InterfaceStability.Unstable - public static enum Type {MAP, REDUCE, UBER} - - // enumeration for reporting current phase of a task. - @InterfaceAudience.Private - @InterfaceStability.Unstable - public static enum Phase {STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP} + public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE, CLEANUP} // what state is the task in? @InterfaceAudience.Private @@ -110,7 +105,6 @@ public abstract class TaskStatus impleme public TaskAttemptID getTaskID() { return taskid; } public abstract boolean getIsMap(); - public abstract boolean getIsUber(); public int getNumSlots() { return numSlots; } @@ -501,61 +495,46 @@ public abstract class TaskStatus impleme ////////////////////////////////////////////////////////////////////////////// // Factory-like methods to create/read/write appropriate TaskStatus objects ////////////////////////////////////////////////////////////////////////////// - - // this is the main one used by TT.TIP, JIP, and (apparently) all the - // relevant tests (CapacityTestUtils, TestFairScheduler, TestClusterStatus, - // TestJobInProgress, TestJobTrackerInstrumentation, FakeObjectUtilities) - // [also formerly MapTask and ReduceTask, but no longer] + + static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, + float progress, int numSlots, + State runState, String diagnosticInfo, + String stateString, String taskTracker, + Phase phase, Counters counters) + throws IOException { + boolean isMap = in.readBoolean(); + return createTaskStatus(isMap, taskId, progress, numSlots, runState, + diagnosticInfo, stateString, taskTracker, phase, + counters); + } + static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress, int numSlots, State runState, String diagnosticInfo, String stateString, String taskTracker, Phase phase, Counters counters) { return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState, - diagnosticInfo, stateString, taskTracker, + diagnosticInfo, stateString, taskTracker, phase, counters) : new ReduceTaskStatus(taskId, progress, numSlots, runState, diagnosticInfo, stateString, taskTracker, phase, counters); } - - // used only in default ctors of Task (also formerly MapTask, ReduceTask) and - // readTaskStatus() below - static TaskStatus createTaskStatus(Type tsType) { - return (tsType == TaskStatus.Type.MAP) - ? new MapTaskStatus() - : (tsType == TaskStatus.Type.REDUCE) - ? new ReduceTaskStatus() - : new UberTaskStatus(); + + static TaskStatus createTaskStatus(boolean isMap) { + return (isMap) ? new MapTaskStatus() : new ReduceTaskStatus(); } static TaskStatus readTaskStatus(DataInput in) throws IOException { boolean isMap = in.readBoolean(); - boolean isUber = in.readBoolean(); - Type tsType = isMap - ? TaskStatus.Type.MAP - : isUber - ? TaskStatus.Type.UBER - : TaskStatus.Type.REDUCE; - TaskStatus taskStatus = createTaskStatus(tsType); + TaskStatus taskStatus = createTaskStatus(isMap); taskStatus.readFields(in); return taskStatus; } static void writeTaskStatus(DataOutput out, TaskStatus taskStatus) throws IOException { -/* LATER - * //GRR FIXME: longer-term, just store tsType as member var (but then need - * // to modify or add new ctor: used in many places) - * Type tsType = taskStatus.getIsUber() - * ? TaskStatus.Type.UBER - * : taskStatus.getIsMap() - * ? TaskStatus.Type.MAP - * : TaskStatus.Type.REDUCE; - * WritableUtils.writeEnum(out, tsType); (or enum.ordinal() [as in MR-901] or ...) - */ out.writeBoolean(taskStatus.getIsMap()); - out.writeBoolean(taskStatus.getIsUber()); taskStatus.write(out); } } Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Mar 8 05:54:02 2011 @@ -827,7 +827,7 @@ public class TaskTracker f = rjob.getFetchStatus(); for (TaskInProgress tip : rjob.tasks) { Task task = tip.getTask(); - if (!task.isMapTask() && !task.isUberTask()) { + if (!task.isMapTask()) { if (((ReduceTask)task).getPhase() == TaskStatus.Phase.SHUFFLE) { if (rjob.getFetchStatus() == null) { @@ -2502,27 +2502,21 @@ public class TaskTracker this.lastProgressReport = System.currentTimeMillis(); this.defaultJobConf = conf; localJobConf = null; - if (task.isUberTask()) { - taskStatus = new UberTaskStatus( - task.getTaskID(), 0.0f, - task.getNumSlotsRequired(), task.getState(), - diagnosticInfo.toString(), "initializing", getName(), - TaskStatus.Phase.MAP, task.getCounters()); - } else { - taskStatus = TaskStatus.createTaskStatus( - task.isMapTask(), task.getTaskID(), 0.0f, - task.getNumSlotsRequired(), task.getState(), - diagnosticInfo.toString(), "initializing", getName(), - task.isTaskCleanupTask() - ? TaskStatus.Phase.CLEANUP - : task.isMapTask() - ? TaskStatus.Phase.MAP - : TaskStatus.Phase.SHUFFLE, - task.getCounters()); - } + taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), + 0.0f, + task.getNumSlotsRequired(), + task.getState(), + diagnosticInfo.toString(), + "initializing", + getName(), + task.isTaskCleanupTask() ? + TaskStatus.Phase.CLEANUP : + task.isMapTask()? TaskStatus.Phase.MAP: + TaskStatus.Phase.SHUFFLE, + task.getCounters()); taskTimeout = (10 * 60 * 1000); } - + void localizeTask(Task task) throws IOException{ FileSystem localFs = FileSystem.getLocal(fConf); @@ -2639,7 +2633,7 @@ public class TaskTracker this.taskStatus.setStartTime(System.currentTimeMillis()); } else { LOG.info("Not launching task: " + task.getTaskID() + - " since its state is " + this.taskStatus.getRunState()); + " since it's state is " + this.taskStatus.getRunState()); } } @@ -3212,7 +3206,7 @@ public class TaskTracker LOG.debug("JVM with ID : " + jvmId + " asked for a task"); if (!jvmManager.isJvmKnown(jvmId)) { - LOG.info("Killing unknown JVM " + jvmId); //GRR FIXME: bug? no (apparent) killing going on here... + LOG.info("Killing unknown JVM " + jvmId); return new JvmTask(null, true); } RunningJob rjob = runningJobs.get(jvmId.getJobId()); @@ -3243,7 +3237,7 @@ public class TaskTracker public synchronized boolean statusUpdate(TaskAttemptID taskid, TaskStatus taskStatus) throws IOException { - TaskInProgress tip = tasks.get(taskid); // TT.TIP, not TaskInProgress.java + TaskInProgress tip = tasks.get(taskid); if (tip != null) { tip.reportProgress(taskStatus); myInstrumentation.statusUpdate(tip.getTask(), taskStatus); @@ -3691,15 +3685,17 @@ public class TaskTracker } userName = rjob.jobConf.getUser(); } - - // Map-output filename ("... /file.out") - StringBuilder sb = new StringBuilder(); - sb.append(TaskTracker.getIntermediateOutputDir(userName, jobId, mapId)) - .append("/").append(MapOutputFile.MAP_OUTPUT_FILENAME_STRING); - Path mapOutputFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf); - // Index filename ("... /file.out.index") - sb.append(MapOutputFile.MAP_OUTPUT_INDEX_SUFFIX_STRING); - Path indexFileName = lDirAlloc.getLocalPathToRead(sb.toString(), conf); + // Index file + Path indexFileName = + lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( + userName, jobId, mapId) + + "/file.out.index", conf); + + // Map-output file + Path mapOutputFileName = + lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir( + userName, jobId, mapId) + + "/file.out", conf); /** * Read the index file to get the information about where the map-output Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/MRJobConfig.java Tue Mar 8 05:54:02 2011 @@ -271,19 +271,4 @@ public interface MRJobConfig { "mapreduce.job.submithostname"; public static final String JOB_SUBMITHOSTADDR = "mapreduce.job.submithostaddress"; - - public static final String JOB_UBERTASK_ENABLE = - "mapreduce.job.ubertask.enable"; - public static final String JOB_UBERTASK_MAXMAPS = - "mapreduce.job.ubertask.maxmaps"; - public static final String JOB_UBERTASK_MAXREDUCES = - "mapreduce.job.ubertask.maxreduces"; - public static final String JOB_UBERTASK_MAXBYTES = - "mapreduce.job.ubertask.maxbytes"; - public static final String UBERTASK_JAVA_OPTS = - "mapreduce.ubertask.child.java.opts"; // or mapreduce.uber.java.opts? - public static final String UBERTASK_ULIMIT = - "mapreduce.ubertask.child.ulimit"; // or mapreduce.uber.ulimit? - public static final String UBERTASK_ENV = - "mapreduce.ubertask.child.env"; // or mapreduce.uber.env? } Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr Tue Mar 8 05:54:02 2011 @@ -64,9 +64,6 @@ {"name": "launchTime", "type": "long"}, {"name": "totalMaps", "type": "int"}, {"name": "totalReduces", "type": "int"}, - {"name": "isUber", "type": "boolean"}, - {"name": "numUberSubMaps", "type": "int"}, - {"name": "numUberSubReduces", "type": "int"}, {"name": "jobStatus", "type": "string"} ] }, Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/HistoryViewer.java Tue Mar 8 05:54:02 2011 @@ -526,8 +526,6 @@ public class HistoryViewer { int totalReduces = 0; int totalCleanups = 0; int totalSetups = 0; - int numUberSubMaps = 0; - int numUberSubReduces = 0; int numFailedMaps = 0; int numKilledMaps = 0; int numFailedReduces = 0; @@ -546,20 +544,15 @@ public class HistoryViewer { long cleanupFinished = 0; long setupStarted = 0; long setupFinished = 0; - boolean isUber = false; - + /** Get total maps */ public int getTotalMaps() { return totalMaps; } /** Get total reduces */ public int getTotalReduces() { return totalReduces; } - /** Get number of cleanup tasks */ + /** Get number of clean up tasks */ public int getTotalCleanups() { return totalCleanups; } - /** Get number of setup tasks */ + /** Get number of set up tasks */ public int getTotalSetups() { return totalSetups; } - /** Get number of map subtasks within UberTask */ - public int getNumUberSubMaps() { return numUberSubMaps; } - /** Get number of reduce subtasks within UberTask */ - public int getNumUberSubReduces() { return numUberSubReduces; } /** Get number of failed maps */ public int getNumFailedMaps() { return numFailedMaps; } /** Get number of killed maps */ @@ -574,11 +567,11 @@ public class HistoryViewer { public int getNumFailedCleanups() { return numFailedCleanups; } /** Get number of killed cleanup tasks */ public int getNumKilledCleanups() { return numKilledCleanups; } - /** Get number of finished setup tasks */ + /** Get number of finished set up tasks */ public int getNumFinishedSetups() { return numFinishedSetups; } - /** Get number of failed setup tasks */ + /** Get number of failed set up tasks */ public int getNumFailedSetups() { return numFailedSetups; } - /** Get number of killed setup tasks */ + /** Get number of killed set up tasks */ public int getNumKilledSetups() { return numKilledSetups; } /** Get number of maps that were started */ public long getMapStarted() { return mapStarted; } @@ -596,10 +589,8 @@ public class HistoryViewer { public long getSetupStarted() { return setupStarted; } /** Get number of setup tasks that finished */ public long getSetupFinished() { return setupFinished; } - /** Get job's UberTask/non-UberTask status */ - public boolean isUber() { return isUber; } - /** Create summary information for the parsed job */ + /** Create summary information for the parsed job */ public SummarizedJob(JobInfo job) { tasks = job.getAllTasks(); Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Tue Mar 8 05:54:02 2011 @@ -302,9 +302,6 @@ public class JobHistoryParser { info.launchTime = event.getLaunchTime(); info.totalMaps = event.getTotalMaps(); info.totalReduces = event.getTotalReduces(); - info.isUber = event.getIsUber(); - info.numUberSubMaps = event.getNumUberSubMaps(); - info.numUberSubReduces = event.getNumUberSubReduces(); } private void handleJobInfoChangeEvent(JobInfoChangeEvent event) { @@ -337,9 +334,6 @@ public class JobHistoryParser { long launchTime; int totalMaps; int totalReduces; - boolean isUber; - int numUberSubMaps; - int numUberSubReduces; int failedMaps; int failedReduces; int finishedMaps; @@ -358,9 +352,8 @@ public class JobHistoryParser { */ public JobInfo() { submitTime = launchTime = finishTime = -1; - isUber = false; - totalMaps = totalReduces = numUberSubMaps = numUberSubReduces = 0; - failedMaps = failedReduces = finishedMaps = finishedReduces = 0; + totalMaps = totalReduces = failedMaps = failedReduces = 0; + finishedMaps = finishedReduces = 0; username = jobname = jobConfPath = jobQueueName = ""; tasksMap = new HashMap(); jobACLs = new HashMap(); @@ -377,7 +370,6 @@ public class JobHistoryParser { System.out.println("PRIORITY: " + priority); System.out.println("TOTAL_MAPS: " + totalMaps); System.out.println("TOTAL_REDUCES: " + totalReduces); - //GRR FIXME: add UBER_SUBMAPS and UBER_SUBREDUCES? (or only if isUber == true? coordinate with TaskInfo printAll() changes) System.out.println("MAP_COUNTERS:" + mapCounters.toString()); System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString()); System.out.println("TOTAL_COUNTERS: " + totalCounters.toString()); @@ -403,24 +395,18 @@ public class JobHistoryParser { public String getJobConfPath() { return jobConfPath; } /** Get the job launch time */ public long getLaunchTime() { return launchTime; } - /** Get the total number of "real" maps */ - public int getTotalMaps() { return totalMaps; } - /** Get the total number of "real" reduces */ - public int getTotalReduces() { return totalReduces; } - /** Was the job small enough to be converted to an UberTask? */ - public boolean getIsUber() { return isUber; } - /** Get the number of sub-MapTasks within the UberTask */ - public int getNumUberSubMaps() { return numUberSubMaps; } - /** Get the number of sub-ReduceTasks within the UberTask */ - public int getNumUberSubReduces() { return numUberSubReduces; } + /** Get the total number of maps */ + public long getTotalMaps() { return totalMaps; } + /** Get the total number of reduces */ + public long getTotalReduces() { return totalReduces; } /** Get the total number of failed maps */ - public int getFailedMaps() { return failedMaps; } + public long getFailedMaps() { return failedMaps; } /** Get the number of failed reduces */ - public int getFailedReduces() { return failedReduces; } + public long getFailedReduces() { return failedReduces; } /** Get the number of finished maps */ - public int getFinishedMaps() { return finishedMaps; } + public long getFinishedMaps() { return finishedMaps; } /** Get the number of finished reduces */ - public int getFinishedReduces() { return finishedReduces; } + public long getFinishedReduces() { return finishedReduces; } /** Get the job status */ public String getJobStatus() { return jobStatus; } public String getErrorInfo() { return errorInfo; } Modified: hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Tue Mar 8 05:54:02 2011 @@ -44,15 +44,11 @@ public class JobInitedEvent implements H * @param jobStatus */ public JobInitedEvent(JobID id, long launchTime, int totalMaps, - int totalReduces, boolean isUber, int numUberSubMaps, - int numUberSubReduces, String jobStatus) { + int totalReduces, String jobStatus) { datum.jobid = new Utf8(id.toString()); datum.launchTime = launchTime; datum.totalMaps = totalMaps; datum.totalReduces = totalReduces; - datum.isUber = isUber; - datum.numUberSubMaps = numUberSubMaps; - datum.numUberSubReduces = numUberSubReduces; datum.jobStatus = new Utf8(jobStatus); } @@ -65,16 +61,10 @@ public class JobInitedEvent implements H public JobID getJobId() { return JobID.forName(datum.jobid.toString()); } /** Get the launch time */ public long getLaunchTime() { return datum.launchTime; } - /** Get the total number of "real" maps */ + /** Get the total number of maps */ public int getTotalMaps() { return datum.totalMaps; } - /** Get the total number of "real" reduces */ + /** Get the total number of reduces */ public int getTotalReduces() { return datum.totalReduces; } - /** Was the job small enough to be converted to an UberTask? */ - public boolean getIsUber() { return datum.isUber; } - /** Get the number of sub-MapTasks within the UberTask */ - public int getNumUberSubMaps() { return datum.numUberSubMaps; } - /** Get the number of sub-ReduceTasks within the UberTask */ - public int getNumUberSubReduces() { return datum.numUberSubReduces; } /** Get the status */ public String getStatus() { return datum.jobStatus.toString(); } /** Get the event type */ Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Mar 8 05:54:02 2011 @@ -241,12 +241,8 @@ public class TestJobQueueTaskScheduler e } @Override - public boolean getIsUber() { - return t.isUberTask(); - } - - @Override public void addFetchFailedMap(TaskAttemptID mapTaskId) { + } }; status.setRunState(TaskStatus.State.RUNNING); Modified: hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/test/mapred/org/apache/hadoop/mapred/TestTaskStatus.java Tue Mar 8 05:54:02 2011 @@ -103,7 +103,7 @@ public class TestTaskStatus { // check the default case String test = "hi"; final int maxSize = 16; - TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null, + TaskStatus status = new TaskStatus(null, 0, 0, null, test, test, null, null, null) { @Override protected int getMaxStringSize() { @@ -118,11 +118,6 @@ public class TestTaskStatus { public boolean getIsMap() { return false; } - - @Override - public boolean getIsUber() { - return false; - } }; assertEquals("Small diagnostic info test failed", status.getDiagnosticInfo(), test); @@ -203,11 +198,6 @@ public class TestTaskStatus { public boolean getIsMap() { return false; } - - @Override - public boolean getIsUber() { - return false; - } }; assertEquals("Large diagnostic info test failed", maxSize, status.getDiagnosticInfo().length()); Modified: hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/tools/org/apache/hadoop/tools/rumen/Job20LineHistoryEventEmitter.java Tue Mar 8 05:54:02 2011 @@ -125,15 +125,10 @@ public class Job20LineHistoryEventEmitte String status = line.get("JOB_STATUS"); String totalMaps = line.get("TOTAL_MAPS"); String totalReduces = line.get("TOTAL_REDUCES"); - // note: UberTask playback not supported since uber data not yet logged - boolean isUber = false; - int numUberSubMaps = 0; - int numUberSubReduces = 0; if (launchTime != null && totalMaps != null && totalReduces != null) { - return new JobInitedEvent(jobID, Long.parseLong(launchTime), - Integer.parseInt(totalMaps), Integer.parseInt(totalReduces), - isUber, numUberSubMaps, numUberSubReduces, status); + return new JobInitedEvent(jobID, Long.parseLong(launchTime), Integer + .parseInt(totalMaps), Integer.parseInt(totalReduces), status); } return null; Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetails.jsp Tue Mar 8 05:54:02 2011 @@ -365,16 +365,10 @@ "Killed" + "Failed/Killed
Task Attempts
\n"); - if (job.getUberMode()) { - /* placeholder until true task- and attempt-level uber info available */ - printTaskSummary(out, jobId, "uber", status.reduceProgress(), - job.getTasks(TaskType.REDUCE)); - } else { - printTaskSummary(out, jobId, "map", status.mapProgress(), - job.getTasks(TaskType.MAP)); - printTaskSummary(out, jobId, "reduce", status.reduceProgress(), - job.getTasks(TaskType.REDUCE)); - } + printTaskSummary(out, jobId, "map", status.mapProgress(), + job.getTasks(TaskType.MAP)); + printTaskSummary(out, jobId, "reduce", status.reduceProgress(), + job.getTasks(TaskType.REDUCE)); out.print("\n"); %> @@ -427,7 +421,6 @@ %> -<%if (job.getTasks(TaskType.MAP).length > 0) { %>
Map Completion Graph - <% if("off".equals(request.getParameter("map.graph"))) { @@ -449,14 +442,10 @@ if("off".equals(session.getAttribute("ma width="<%=TaskGraphServlet.width + 2 * TaskGraphServlet.xmargin%>" height="<%=TaskGraphServlet.height + 3 * TaskGraphServlet.ymargin%>" style="width:100%" type="image/svg+xml" pluginspage="http://www.adobe.com/svg/viewer/install/" /> -<%} }%> +<%}%> -<%if (job.getTasks(TaskType.REDUCE).length > 0) { %> -<%if (job.getUberMode()) { %> -
UberTask Completion Graph - -<%} else { %> +<%if(job.getTasks(TaskType.REDUCE).length > 0) { %>
Reduce Completion Graph - -<%}%> <%if("off".equals(session.getAttribute("reduce.graph"))) { %> open <%} else { %> Modified: hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp?rev=1079192&r1=1079191&r2=1079192&view=diff ============================================================================== --- hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp (original) +++ hadoop/mapreduce/branches/yahoo-merge/src/webapps/job/jobdetailshistory.jsp Tue Mar 8 05:54:02 2011 @@ -56,12 +56,6 @@ if (job == null) { return; } - // MR-1220 FIXME (LATER): to fully integrate uberization, need task- - // and attempt-level JobHistory/Avro/etc. changes to multiple Event - // types; instead, going with top-level hack for now - boolean isUber = job.getIsUber(); - int numUberSubMaps = job.getNumUberSubMaps(); - int numUberSubReduces = job.getNumUberSubReduces(); if (job.getJobStatus().equals("FAILED")) reasonforFailure = job.getErrorInfo(); %> @@ -96,7 +90,7 @@
- + @@ -112,10 +106,6 @@ - -<% - if (!isUber) { -%> @@ -141,36 +131,6 @@ - -<% - } else /* isUber */ { -%> - - - - - - - - - - - - - - - - - - -<% - } -%> -
KindTotal Tasks (successful+failed+killed)Successful tasksFailed tasksKilled tasksStart TimeFinish TimeKindTotal Tasks(successful+failed+killed)Successful tasksFailed tasksKilled tasksStart TimeFinish Time
Setup<%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getSetupFinished(), sj.getSetupStarted()) %>
Map <%=sj.getTotalMaps()%><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %> <%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %>
Uber - <%=sj.getTotalReduces()%> - <%=job.getFinishedReduces()%> - <%=sj.getNumFailedReduces()%> - <%=sj.getNumKilledReduces()%><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceStarted(), 0) %><%=StringUtils.getFormattedTimeWithDiff(dateFormat, sj.getReduceFinished(), sj.getReduceStarted()) %>
Map subtasks<%=numUberSubMaps%>
Reduce subtasks<%=numUberSubReduces%>
Cleanup