Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B72024750 for ; Fri, 10 Jun 2011 09:17:01 +0000 (UTC) Received: (qmail 23716 invoked by uid 500); 10 Jun 2011 09:17:01 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 23648 invoked by uid 500); 10 Jun 2011 09:17:01 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 23640 invoked by uid 99); 10 Jun 2011 09:17:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2011 09:17:00 +0000 X-ASF-Spam-Status: No, hits=-1999.3 required=5.0 tests=ALL_TRUSTED,FRT_TODAY2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 10 Jun 2011 09:16:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id F343F23888FD; Fri, 10 Jun 2011 09:16:37 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1134246 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/... Date: Fri, 10 Jun 2011 09:16:37 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110610091637.F343F23888FD@eris.apache.org> Author: sharad Date: Fri Jun 10 09:16:37 2011 New Revision: 1134246 URL: http://svn.apache.org/viewvc?rev=1134246&view=rev Log: MAPREDUCE-2582. Cleanup JobHistory event generation. Contributed by Siddharth Seth. Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original) +++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri Jun 10 09:16:37 2011 @@ -4,7 +4,9 @@ Trunk (unreleased changes) MAPREDUCE-279 - + + MAPREDUCE-2582. Cleanup JobHistory event generation.(Siddharth Seth via sharad) + Add ability to includes src files in assembly target for maven (Luke Lu via mahadev) Added few job diagnostic messages. (sharad) Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Jun 10 09:16:37 2011 @@ -365,7 +365,7 @@ public class JobHistoryEventHandler exte throw new YarnException(e); } // check for done - if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) { + if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) { try { JobFinishedEvent jFinishedEvent = (JobFinishedEvent) event .getHistoryEvent(); @@ -378,6 +378,19 @@ public class JobHistoryEventHandler exte throw new YarnException(e); } } + if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED + || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) { + try { + JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event + .getHistoryEvent(); + mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime()); + mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps()); + mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces()); + closeEventWriter(event.getJobID()); + } catch (IOException e) { + throw new YarnException(e); + } + } } } @@ -433,6 +446,11 @@ public class JobHistoryEventHandler exte if (mi == null) { throw new IOException("No MetaInfo found for JobId: [" + jobId + "]"); } + if (!mi.isWriterActive()) { + throw new IOException( + "Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events for JobId: [" + + jobId + "]"); + } try { mi.closeWriter(); } catch (IOException e) { @@ -523,6 +541,8 @@ public class JobHistoryEventHandler exte JobSummary getJobSummary() { return jobSummary; } + boolean isWriterActive() {return writer != null ; } + void closeWriter() throws IOException { synchronized (lock) { if (writer != null) { Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Jun 10 09:16:37 2011 @@ -36,6 +36,7 @@ import java.util.concurrent.locks.Reentr import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; @@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.TaskA import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent; @@ -333,6 +335,7 @@ public class JobImpl implements org.apac private int failedReduceTaskCount = 0; private int killedMapTaskCount = 0; private int killedReduceTaskCount = 0; + private long submitTime; private long startTime; private long finishTime; private float setupProgress; @@ -709,7 +712,7 @@ public class JobImpl implements org.apac */ @Override public JobState transition(JobImpl job, JobEvent event) { - job.startTime = job.clock.getTime(); + job.submitTime = job.clock.getTime(); job.metrics.submittedJob(job); job.metrics.preparingJob(job); try { @@ -717,14 +720,14 @@ public class JobImpl implements org.apac job.fs = FileSystem.get(job.conf); //log to job history - //TODO_JH_Validate the values being sent here (along with defaults). Ideally for all JH evnts. - JobSubmittedEvent jse = - new JobSubmittedEvent(job.oldJobId, + JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, job.conf.get(MRJobConfig.JOB_NAME, "test"), - job.conf.get(MRJobConfig.USER_NAME,"mapred"), job.startTime, - job.remoteJobConfFile.toString(), job.jobACLs, - job.conf.get(MRJobConfig.QUEUE_NAME,"test")); + job.conf.get(MRJobConfig.USER_NAME, "mapred"), + job.submitTime, + job.remoteJobConfFile.toString(), + job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "test")); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); + //TODO JH Verify jobACLs, UserName via UGI? TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId); job.numMapTasks = taskSplitMetaInfo.length; @@ -868,6 +871,7 @@ public class JobImpl implements org.apac job.metrics.endPreparingJob(job); return JobState.INITED; + //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition) } catch (Exception e) { LOG.warn("Job init failed", e); @@ -1025,8 +1029,11 @@ public class JobImpl implements org.apac job.startTime, job.numMapTasks, job.numReduceTasks, job.isUber, 0, 0, // FIXME: lose latter two args again (old-style uber junk: needs to go along with 98% of other old-style uber junk) - JobState.NEW.toString()); + job.getState().toString()); //Will transition to state running. Currently in INITED job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie)); + JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId, + job.submitTime, job.startTime); + job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice)); job.metrics.runningJob(job); } } @@ -1038,26 +1045,29 @@ public class JobImpl implements org.apac } catch (IOException e) { LOG.warn("Could not abortJob", e); } + if (finishTime == 0) setFinishTime(); cleanupProgress = 1.0f; JobUnsuccessfulCompletionEvent unsuccessfulJobEvent = new JobUnsuccessfulCompletionEvent(oldJobId, finishTime, succeededMapTaskCount, - numReduceTasks, //TODO finishedReduceTasks + succeededReduceTaskCount, finalState.toString()); eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent)); + } - JobFinishedEvent jfe = - new JobFinishedEvent(oldJobId, - finishTime, - succeededMapTaskCount, - succeededReduceTaskCount, failedMapTaskCount, - failedReduceTaskCount, - TypeConverter.fromYarn(getCounters()), //TODO replace with MapCounter - TypeConverter.fromYarn(getCounters()), // TODO reduceCounters - TypeConverter.fromYarn(getCounters())); - eventHandler.handle(new JobHistoryEvent(jobId, jfe)); - //TODO Does this require a JobFinishedEvent? + // JobFinishedEvent triggers the move of the history file out of the staging + // area. May need to create a new event type for this if JobFinished should + // not be generated for KilledJobs, etc. + private static JobFinishedEvent createJobFinishedEvent(JobImpl job) { + JobFinishedEvent jfe = new JobFinishedEvent( + job.oldJobId, job.finishTime, + job.succeededMapTaskCount, job.succeededReduceTaskCount, + job.failedMapTaskCount, job.failedReduceTaskCount, + TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounters + TypeConverter.fromYarn(job.getCounters()), //TODO replace with ReduceCoutners + TypeConverter.fromYarn(job.getCounters())); + return jfe; } // Task-start has been moved out of InitTransition, so this arc simply @@ -1066,10 +1076,11 @@ public class JobImpl implements org.apac implements SingleArcTransition { @Override public void transition(JobImpl job, JobEvent event) { + job.setFinishTime(); JobUnsuccessfulCompletionEvent failedEvent = new JobUnsuccessfulCompletionEvent(job.oldJobId, job.finishTime, 0, 0, - org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state + JobState.KILLED.toString()); job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.finished(JobState.KILLED); } @@ -1187,14 +1198,6 @@ public class JobImpl implements org.apac job.failedReduceTaskCount*100 > job.allowedReduceFailuresPercent*job.numReduceTasks) { job.setFinishTime(); - JobUnsuccessfulCompletionEvent failedEvent = - new JobUnsuccessfulCompletionEvent(job.oldJobId, - job.finishTime, - job.failedMapTaskCount, - job.failedReduceTaskCount, //TODO finishedReduceTasks - org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct state - job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); - //TODO This event not likely required - sent via abort(). String diagnosticMsg = "Job failed as tasks failed. " + "failedMaps:" + job.failedMapTaskCount + @@ -1214,17 +1217,9 @@ public class JobImpl implements org.apac } // Log job-history job.setFinishTime(); - JobFinishedEvent jfe = - new JobFinishedEvent(TypeConverter.fromYarn(job.jobId), - job.finishTime, - job.succeededMapTaskCount, job.numReduceTasks, job.failedMapTaskCount, - job.failedReduceTaskCount, - TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter - TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters - TypeConverter.fromYarn(job.getCounters())); + JobFinishedEvent jfe = createJobFinishedEvent(job); LOG.info("Calling handler for JobFinishedEvent "); job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe)); - return job.finished(JobState.SUCCEEDED); } @@ -1302,7 +1297,13 @@ public class JobImpl implements org.apac SingleArcTransition { @Override public void transition(JobImpl job, JobEvent event) { - //TODO JH Event? + //TODO Is this JH event required. + job.setFinishTime(); + JobUnsuccessfulCompletionEvent failedEvent = + new JobUnsuccessfulCompletionEvent(job.oldJobId, + job.finishTime, 0, 0, + JobState.ERROR.toString()); + job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent)); job.finished(JobState.ERROR); } } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Fri Jun 10 09:16:37 2011 @@ -75,4 +75,7 @@ public class MapTaskImpl extends TaskImp return TaskType.MAP; } + protected TaskSplitMetaInfo getTaskSplitMetaInfo() { + return this.taskSplitMetaInfo; + } } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Jun 10 09:16:37 2011 @@ -850,6 +850,18 @@ public abstract class TaskAttemptImpl im } } + private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent( + TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) { + TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent( + TypeConverter.fromYarn(taskAttempt.attemptId), + TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), + attemptState.toString(), taskAttempt.finishTime, + taskAttempt.containerMgrAddress, + taskAttempt.reportedStatus.diagnosticInfo.toString()); + //TODO Different constructor - allSplits + return tauce; + } + private static String[] racks = new String[] {NetworkTopology.DEFAULT_RACK}; private static class RequestContainerTransition implements SingleArcTransition { @@ -956,6 +968,10 @@ public abstract class TaskAttemptImpl im TaskEventType.T_ATTEMPT_KILLED)); break; } + TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( + taskAttempt, finalState); + taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId + .getTaskId().getJobId(), tauce)); } } @@ -970,6 +986,7 @@ public abstract class TaskAttemptImpl im // for it taskAttempt.taskAttemptListener.register( taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID); + //TODO Resolve to host / IP in case of a local address. InetSocketAddress nodeHttpInetAddr = NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO: // Costly? @@ -1058,17 +1075,11 @@ public abstract class TaskAttemptImpl im TaskAttemptEvent event) { //set the finish time taskAttempt.setFinishTime(); - TaskAttemptUnsuccessfulCompletionEvent ta = - new TaskAttemptUnsuccessfulCompletionEvent( - TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - TaskAttemptState.FAILED.toString(), - taskAttempt.finishTime, - "hostname", - taskAttempt.reportedStatus.diagnosticInfo.toString()); - taskAttempt.eventHandler.handle( - new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta)); - taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); + TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( + taskAttempt, TaskAttemptState.FAILED); + taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId + .getTaskId().getJobId(), tauce)); +// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not handling failed map/reduce events. taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); @@ -1081,9 +1092,9 @@ public abstract class TaskAttemptImpl im new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), state.toString(), - finishTime, - finishTime, "hostname", - state.toString(), + finishTime, //TODO TaskAttemptStatus changes. MapFinishTime + finishTime, this.containerMgrAddress, + state.toString(), //TODO state is a progress string. TypeConverter.fromYarn(getCounters()),null); eventHandler.handle( new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); @@ -1092,9 +1103,9 @@ public abstract class TaskAttemptImpl im new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), state.toString(), - finishTime, - finishTime, - finishTime, "hostname", + finishTime, //TODO TaskAttemptStatus changes. ShuffleFinishTime + finishTime, //TODO TaskAttemptStatus changes. SortFinishTime + finishTime, this.containerMgrAddress, state.toString(), TypeConverter.fromYarn(getCounters()),null); eventHandler.handle( @@ -1110,6 +1121,10 @@ public abstract class TaskAttemptImpl im taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt"); //set the finish time taskAttempt.setFinishTime(); + TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( + taskAttempt, TaskAttemptState.FAILED); + taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId + .getTaskId().getJobId(), tauce)); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); } @@ -1123,17 +1138,11 @@ public abstract class TaskAttemptImpl im TaskAttemptEvent event) { //set the finish time taskAttempt.setFinishTime(); - TaskAttemptUnsuccessfulCompletionEvent tke = - new TaskAttemptUnsuccessfulCompletionEvent( - TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - TaskAttemptState.KILLED.toString(), - taskAttempt.finishTime, - TaskAttemptState.KILLED.toString(), - taskAttempt.reportedStatus.diagnosticInfo.toString()); - taskAttempt.eventHandler.handle( - new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke)); - taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); + TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent( + taskAttempt, TaskAttemptState.KILLED); + taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId + .getTaskId().getJobId(), tauce)); +// taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce attempts in case of failure. taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Jun 10 09:16:37 2011 @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; @@ -95,6 +96,7 @@ public abstract class TaskImpl implement private final Lock readLock; private final Lock writeLock; private final MRAppMetrics metrics; + private long scheduledTime; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); @@ -386,10 +388,14 @@ public abstract class TaskImpl implement launchTime = at.getLaunchTime(); } } + if (launchTime == 0) { + return this.scheduledTime; + } return launchTime; } //this is always called in read/write lock + //TODO Verify behaviour is Task is killed (no finished attempt) private long getFinishTime() { if (!isFinished()) { return 0; @@ -404,6 +410,20 @@ public abstract class TaskImpl implement return finishTime; } + private long getFinishTime(TaskAttemptId taId) { + if (taId == null) { + return clock.getTime(); + } + long finishTime = 0; + for (TaskAttempt at : attempts.values()) { + //select the max finish time of all attempts + if (at.getID().equals(taId)) { + return at.getFinishTime(); + } + } + return finishTime; + } + private TaskState finished(TaskState finalState) { if (getState() == TaskState.RUNNING) { metrics.endRunningTask(this); @@ -558,20 +578,55 @@ public abstract class TaskImpl implement } } + private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) { + TaskFinishedEvent tfe = + new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), + task.getFinishTime(task.successfulAttempt), + TypeConverter.fromYarn(task.taskId.getTaskType()), + taskState.toString(), + TypeConverter.fromYarn(task.getCounters())); + return tfe; + } + + private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, String error, TaskState taskState, TaskAttemptId taId) { + TaskFailedEvent taskFailedEvent = new TaskFailedEvent( + TypeConverter.fromYarn(task.taskId), + // Hack since getFinishTime needs isFinished to be true and that doesn't happen till after the transition. + task.getFinishTime(taId), + TypeConverter.fromYarn(task.getType()), + error == null ? "" : error, + taskState.toString(), + taId == null ? null : TypeConverter.fromYarn(taId)); + return taskFailedEvent; + } + private static class InitialScheduleTransition implements SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { - TaskStartedEvent tse = new TaskStartedEvent(TypeConverter - .fromYarn(task.taskId), task.getLaunchTime(), TypeConverter - .fromYarn(task.taskId.getTaskType()), TaskState.RUNNING.toString()); - task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse)); - //TODO This is a transition from NEW to SCHEDULED, not RUNNING - task.addAndScheduleAttempt(); + task.scheduledTime = task.clock.getTime(); + TaskStartedEvent tse = new TaskStartedEvent( + TypeConverter.fromYarn(task.taskId), task.getLaunchTime(), + TypeConverter.fromYarn(task.taskId.getTaskType()), + task instanceof MapTaskImpl ? splitsAsString(((MapTaskImpl) task) //TODO Should not be accessing MapTaskImpl + .getTaskSplitMetaInfo().getLocations()) : ""); + task.eventHandler + .handle(new JobHistoryEvent(task.taskId.getJobId(), tse)); task.metrics.launchedTask(task); } + + private String splitsAsString(String[] splits) { + if (splits == null || splits.length == 0) + return ""; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < splits.length; i++) { + if (i != 0) sb.append(","); + sb.append(splits[i]); + } + return sb.toString(); + } } // Used when creating a new attempt while one is already running. @@ -625,12 +680,7 @@ public abstract class TaskImpl implement task.taskId, TaskState.SUCCEEDED)); LOG.info("Task succeeded with attempt " + task.successfulAttempt); // issue kill to all other attempts - TaskFinishedEvent tfe = - new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), - task.getFinishTime(), - TypeConverter.fromYarn(task.taskId.getTaskType()), - TaskState.SUCCEEDED.toString(), - TypeConverter.fromYarn(task.getCounters())); + TaskFinishedEvent tfe = createTaskFinishedEvent(task, TaskState.SUCCEEDED); task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe)); for (TaskAttempt attempt : task.attempts.values()) { if (attempt.getID() != task.successfulAttempt && @@ -675,6 +725,11 @@ public abstract class TaskImpl implement TaskAttemptCompletionEventStatus.KILLED); // check whether all attempts are finished if (task.finishedAttempts == task.attempts.size()) { + TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, "", + finalState, null); //TODO JH verify failedAttempt null + task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), + taskFailedEvent)); + task.eventHandler.handle( new JobTaskEvent(task.taskId, finalState)); return finalState; @@ -704,13 +759,13 @@ public abstract class TaskImpl implement ((TaskTAttemptEvent) event).getTaskAttemptID(), TaskAttemptCompletionEventStatus.TIPFAILED); TaskTAttemptEvent ev = (TaskTAttemptEvent) event; - TaskFinishedEvent tfi = - new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), - task.getFinishTime(), - TypeConverter.fromYarn(task.taskId.getTaskType()), - TaskState.FAILED.toString(), - TypeConverter.fromYarn(task.getCounters())); - task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfi)); + TaskAttemptId taId = ev.getTaskAttemptID(); + + //TODO JH Populate the error string. FailReason from TaskAttempt(taId) + TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, "", + TaskState.FAILED, taId); + task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), + taskFailedEvent)); task.eventHandler.handle( new JobTaskEvent(task.taskId, TaskState.FAILED)); return task.finished(TaskState.FAILED); @@ -761,12 +816,12 @@ public abstract class TaskImpl implement implements SingleArcTransition { @Override public void transition(TaskImpl task, TaskEvent event) { - TaskFinishedEvent tfe = - new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId), - task.getFinishTime(), TypeConverter.fromYarn(task.taskId.getTaskType()), - TaskState.KILLED.toString(), TypeConverter.fromYarn(task - .getCounters())); - task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe)); + + TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null, + TaskState.KILLED, null); //TODO Verify failedAttemptId is null + task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), + taskFailedEvent)); + task.eventHandler.handle( new JobTaskEvent(task.taskId, TaskState.KILLED)); task.metrics.endWaitingTask(task); Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Fri Jun 10 09:16:37 2011 @@ -298,7 +298,7 @@ public class RecoveryService extends Com // send the status update event sendStatusUpdateEvent(aId, attInfo); - TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getState()); + TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus()); switch (state) { case SUCCEEDED: // send the done event Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java Fri Jun 10 09:16:37 2011 @@ -121,7 +121,7 @@ public class AppController extends Contr notFound($(JOB_ID)); } } catch (Exception e) { - badRequest(e.getMessage()); + badRequest(e.getMessage() == null ? e.getClass().getName() : e.getMessage()); } } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Jun 10 09:16:37 2011 @@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobACLsManager; import org.apache.hadoop.mapreduce.JobACL; -import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; @@ -131,7 +130,7 @@ public class CompletedJob implements org } //History data is leisurely loaded when task level data is requested - private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) { + private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) throws IOException { if (jobInfo != null) { return; //data already loaded } @@ -144,6 +143,8 @@ public class CompletedJob implements org throw new YarnException("Could not load history file " + historyFileAbsolute, e); } + } else { + throw new IOException("History file not found"); } if (loadTasks) { Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1134246&r1=1134245&r2=1134246&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Fri Jun 10 09:16:37 2011 @@ -625,7 +625,7 @@ public class JobHistory extends Abstract addToLoadedJobCache(job); return job; } catch (IOException e) { - throw new YarnException(e); + throw new YarnException("Could not find/load job: " + metaInfo.getJobIndexInfo().getJobId(), e); } } } @@ -979,6 +979,7 @@ public class JobHistory extends Abstract Job job = null; try { job = findJob(jobId); + //This could return a null job. } catch (IOException e) { throw new YarnException(e); }