Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 95A1C917F for ; Tue, 14 Feb 2012 00:12:25 +0000 (UTC) Received: (qmail 86442 invoked by uid 500); 14 Feb 2012 00:12:25 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 86277 invoked by uid 500); 14 Feb 2012 00:12:24 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 86269 invoked by uid 99); 14 Feb 2012 00:12:24 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Feb 2012 00:12:24 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 14 Feb 2012 00:12:18 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 890072388860; Tue, 14 Feb 2012 00:11:55 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1243755 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src... Date: Tue, 14 Feb 2012 00:11:54 -0000 To: mapreduce-commits@hadoop.apache.org From: vinodkv@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120214001155.890072388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: vinodkv Date: Tue Feb 14 00:11:54 2012 New Revision: 1243755 URL: http://svn.apache.org/viewvc?rev=1243755&view=rev Log: MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then the recovery. (vinodkv) svn merge --ignore-ancestry -c 1243752 ../../trunk/ Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Feb 14 00:11:54 2012 @@ -759,6 +759,9 @@ Release 0.23.1 - 2012-02-08 MAPREDUCE-3843. Job summary log file found missing on the RM host (Anupam Seth via tgraves) + MAPREDUCE-3846. Addressed MR AM hanging issues during AM restart and then + the recovery. (vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Feb 14 00:11:54 2012 @@ -244,7 +244,7 @@ public class JobHistoryEventHandler exte while (!stopped && !Thread.currentThread().isInterrupted()) { // Log the size of the history-event-queue every so often. - if (eventCounter % 1000 == 0) { + if (eventCounter != 0 && eventCounter % 1000 == 0) { eventCounter = 0; LOG.info("Size of the JobHistory event queue is " + eventQueue.size()); @@ -464,8 +464,10 @@ public class JobHistoryEventHandler exte } processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(), event.getJobID()); - LOG.info("In HistoryEventHandler " - + event.getHistoryEvent().getEventType()); + if (LOG.isDebugEnabled()) { + LOG.debug("In HistoryEventHandler " + + event.getHistoryEvent().getEventType()); + } } catch (IOException e) { LOG.error("Error writing History Event: " + event.getHistoryEvent(), e); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Feb 14 00:11:54 2012 @@ -26,7 +26,6 @@ import java.security.PrivilegedException import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; @@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.TypeC import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; @@ -123,7 +123,7 @@ import org.apache.hadoop.yarn.util.Conve * The information is shared across different components using AppContext. */ -@SuppressWarnings("deprecation") +@SuppressWarnings("rawtypes") public class MRAppMaster extends CompositeService { private static final Log LOG = LogFactory.getLog(MRAppMaster.class); @@ -138,7 +138,7 @@ public class MRAppMaster extends Composi private final int nmPort; private final int nmHttpPort; protected final MRAppMetrics metrics; - private Set completedTasksFromPreviousRun; + private Map completedTasksFromPreviousRun; private List amInfos; private AppContext context; private Dispatcher dispatcher; @@ -596,7 +596,7 @@ public class MRAppMaster extends Composi return dispatcher; } - public Set getCompletedTaskFromPreviousRun() { + public Map getCompletedTaskFromPreviousRun() { return completedTasksFromPreviousRun; } @@ -737,7 +737,6 @@ public class MRAppMaster extends Composi return jobs; } - @SuppressWarnings("rawtypes") @Override public EventHandler getEventHandler() { return dispatcher.getEventHandler(); Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Feb 14 00:11:54 2012 @@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.Outpu import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent; import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent; import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent; @@ -133,7 +134,7 @@ public class JobImpl implements org.apac private float cleanupWeight = 0.05f; private float mapWeight = 0.0f; private float reduceWeight = 0.0f; - private final Set completedTasksFromPreviousRun; + private final Map completedTasksFromPreviousRun; private final List amInfos; private final Lock readLock; private final Lock writeLock; @@ -376,7 +377,7 @@ public class JobImpl implements org.apac TaskAttemptListener taskAttemptListener, JobTokenSecretManager jobTokenSecretManager, Credentials fsTokenCredentials, Clock clock, - Set completedTasksFromPreviousRun, MRAppMetrics metrics, + Map completedTasksFromPreviousRun, MRAppMetrics metrics, OutputCommitter committer, boolean newApiCommitter, String userName, long appSubmitTime, List amInfos) { this.applicationAttemptId = applicationAttemptId; Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java Tue Feb 14 00:11:54 2012 @@ -19,13 +19,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; -import java.util.Set; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -38,7 +39,7 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes" }) public class MapTaskImpl extends TaskImpl { private final TaskSplitMetaInfo taskSplitMetaInfo; @@ -49,7 +50,7 @@ public class MapTaskImpl extends TaskImp TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, TaskType.MAP, partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, committer, jobToken, fsTokens, clock, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ReduceTaskImpl.java Tue Feb 14 00:11:54 2012 @@ -19,13 +19,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; import java.util.Collection; -import java.util.Set; +import java.util.Map; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.ReduceTaskAttemptImpl; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.records.JobId; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; @@ -37,7 +38,7 @@ import org.apache.hadoop.security.token. import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.event.EventHandler; -@SuppressWarnings({ "rawtypes", "deprecation" }) +@SuppressWarnings({ "rawtypes" }) public class ReduceTaskImpl extends TaskImpl { private final int numMapTasks; @@ -47,7 +48,7 @@ public class ReduceTaskImpl extends Task int numMapTasks, TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, TaskType.REDUCE, partition, eventHandler, jobFile, conf, taskAttemptListener, committer, jobToken, fsTokens, clock, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Tue Feb 14 00:11:54 2012 @@ -18,13 +18,14 @@ package org.apache.hadoop.mapreduce.v2.app.job.impl; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.EnumSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -35,8 +36,11 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Counters; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent; import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent; @@ -66,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; +import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.Clock; @@ -208,8 +213,23 @@ public abstract class TaskImpl implement private final StateMachine stateMachine; - - protected int nextAttemptNumber; + + // By default, the next TaskAttempt number is zero. Changes during recovery + protected int nextAttemptNumber = 0; + private List taskAttemptsFromPreviousGeneration = + new ArrayList(); + + private static final class RecoverdAttemptsComparator implements + Comparator { + @Override + public int compare(TaskAttemptInfo attempt1, TaskAttemptInfo attempt2) { + long diff = attempt1.getStartTime() - attempt2.getStartTime(); + return diff == 0 ? 0 : (diff < 0 ? -1 : 1); + } + } + + private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR = + new RecoverdAttemptsComparator(); //should be set to one which comes first //saying COMMIT_PENDING @@ -230,7 +250,7 @@ public abstract class TaskImpl implement TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { this.conf = conf; this.clock = clock; @@ -243,10 +263,7 @@ public abstract class TaskImpl implement // have a convention that none of the overrides depends on any // fields that need initialization. maxAttempts = getMaxAttempts(); - taskId = recordFactory.newRecordInstance(TaskId.class); - taskId.setJobId(jobId); - taskId.setId(partition); - taskId.setTaskType(taskType); + taskId = MRBuilderUtils.newTaskId(jobId, partition, taskType); this.partition = partition; this.taskAttemptListener = taskAttemptListener; this.eventHandler = eventHandler; @@ -255,18 +272,38 @@ public abstract class TaskImpl implement this.jobToken = jobToken; this.metrics = metrics; + // See if this is from a previous generation. if (completedTasksFromPreviousRun != null - && completedTasksFromPreviousRun.contains(taskId)) { + && completedTasksFromPreviousRun.containsKey(taskId)) { + // This task has TaskAttempts from previous generation. We have to replay + // them. LOG.info("Task is from previous run " + taskId); - startCount = startCount - 1; + TaskInfo taskInfo = completedTasksFromPreviousRun.get(taskId); + Map allAttempts = + taskInfo.getAllTaskAttempts(); + taskAttemptsFromPreviousGeneration = new ArrayList(); + taskAttemptsFromPreviousGeneration.addAll(allAttempts.values()); + Collections.sort(taskAttemptsFromPreviousGeneration, + RECOVERED_ATTEMPTS_COMPARATOR); + } + + if (taskAttemptsFromPreviousGeneration.isEmpty()) { + // All the previous attempts are exhausted, now start with a new + // generation. + + // All the new TaskAttemptIDs are generated based on MR + // ApplicationAttemptID so that attempts from previous lives don't + // over-step the current one. This assumes that a task won't have more + // than 1000 attempts in its single generation, which is very reasonable. + // Someone is nuts if he/she thinks he/she can live with 1000 TaskAttempts + // and requires serious medical attention. + nextAttemptNumber = (startCount - 1) * 1000; + } else { + // There are still some TaskAttempts from previous generation, use them + nextAttemptNumber = + taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); } - //attempt ids are generated based on MR app startCount so that attempts - //from previous lives don't overstep the current one. - //this assumes that a task won't have more than 1000 attempts in its single - //life - nextAttemptNumber = (startCount - 1) * 1000; - // This "this leak" is okay because the retained pointer is in an // instance variable. stateMachine = stateMachineFactory.make(this); @@ -390,17 +427,23 @@ public abstract class TaskImpl implement //this is always called in read/write lock private long getLaunchTime() { - long launchTime = 0; + long taskLaunchTime = 0; + boolean launchTimeSet = false; for (TaskAttempt at : attempts.values()) { - //select the least launch time of all attempts - if (launchTime == 0 || launchTime > at.getLaunchTime()) { - launchTime = at.getLaunchTime(); + // select the least launch time of all attempts + long attemptLaunchTime = at.getLaunchTime(); + if (attemptLaunchTime != 0 && !launchTimeSet) { + // For the first non-zero launch time + launchTimeSet = true; + taskLaunchTime = attemptLaunchTime; + } else if (attemptLaunchTime != 0 && taskLaunchTime > attemptLaunchTime) { + taskLaunchTime = attemptLaunchTime; } } - if (launchTime == 0) { + if (!launchTimeSet) { return this.scheduledTime; } - return launchTime; + return taskLaunchTime; } //this is always called in read/write lock @@ -525,7 +568,16 @@ public abstract class TaskImpl implement attempts.put(attempt.getID(), attempt); break; } - ++nextAttemptNumber; + + // Update nextATtemptNumber + if (taskAttemptsFromPreviousGeneration.isEmpty()) { + ++nextAttemptNumber; + } else { + // There are still some TaskAttempts from previous generation, use them + nextAttemptNumber = + taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId(); + } + ++numberUncompletedAttempts; //schedule the nextAttemptNumber if (failedAttempts > 0) { Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/Recovery.java Tue Feb 14 00:11:54 2012 @@ -19,8 +19,9 @@ package org.apache.hadoop.mapreduce.v2.app.recover; import java.util.List; -import java.util.Set; +import java.util.Map; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.TaskId; import org.apache.hadoop.yarn.Clock; @@ -32,7 +33,7 @@ public interface Recovery { Clock getClock(); - Set getCompletedTasks(); + Map getCompletedTasks(); List getAMInfos(); } Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Tue Feb 14 00:11:54 2012 @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,6 +34,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser; @@ -153,8 +153,8 @@ public class RecoveryService extends Com } @Override - public Set getCompletedTasks() { - return completedTasks.keySet(); + public Map getCompletedTasks() { + return completedTasks; } @Override @@ -189,7 +189,8 @@ public class RecoveryService extends Com getConfig()); //read the previous history file historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile( - histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); + histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1))); + LOG.info("History file is at " + historyFile); in = fc.open(historyFile); JobHistoryParser parser = new JobHistoryParser(in); jobInfo = parser.parse(); @@ -242,7 +243,7 @@ public class RecoveryService extends Com if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) .getTaskAttemptID()); - LOG.info("Attempt start time " + attInfo.getStartTime()); + LOG.info("Recovered Attempt start time " + attInfo.getStartTime()); clock.setTime(attInfo.getStartTime()); } else if (event.getType() == TaskAttemptEventType.TA_DONE @@ -250,7 +251,7 @@ public class RecoveryService extends Com || event.getType() == TaskAttemptEventType.TA_KILL) { TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event) .getTaskAttemptID()); - LOG.info("Attempt finish time " + attInfo.getFinishTime()); + LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime()); clock.setTime(attInfo.getFinishTime()); } @@ -380,17 +381,17 @@ public class RecoveryService extends Com } // send the done event - LOG.info("Sending done event to " + aId); + LOG.info("Sending done event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_DONE)); break; case KILLED: - LOG.info("Sending kill event to " + aId); + LOG.info("Sending kill event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_KILL)); break; default: - LOG.info("Sending fail event to " + aId); + LOG.info("Sending fail event to recovered attempt " + aId); actualHandler.handle(new TaskAttemptEvent(aId, TaskAttemptEventType.TA_FAILMSG)); break; Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Tue Feb 14 00:11:54 2012 @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.o import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobState; +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState; import org.apache.hadoop.mapreduce.v2.api.records.TaskState; import org.apache.hadoop.mapreduce.v2.app.job.Job; @@ -52,6 +53,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; @@ -74,7 +76,14 @@ public class TestRecovery { private Text val1 = new Text("val1"); private Text val2 = new Text("val2"); - + /** + * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt + * completely disappears because of failed launch, one attempt gets killed and + * one attempt succeeds. AM crashes after the first tasks finishes and + * recovers completely and succeeds in the second generation. + * + * @throws Exception + */ @Test public void testCrashed() throws Exception { @@ -112,7 +121,8 @@ public class TestRecovery { // reduces must be in NEW state Assert.assertEquals("Reduce Task state not correct", TaskState.RUNNING, reduceTask.getReport().getTaskState()); - + + /////////// Play some games with the TaskAttempts of the first task ////// //send the fail signal to the 1st map task attempt app.getContext().getEventHandler().handle( new TaskAttemptEvent( @@ -120,42 +130,68 @@ public class TestRecovery { TaskAttemptEventType.TA_FAILMSG)); app.waitForState(task1Attempt1, TaskAttemptState.FAILED); - - while (mapTask1.getAttempts().size() != 2) { + + int timeOut = 0; + while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) { Thread.sleep(2000); LOG.info("Waiting for next attempt to start"); } + Assert.assertEquals(2, mapTask1.getAttempts().size()); Iterator itr = mapTask1.getAttempts().values().iterator(); itr.next(); TaskAttempt task1Attempt2 = itr.next(); - app.waitForState(task1Attempt2, TaskAttemptState.RUNNING); + // This attempt will automatically fail because of the way ContainerLauncher + // is setup + // This attempt 'disappears' from JobHistory and so causes MAPREDUCE-3846 + app.getContext().getEventHandler().handle( + new TaskAttemptEvent(task1Attempt2.getID(), + TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED)); + app.waitForState(task1Attempt2, TaskAttemptState.FAILED); - //send the kill signal to the 1st map 2nd attempt + timeOut = 0; + while (mapTask1.getAttempts().size() != 3 && timeOut++ < 10) { + Thread.sleep(2000); + LOG.info("Waiting for next attempt to start"); + } + Assert.assertEquals(3, mapTask1.getAttempts().size()); + itr = mapTask1.getAttempts().values().iterator(); + itr.next(); + itr.next(); + TaskAttempt task1Attempt3 = itr.next(); + + app.waitForState(task1Attempt3, TaskAttemptState.RUNNING); + + //send the kill signal to the 1st map 3rd attempt app.getContext().getEventHandler().handle( new TaskAttemptEvent( - task1Attempt2.getID(), + task1Attempt3.getID(), TaskAttemptEventType.TA_KILL)); - app.waitForState(task1Attempt2, TaskAttemptState.KILLED); - - while (mapTask1.getAttempts().size() != 3) { + app.waitForState(task1Attempt3, TaskAttemptState.KILLED); + + timeOut = 0; + while (mapTask1.getAttempts().size() != 4 && timeOut++ < 10) { Thread.sleep(2000); LOG.info("Waiting for next attempt to start"); } + Assert.assertEquals(4, mapTask1.getAttempts().size()); itr = mapTask1.getAttempts().values().iterator(); itr.next(); itr.next(); - TaskAttempt task1Attempt3 = itr.next(); + itr.next(); + TaskAttempt task1Attempt4 = itr.next(); - app.waitForState(task1Attempt3, TaskAttemptState.RUNNING); + app.waitForState(task1Attempt4, TaskAttemptState.RUNNING); - //send the done signal to the 1st map 3rd attempt + //send the done signal to the 1st map 4th attempt app.getContext().getEventHandler().handle( new TaskAttemptEvent( - task1Attempt3.getID(), + task1Attempt4.getID(), TaskAttemptEventType.TA_DONE)); + /////////// End of games with the TaskAttempts of the first task ////// + //wait for first map task to complete app.waitForState(mapTask1, TaskState.SUCCEEDED); long task1StartTime = mapTask1.getReport().getStartTime(); @@ -552,7 +588,7 @@ public class TestRecovery { } - class MRAppWithHistory extends MRApp { + static class MRAppWithHistory extends MRApp { public MRAppWithHistory(int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount) { super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); @@ -567,7 +603,17 @@ public class TestRecovery { @Override protected ContainerLauncher createContainerLauncher(AppContext context) { - MockContainerLauncher launcher = new MockContainerLauncher(); + MockContainerLauncher launcher = new MockContainerLauncher() { + @Override + public void handle(ContainerLauncherEvent event) { + TaskAttemptId taskAttemptID = event.getTaskAttemptID(); + // Pass everything except the 2nd attempt of the first task. + if (taskAttemptID.getId() != 1 + || taskAttemptID.getTaskId().getId() != 0) { + super.handle(event); + } + } + }; launcher.shufflePort = 5467; return launcher; } @@ -581,7 +627,7 @@ public class TestRecovery { } } - class RecoveryServiceWithCustomDispatcher extends RecoveryService { + static class RecoveryServiceWithCustomDispatcher extends RecoveryService { public RecoveryServiceWithCustomDispatcher( ApplicationAttemptId applicationAttemptId, Clock clock, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Tue Feb 14 00:11:54 2012 @@ -25,7 +25,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Set; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -34,6 +34,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Task; import org.apache.hadoop.mapred.TaskUmbilicalProtocol; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo; import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier; import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; import org.apache.hadoop.mapreduce.v2.api.records.JobId; @@ -72,7 +73,7 @@ public class TestTaskImpl { private Path remoteJobConfFile; private Collection> fsTokens; private Clock clock; - private Set completedTasksFromPreviousRun; + private Map completedTasksFromPreviousRun; private MRAppMetrics metrics; private TaskImpl mockTask; private ApplicationId appId; @@ -96,7 +97,7 @@ public class TestTaskImpl { TaskAttemptListener taskAttemptListener, OutputCommitter committer, Token jobToken, Collection> fsTokens, Clock clock, - Set completedTasksFromPreviousRun, int startCount, + Map completedTasksFromPreviousRun, int startCount, MRAppMetrics metrics) { super(jobId, taskType , partition, eventHandler, remoteJobConfFile, conf, taskAttemptListener, committer, Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1243755&r1=1243754&r2=1243755&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Tue Feb 14 00:11:54 2012 @@ -52,7 +52,6 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; -@SuppressWarnings("deprecation") public class TypeConverter { private static RecordFactory recordFactory; @@ -116,8 +115,8 @@ public class TypeConverter { } public static org.apache.hadoop.mapred.TaskID fromYarn(TaskId id) { - return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), fromYarn(id.getTaskType()), - id.getId()); + return new org.apache.hadoop.mapred.TaskID(fromYarn(id.getJobId()), + fromYarn(id.getTaskType()), id.getId()); } public static TaskId toYarn(org.apache.hadoop.mapreduce.TaskID id) {