Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BA01B1A34 for ; Tue, 26 Apr 2011 08:01:38 +0000 (UTC) Received: (qmail 44215 invoked by uid 500); 26 Apr 2011 08:01:38 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 44176 invoked by uid 500); 26 Apr 2011 08:01:36 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 44168 invoked by uid 99); 26 Apr 2011 08:01:35 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Apr 2011 08:01:35 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 26 Apr 2011 08:01:27 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 463F323889EA; Tue, 26 Apr 2011 08:01:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1096692 - in /hadoop/mapreduce/branches/MR-279/mr-client: hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-clie... 
Date: Tue, 26 Apr 2011 08:01:05 -0000 To: mapreduce-commits@hadoop.apache.org From: sharad@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110426080105.463F323889EA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: sharad Date: Tue Apr 26 08:01:04 2011 New Revision: 1096692 URL: http://svn.apache.org/viewvc?rev=1096692&view=rev Log: Recovery of MR Application Master from failures. Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java 
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Apr 26 08:01:04 2011 @@ -56,6 +56,7 @@ public class JobHistoryEventHandler exte implements EventHandler { private final AppContext context; + private final int startCount; private FileContext logDirFc; // log Dir FileContext private FileContext doneDirFc; // done Dir FileContext @@ -80,9 +81,10 @@ public class JobHistoryEventHandler exte public static final FsPermission HISTORY_FILE_PERMISSION = FsPermission.createImmutable((short) 0740); // rwxr----- - public 
JobHistoryEventHandler(AppContext context) { + public JobHistoryEventHandler(AppContext context, int startCount) { super("JobHistoryEventHandler"); this.context = context; + this.startCount = startCount; } @Override @@ -344,8 +346,10 @@ public class JobHistoryEventHandler exte /** * Get the job history file path */ - public static Path getJobHistoryFile(Path dir, JobId jobId) { - return new Path(dir, TypeConverter.fromYarn(jobId).toString()); + private Path getJobHistoryFile(Path dir, JobId jobId) { + return new Path(dir, TypeConverter.fromYarn(jobId).toString() + "_" + + startCount); + } /* Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Apr 26 08:01:04 2011 @@ -23,6 +23,7 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.jobhi import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.app.client.ClientService; import 
org.apache.hadoop.mapreduce.v2.app.client.MRClientService; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -56,6 +58,8 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl; import org.apache.hadoop.mapreduce.v2.app.local.LocalContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; +import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator; @@ -102,9 +106,10 @@ public class MRAppMaster extends Composi private static final Log LOG = LogFactory.getLog(MRAppMaster.class); - private final Clock clock; - - private ApplicationId appID; + private Clock clock; + private final int startCount; + private final ApplicationId appID; + private Set completedTasksFromPreviousRun; private AppContext context; private Dispatcher dispatcher; private ClientService clientService; @@ -117,21 +122,16 @@ public class MRAppMaster extends Composi new JobTokenSecretManager(); private Job job; - private int failCount = 0; - - public MRAppMaster(ApplicationId applicationId) { - this(applicationId, new SystemClock()); - } - public MRAppMaster(ApplicationId applicationId, int failCount) { - this(applicationId); - this.failCount = failCount; + public MRAppMaster(ApplicationId applicationId, int startCount) { + this(applicationId, new SystemClock(), startCount); } - - public MRAppMaster(ApplicationId applicationId, Clock clock) { + + public MRAppMaster(ApplicationId applicationId, Clock clock, int startCount) { super(MRAppMaster.class.getName()); this.clock = clock; this.appID = applicationId; + this.startCount = startCount; LOG.info("Created MRAppMaster for application " + applicationId); } @@ -139,8 +139,18 @@ public class 
MRAppMaster extends Composi public void init(final Configuration conf) { context = new RunningAppContext(); - dispatcher = new AsyncDispatcher(); - addIfService(dispatcher); + if (conf.getBoolean(YarnMRJobConfig.RECOVERY_ENABLE, false) + && startCount > 1) { + LOG.info("Recovery is enabled. Will try to recover from previous life."); + Recovery recoveryServ = new RecoveryService(appID, clock, startCount); + addIfService(recoveryServ); + dispatcher = recoveryServ.getDispatcher(); + clock = recoveryServ.getClock(); + completedTasksFromPreviousRun = recoveryServ.getCompletedTasks(); + } else { + dispatcher = new AsyncDispatcher(); + addIfService(dispatcher); + } //service to handle requests to TaskUmbilicalProtocol taskAttemptListener = createTaskAttemptListener(context); @@ -259,7 +269,7 @@ public class MRAppMaster extends Composi // create single job Job newJob = new JobImpl(appID, conf, dispatcher.getEventHandler(), taskAttemptListener, jobTokenSecretManager, fsTokens, - clock); + clock, startCount, completedTasksFromPreviousRun); ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); dispatcher.register(JobFinishEvent.Type.class, @@ -312,7 +322,8 @@ public class MRAppMaster extends Composi protected EventHandler createJobHistoryHandler( AppContext context) { - JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context); + JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, + getStartCount()); return eventHandler; } @@ -385,6 +396,10 @@ public class MRAppMaster extends Composi return appID; } + public int getStartCount() { + return startCount; + } + public AppContext getContext() { return context; } @@ -393,6 +408,10 @@ public class MRAppMaster extends Composi return dispatcher; } + public Set getCompletedTaskFromPreviousRun() { + return completedTasksFromPreviousRun; + } + //Returns null if speculation is not enabled public Speculator getSpeculator() { return speculator; @@ -503,7 +522,7 @@ public class MRAppMaster 
extends Composi applicationId.setClusterTimestamp(Long.valueOf(args[0])); applicationId.setId(Integer.valueOf(args[1])); int failCount = Integer.valueOf(args[2]); - MRAppMaster appMaster = new MRAppMaster(applicationId, failCount); + MRAppMaster appMaster = new MRAppMaster(applicationId, ++failCount); YarnConfiguration conf = new YarnConfiguration(new JobConf()); conf.addResource(new Path(YARNApplicationConstants.JOB_CONF_FILE)); conf.set(MRJobConfig.USER_NAME, Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Apr 26 08:01:04 2011 @@ -127,6 +127,8 @@ public class JobImpl implements org.apac //final fields private final Clock clock; + private final int startCount; + private final Set completedTasksFromPreviousRun; private final Lock readLock; private final Lock writeLock; private final JobId jobId; @@ -341,11 +343,14 @@ public class JobImpl implements org.apac public JobImpl(ApplicationId appID, Configuration conf, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, JobTokenSecretManager jobTokenSecretManager, - Credentials fsTokenCredentials, Clock clock) { + Credentials fsTokenCredentials, Clock clock, int startCount, + Set completedTasksFromPreviousRun) { this.jobId = recordFactory.newRecordInstance(JobId.class); this.conf = conf; this.clock = clock; + 
this.completedTasksFromPreviousRun = completedTasksFromPreviousRun; + this.startCount = startCount; jobId.setAppId(appID); jobId.setId(appID.getId()); oldJobId = TypeConverter.fromYarn(jobId); @@ -900,7 +905,7 @@ public class JobImpl implements org.apac job.conf, splits[i], job.taskAttemptListener, job.committer, job.jobToken, job.fsTokens.getAllTokens(), - job.clock); + job.clock, job.completedTasksFromPreviousRun, job.startCount); job.addTask(task); } LOG.info("Input size for job " + job.jobId + " = " + inputLength @@ -915,7 +920,8 @@ public class JobImpl implements org.apac job.remoteJobConfFile, job.conf, job.numMapTasks, job.taskAttemptListener, job.committer, job.jobToken, - job.fsTokens.getAllTokens(), job.clock); + job.fsTokens.getAllTokens(), job.clock, + job.completedTasksFromPreviousRun, job.startCount); job.addTask(task); } LOG.info("Number of reduces for job " + job.jobId + " = " Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Tue Apr 26 08:01:04 2011 @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a import java.util.Collection; import java.util.List; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -29,6 +30,7 @@ import org.apache.hadoop.mapreduce.Outpu import 
org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.security.token.Token; @@ -46,9 +48,11 @@ public class MapTaskImpl extends TaskImp TaskSplitMetaInfo taskSplitMetaInfo, TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, - Collection> fsTokens, Clock clock) { + Collection> fsTokens, Clock clock, + Set completedTasksFromPreviousRun, int startCount) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, - conf, taskAttemptListener, committer, jobToken, fsTokens, clock); + conf, taskAttemptListener, committer, jobToken, fsTokens, clock, + completedTasksFromPreviousRun, startCount); this.taskSplitMetaInfo = taskSplitMetaInfo; } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Tue Apr 26 08:01:04 2011 @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; +import java.util.Set; import 
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -27,6 +28,7 @@ import org.apache.hadoop.mapreduce.MRJob import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.mapreduce.v2.api.records.TaskType; import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.security.token.Token; @@ -42,9 +44,11 @@ public class ReduceTaskImpl extends Task EventHandler eventHandler, Path jobFile, Configuration conf, int numMapTasks, TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, - Collection> fsTokens, Clock clock) { + Collection> fsTokens, Clock clock, + Set completedTasksFromPreviousRun, int startCount) { super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf, - taskAttemptListener, committer, jobToken, fsTokens, clock); + taskAttemptListener, committer, jobToken, fsTokens, clock, + completedTasksFromPreviousRun, startCount); this.numMapTasks = numMapTasks; } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Tue Apr 26 08:01:04 2011 @@ -989,30 +989,7 @@ 
public abstract class TaskAttemptImpl im String taskType = TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()).toString(); LOG.info("In TaskAttemptImpl taskType: " + taskType); - if (taskType.equals("MAP")) { - MapAttemptFinishedEvent mfe = - new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - TaskAttemptState.SUCCEEDED.toString(), - taskAttempt.finishTime, - taskAttempt.finishTime, "hostname", - TaskAttemptState.SUCCEEDED.toString(), - TypeConverter.fromYarn(taskAttempt.getCounters()),null); - taskAttempt.eventHandler.handle( - new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe)); - } else { - ReduceAttemptFinishedEvent rfe = - new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - TaskAttemptState.SUCCEEDED.toString(), - taskAttempt.finishTime, - taskAttempt.finishTime, - taskAttempt.finishTime, "hostname", - TaskAttemptState.SUCCEEDED.toString(), - TypeConverter.fromYarn(taskAttempt.getCounters()),null); - taskAttempt.eventHandler.handle( - new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe)); - } + taskAttempt.logAttemptFinishedEvent(TaskAttemptState.SUCCEEDED); /* TaskAttemptFinishedEvent tfe = new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), @@ -1047,36 +1024,40 @@ public abstract class TaskAttemptImpl im taskAttempt.reportedStatus.diagnosticInfo.toString()); taskAttempt.eventHandler.handle( new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta)); - if (taskAttempt.attemptId.getTaskId().getTaskType() == TaskType.MAP) { - MapAttemptFinishedEvent mfe = - new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - TaskAttemptState.FAILED.toString(), - taskAttempt.finishTime, - 
taskAttempt.finishTime, "hostname", - TaskAttemptState.FAILED.toString(), - TypeConverter.fromYarn(taskAttempt.getCounters()),null); - taskAttempt.eventHandler.handle( - new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), mfe)); - } else { - ReduceAttemptFinishedEvent rfe = - new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId), - TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()), - TaskAttemptState.FAILED.toString(), - taskAttempt.finishTime, - taskAttempt.finishTime, - taskAttempt.finishTime, "hostname", - TaskAttemptState.FAILED.toString(), - TypeConverter.fromYarn(taskAttempt.getCounters()),null); - taskAttempt.eventHandler.handle( - new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), rfe)); - } + taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED)); } } + private void logAttemptFinishedEvent(TaskAttemptState state) { + if (attemptId.getTaskId().getTaskType() == TaskType.MAP) { + MapAttemptFinishedEvent mfe = + new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + state.toString(), + finishTime, + finishTime, "hostname", + state.toString(), + TypeConverter.fromYarn(getCounters()),null); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe)); + } else { + ReduceAttemptFinishedEvent rfe = + new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId), + TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()), + state.toString(), + finishTime, + finishTime, + finishTime, "hostname", + state.toString(), + TypeConverter.fromYarn(getCounters()),null); + eventHandler.handle( + new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe)); + } + } + private static class TooManyFetchFailureTransition implements SingleArcTransition { @Override @@ -1108,6 +1089,7 @@ public 
abstract class TaskAttemptImpl im taskAttempt.reportedStatus.diagnosticInfo.toString()); taskAttempt.eventHandler.handle( new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke)); + taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Apr 26 08:01:04 2011 @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -219,7 +220,8 @@ public abstract class TaskImpl implement EventHandler eventHandler, Path remoteJobConfFile, Configuration conf, TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, - Collection> fsTokens, Clock clock) { + Collection> fsTokens, Clock clock, + Set completedTasksFromPreviousRun, int startCount) { this.conf = conf; this.clock = clock; this.jobFile = remoteJobConfFile; @@ -242,6 +244,18 @@ public abstract class TaskImpl implement this.fsTokens = fsTokens; this.jobToken = jobToken; + if 
(completedTasksFromPreviousRun != null + && completedTasksFromPreviousRun.contains(taskId)) { + LOG.info("Task is from previous run " + taskId); + startCount = startCount - 1; + } + + //attempt ids are generated based on MR app startCount so that attempts + //from previous lives don't overstep the current one. + //this assumes that a task won't have more than 1000 attempts in its single + //life + nextAttemptNumber = (startCount - 1) * 1000; + // This "this leak" is okay because the retained pointer is in an // instance variable. stateMachine = stateMachineFactory.make(this); Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java?rev=1096692&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ControlledClock.java Tue Apr 26 08:01:04 2011 @@ -0,0 +1,43 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.hadoop.mapreduce.v2.app.recover; + +import org.apache.hadoop.yarn.Clock; + +class ControlledClock implements Clock { + private long time = -1; + private final Clock actualClock; + ControlledClock(Clock actualClock) { + this.actualClock = actualClock; + } + synchronized void setTime(long time) { + this.time = time; + } + synchronized void reset() { + time = -1; + } + + @Override + public synchronized long getTime() { + if (time != -1) { + return time; + } + return actualClock.getTime(); + } + +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1096692&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Tue Apr 26 08:01:04 2011 @@ -0,0 +1,34 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.recover; + +import java.util.Set; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.event.Dispatcher; + +public interface Recovery { + + Dispatcher getDispatcher(); + + Clock getClock(); + + Set getCompletedTasks(); +} Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1096692&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Apr 26 08:01:04 2011 @@ -0,0 +1,363 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.recover; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; +import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.Phase; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskId; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; 
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleaner; +import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YARNApplicationConstants; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.service.CompositeService; +import org.apache.hadoop.yarn.service.Service; + +/* + * Recovers the completed tasks from the previous life of Application Master. + * The completed tasks are deciphered from the history file of the previous life. + * Recovery service intercepts and replays the events for completed tasks. + * While recovery is in progress, the scheduling of new tasks is delayed by + * buffering the task schedule events. + * The recovery service controls the clock while recovery is in progress. 
+ */ + +//TODO: +//task cleanup for all non completed tasks +//change job output committer to have +// - atomic job output promotion +// - recover output of completed tasks + +public class RecoveryService extends CompositeService implements Recovery { + + private static final Log LOG = LogFactory.getLog(RecoveryService.class); + + private final ApplicationId appID; + private final Dispatcher dispatcher; + private final ControlledClock clock; + private final int startCount; + + private JobInfo jobInfo = null; + private final Map completedTasks = + new HashMap(); + + private final List pendingTaskScheduleEvents = + new ArrayList(); + + private volatile boolean recoveryMode = false; + + public RecoveryService(ApplicationId appID, Clock clock, int startCount) { + super("RecoveringDispatcher"); + this.appID = appID; + this.startCount = startCount; + this.dispatcher = new RecoveryDispatcher(); + this.clock = new ControlledClock(clock); + if (dispatcher instanceof Service) { + addService((Service) dispatcher); + } + } + + @Override + public void init(Configuration conf) { + super.init(conf); + // parse the history file + try { + parse(); + if (completedTasks.size() > 0) { + recoveryMode = true; + LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS " + + "TO RECOVER " + completedTasks.size()); + LOG.info("Job launch time " + jobInfo.getLaunchTime()); + clock.setTime(jobInfo.getLaunchTime()); + } + } catch (IOException e) { + LOG.warn(e); + LOG.warn("Could not parse the old history file. Aborting recovery. 
" + + "Starting afresh."); + } + } + + @Override + public Dispatcher getDispatcher() { + return dispatcher; + } + + @Override + public Clock getClock() { + return clock; + } + + @Override + public Set getCompletedTasks() { + return completedTasks.keySet(); + } + + private void parse() throws IOException { + // TODO: parse history file based on startCount + String jobName = TypeConverter.fromYarn(appID).toString(); + String defaultStagingDir = getConfig().get( + YARNApplicationConstants.APPS_STAGING_DIR_KEY) + + "/history/staging"; + String jobhistoryDir = getConfig().get( + YarnMRJobConfig.HISTORY_STAGING_DIR_KEY, defaultStagingDir); + FSDataInputStream in = null; + Path historyFile = null; + Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified( + new Path(jobhistoryDir)); + FileContext fc = FileContext.getFileContext(histDirPath.toUri(), + getConfig()); + historyFile = fc.makeQualified(new Path(histDirPath, jobName + "_" + + (startCount -1))); //read the previous history file + in = fc.open(historyFile); + JobHistoryParser parser = new JobHistoryParser(in); + jobInfo = parser.parse(); + Map taskInfos = jobInfo + .getAllTasks(); + for (TaskInfo taskInfo : taskInfos.values()) { + if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) { + completedTasks + .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo); + LOG.info("Read from history task " + + TypeConverter.toYarn(taskInfo.getTaskId())); + } + } + LOG.info("Read completed tasks from history " + + completedTasks.size()); + } + + class RecoveryDispatcher extends AsyncDispatcher { + private final EventHandler actualHandler; + private final EventHandler handler; + + RecoveryDispatcher() { + actualHandler = super.getEventHandler(); + handler = new InterceptingEventHandler(actualHandler); + } + + @Override + public void dispatch(Event event) { + if (recoveryMode) { + if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { + TaskAttemptInfo attInfo = 
getTaskAttemptInfo(((TaskAttemptEvent) event) + .getTaskAttemptID()); + LOG.info("Attempt start time " + attInfo.getStartTime()); + clock.setTime(attInfo.getStartTime()); + + } else if (event.getType() == TaskAttemptEventType.TA_DONE + || event.getType() == TaskAttemptEventType.TA_FAILMSG + || event.getType() == TaskAttemptEventType.TA_KILL) { + TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) + .getTaskAttemptID()); + LOG.info("Attempt finish time " + attInfo.getFinishTime()); + clock.setTime(attInfo.getFinishTime()); + } + + else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED + || event.getType() == TaskEventType.T_ATTEMPT_KILLED + || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) { + TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event; + LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID()); + TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID() + .getTaskId()); + taskInfo.getAllTaskAttempts().remove( + TypeConverter.fromYarn(tEvent.getTaskAttemptID())); + // remove the task info from completed tasks if all attempts are + // recovered + if (taskInfo.getAllTaskAttempts().size() == 0) { + completedTasks.remove(tEvent.getTaskAttemptID().getTaskId()); + // checkForRecoveryComplete + LOG.info("CompletedTasks() " + completedTasks.size()); + if (completedTasks.size() == 0) { + recoveryMode = false; + clock.reset(); + LOG.info("Setting the recovery mode to false. 
" + + "Recovery is complete!"); + + // send all pending tasks schedule events + for (TaskEvent tEv : pendingTaskScheduleEvents) { + actualHandler.handle(tEv); + } + + } + } + } + } + super.dispatch(event); + } + + @Override + public EventHandler getEventHandler() { + return handler; + } + } + + private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) { + TaskInfo taskInfo = completedTasks.get(id.getTaskId()); + return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id)); + } + + private class InterceptingEventHandler implements EventHandler { + EventHandler actualHandler; + + InterceptingEventHandler(EventHandler actualHandler) { + this.actualHandler = actualHandler; + } + + @Override + public void handle(Event event) { + if (!recoveryMode) { + // delegate to the dispatcher one + actualHandler.handle(event); + return; + } + + else if (event.getType() == TaskEventType.T_SCHEDULE) { + TaskEvent taskEvent = (TaskEvent) event; + // delay the scheduling of new tasks till previous ones are recovered + if (completedTasks.get(taskEvent.getTaskID()) == null) { + LOG.debug("Adding to pending task events " + + taskEvent.getTaskID()); + pendingTaskScheduleEvents.add(taskEvent); + return; + } + } + + else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) { + TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID(); + TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); + LOG.debug("CONTAINER_REQ " + aId); + sendAssignedEvent(aId, attInfo); + return; + } + + else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) { + TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID(); + LOG.debug("TASK_CLEAN"); + actualHandler.handle(new TaskAttemptEvent(aId, + TaskAttemptEventType.TA_CLEANUP_DONE)); + return; + } + + else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) { + TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event) + .getTaskAttemptID(); + TaskAttemptInfo attInfo = getTaskAttemptInfo(aId); + 
actualHandler.handle(new TaskAttemptEvent(aId, + TaskAttemptEventType.TA_CONTAINER_LAUNCHED)); + // send the status update event + sendStatusUpdateEvent(aId, attInfo); + + TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getState()); + switch (state) { + case SUCCEEDED: + // send the done event + LOG.info("Sending done event to " + aId); + actualHandler.handle(new TaskAttemptEvent(aId, + TaskAttemptEventType.TA_DONE)); + break; + case KILLED: + LOG.info("Sending kill event to " + aId); + actualHandler.handle(new TaskAttemptEvent(aId, + TaskAttemptEventType.TA_KILL)); + break; + default: + LOG.info("Sending fail event to " + aId); + actualHandler.handle(new TaskAttemptEvent(aId, + TaskAttemptEventType.TA_FAILMSG)); + break; + } + return; + } + + // delegate to the actual handler + actualHandler.handle(event); + } + + private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID, + TaskAttemptInfo attemptInfo) { + LOG.info("Sending status update event to " + yarnAttemptID); + TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus(); + taskAttemptStatus.id = yarnAttemptID; + taskAttemptStatus.progress = 1.0f; + taskAttemptStatus.diagnosticInfo = ""; + taskAttemptStatus.stateString = attemptInfo.getState(); + // taskAttemptStatus.outputSize = attemptInfo.getOutputSize(); + taskAttemptStatus.phase = Phase.CLEANUP; + org.apache.hadoop.mapreduce.Counters cntrs = attemptInfo.getCounters(); + if (cntrs == null) { + taskAttemptStatus.counters = null; + } else { + taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo + .getCounters()); + } + actualHandler.handle(new TaskAttemptStatusUpdateEvent( + taskAttemptStatus.id, taskAttemptStatus)); + } + + private void sendAssignedEvent(TaskAttemptId yarnAttemptID, + TaskAttemptInfo attemptInfo) { + LOG.info("Sending assigned event to " + yarnAttemptID); + ContainerId cId = RecordFactoryProvider.getRecordFactory(null) + .newRecordInstance(ContainerId.class); + actualHandler.handle(new 
TaskAttemptContainerAssignedEvent(yarnAttemptID, + cId, null, attemptInfo.getHostname() + ":" + + attemptInfo.getHttpPort(), null)); + } + } + +} Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Tue Apr 26 08:01:04 2011 @@ -85,10 +85,20 @@ public class MRApp extends MRAppMaster { //if true, tasks complete automatically as soon as they are launched protected boolean autoComplete = false; + static ApplicationId applicationId; + + static { + applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class); + applicationId.setClusterTimestamp(0); + applicationId.setId(0); + } + public MRApp(int maps, int reduces, boolean autoComplete) { - - super(RecordFactoryProvider.getRecordFactory(null).newRecordInstance( - ApplicationId.class)); + this(maps, reduces, autoComplete, 1); + } + + public MRApp(int maps, int reduces, boolean autoComplete, int startCount) { + super(applicationId, startCount); this.maps = maps; this.reduces = reduces; this.autoComplete = autoComplete; @@ -163,10 +173,14 @@ public class MRApp extends MRAppMaster { JobReport jobReport = job.getReport(); Assert.assertTrue("Job start time is not less than finish time", jobReport.getStartTime() < jobReport.getFinishTime()); + System.out.println("Job start time :" + jobReport.getStartTime()); + System.out.println("Job finish 
time :" + jobReport.getFinishTime()); Assert.assertTrue("Job finish time is in future", jobReport.getFinishTime() < System.currentTimeMillis()); for (Task task : job.getTasks().values()) { TaskReport taskReport = task.getReport(); + System.out.println("Task start time : " + taskReport.getStartTime()); + System.out.println("Task finish time : " + taskReport.getFinishTime()); Assert.assertTrue("Task start time is not less than finish time", taskReport.getStartTime() < taskReport.getFinishTime()); for (TaskAttempt attempt : task.getAttempts().values()) { @@ -310,7 +324,8 @@ public class MRApp extends MRAppMaster { public TestJob(ApplicationId appID, EventHandler eventHandler, TaskAttemptListener taskAttemptListener, Clock clock) { super(appID, new Configuration(), eventHandler, taskAttemptListener, - new JobTokenSecretManager(), new Credentials(), clock); + new JobTokenSecretManager(), new Credentials(), clock, getStartCount(), + getCompletedTaskFromPreviousRun()); // This "this leak" is okay because the retained pointer is in an // instance variable. Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1096692&view=auto ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (added) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Apr 26 08:01:04 2011 @@ -0,0 +1,185 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. 
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app; + +import java.util.Iterator; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig; +import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskState; +import org.apache.hadoop.mapreduce.v2.app.job.Job; +import org.apache.hadoop.mapreduce.v2.app.job.Task; +import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.junit.Test; + +public class TestRecovery { + + private static final Log LOG = LogFactory.getLog(TestRecovery.class); + + @Test + public void testCrashed() throws Exception { + int runCount = 0; + MRApp app = new MRApp(2, 1, false, ++runCount); + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + Job job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + long jobStartTime = 
job.getReport().getStartTime(); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + Iterator it = job.getTasks().values().iterator(); + Task mapTask1 = it.next(); + Task mapTask2 = it.next(); + Task reduceTask = it.next(); + + // all maps must be running + app.waitForState(mapTask1, TaskState.RUNNING); + app.waitForState(mapTask2, TaskState.RUNNING); + + TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator().next(); + TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next(); + + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task1Attempt1, TaskAttemptState.RUNNING); + app.waitForState(task2Attempt, TaskAttemptState.RUNNING); + + // reduces must be in NEW state + Assert.assertEquals("Reduce Task state not correct", + TaskState.NEW, reduceTask.getReport().getTaskState()); + + //send the fail signal to the 1st map task attempt + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt1.getID(), + TaskAttemptEventType.TA_FAILMSG)); + + app.waitForState(task1Attempt1, TaskAttemptState.FAILED); + + while (mapTask1.getAttempts().size() != 2) { + Thread.sleep(2000); + LOG.info("Waiting for next attempt to start"); + } + Iterator itr = mapTask1.getAttempts().values().iterator(); + itr.next(); + TaskAttempt task1Attempt2 = itr.next(); + + app.waitForState(task1Attempt2, TaskAttemptState.RUNNING); + + //send the kill signal to the 1st map 2nd attempt + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt2.getID(), + TaskAttemptEventType.TA_KILL)); + + app.waitForState(task1Attempt2, TaskAttemptState.KILLED); + + while (mapTask1.getAttempts().size() != 3) { + Thread.sleep(2000); + LOG.info("Waiting for next attempt to start"); + } + itr = mapTask1.getAttempts().values().iterator(); + itr.next(); + itr.next(); + TaskAttempt task1Attempt3 = itr.next(); + + 
app.waitForState(task1Attempt3, TaskAttemptState.RUNNING); + + //send the done signal to the 1st map 3rd attempt + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + task1Attempt3.getID(), + TaskAttemptEventType.TA_DONE)); + + //wait for first map task to complete + app.waitForState(mapTask1, TaskState.SUCCEEDED); + long task1StartTime = mapTask1.getReport().getStartTime(); + long task1FinishTime = mapTask1.getReport().getFinishTime(); + + //stop the app + app.stop(); + + //rerun + //in rerun the 1st map will be recovered from previous run + app = new MRApp(2, 1, false, ++runCount); + conf = new Configuration(); + conf.setBoolean(YarnMRJobConfig.RECOVERY_ENABLE, true); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + job = app.submit(conf); + app.waitForState(job, JobState.RUNNING); + //all maps would be running + Assert.assertEquals("No of tasks not correct", + 3, job.getTasks().size()); + it = job.getTasks().values().iterator(); + mapTask1 = it.next(); + mapTask2 = it.next(); + reduceTask = it.next(); + + // first map will be recovered, no need to send done + app.waitForState(mapTask1, TaskState.SUCCEEDED); + + app.waitForState(mapTask2, TaskState.RUNNING); + + task2Attempt = mapTask2.getAttempts().values().iterator().next(); + //before sending the TA_DONE, event make sure attempt has come to + //RUNNING state + app.waitForState(task2Attempt, TaskAttemptState.RUNNING); + + //send the done signal to the 2nd map task + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + mapTask2.getAttempts().values().iterator().next().getID(), + TaskAttemptEventType.TA_DONE)); + + //wait to get it completed + app.waitForState(mapTask2, TaskState.SUCCEEDED); + + //wait for reduce to be running before sending done + app.waitForState(reduceTask, TaskState.RUNNING); + //send the done signal to the reduce + app.getContext().getEventHandler().handle( + new TaskAttemptEvent( + reduceTask.getAttempts().values().iterator().next().getID(), + 
TaskAttemptEventType.TA_DONE)); + + app.waitForState(job, JobState.SUCCEEDED); + app.verifyCompleted(); + Assert.assertEquals("Job Start time not correct", + jobStartTime, job.getReport().getStartTime()); + Assert.assertEquals("Task Start time not correct", + task1StartTime, mapTask1.getReport().getStartTime()); + Assert.assertEquals("Task Finish time not correct", + task1FinishTime, mapTask1.getReport().getFinishTime()); + } + + public static void main(String[] arg) throws Exception { + TestRecovery test = new TestRecovery(); + test.testCrashed(); + } +} Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1096692&r1=1096691&r2=1096692&view=diff ============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Tue Apr 26 08:01:04 2011 @@ -53,4 +53,7 @@ public class YarnMRJobConfig { "address.webapp"; public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS = "0.0.0.0:19888"; + + public static final String RECOVERY_ENABLE + = "yarn.mapreduce.job.recovery.enable"; } Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1096692&r1=1096691&r2=1096692&view=diff 
============================================================================== --- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original) +++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Tue Apr 26 08:01:04 2011 @@ -68,6 +68,7 @@ class EventWriter { void flush() throws IOException { encoder.flush(); + out.hflush(); } void close() throws IOException {