hadoop-mapreduce-commits mailing list archives

From: jl...@apache.org
Subject: svn commit: r1445458 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ hadoop-mapreduce-client/hadoop-mapreduce-client-app...
Date: Wed, 13 Feb 2013 02:20:00 GMT
Author: jlowe
Date: Wed Feb 13 02:20:00 2013
New Revision: 1445458

URL: http://svn.apache.org/r1445458
Log:
svn merge -c 1445456 FIXES: MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks
with speculative attempts. Contributed by Robert Parker

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
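
For context on the bug: JobHistoryParser hands back every attempt recorded
for a task, including a speculative attempt that started but never finished.
During recovery the AM replayed those attempts and then waited for completion
events that were never written to the history file, hanging the job. The fix
records completed attempts in a dedicated map and prunes everything else from
a succeeded task before recovery proceeds. A minimal sketch of that pruning
step, with hypothetical names (allAttempts, completedAttempts) standing in
for the real structures:

    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;

    public class PruneSketch {
      // Keep only attempts that actually completed; an unfinished attempt
      // would leave the recovering AM waiting forever for a finish event.
      static <K, V> void pruneUnfinished(Map<K, V> allAttempts,
                                         Set<K> completedAttempts) {
        Iterator<Map.Entry<K, V>> it = allAttempts.entrySet().iterator();
        while (it.hasNext()) {
          if (!completedAttempts.contains(it.next().getKey())) {
            it.remove(); // safe removal while iterating
          }
        }
      }
    }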

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1445458&r1=1445457&r2=1445458&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Feb 13 02:20:00 2013
@@ -62,6 +62,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4893. MR AppMaster can do sub-optimal assignment of containers
     to map tasks leading to poor node locality (Bikas Saha via jlowe)
 
+    MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
+    speculative attempts (Robert Parker via jlowe)
+
 Release 0.23.6 - 2013-02-06
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1445458&r1=1445457&r2=1445458&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Feb 13 02:20:00 2013
@@ -21,9 +21,12 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -108,7 +112,7 @@ public class RecoveryService extends Com
   private JobInfo jobInfo = null;
   private final Map<TaskId, TaskInfo> completedTasks =
     new HashMap<TaskId, TaskInfo>();
-
+  
   private final List<TaskEvent> pendingTaskScheduleEvents =
     new ArrayList<TaskEvent>();
 
@@ -193,6 +197,14 @@ public class RecoveryService extends Com
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {
       if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
         completedTasks
             .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
         LOG.info("Read from history task "
@@ -215,6 +227,7 @@ public class RecoveryService extends Com
         JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    LOG.info("Trying file " + histDirPath.toString());
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
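
One detail in the hunk above: removal goes through Iterator.remove() rather
than Map.remove(), because the attempt map is being iterated at the time;
mutating the map directly would typically fail fast with
ConcurrentModificationException on the next hasNext()/next() call. A
standalone demonstration (illustrative only, not part of this commit):

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class RemoveDuringIteration {
      public static void main(String[] args) {
        Map<String, Integer> attempts = new HashMap<String, Integer>();
        attempts.put("attempt_1", 1); // completed
        attempts.put("attempt_2", 2); // never finished

        Iterator<Map.Entry<String, Integer>> it =
            attempts.entrySet().iterator();
        while (it.hasNext()) {
          if (it.next().getKey().equals("attempt_2")) {
            // attempts.remove("attempt_2") here would usually throw
            // ConcurrentModificationException from the iterator; its own
            // remove() keeps the iteration valid.
            it.remove();
          }
        }
        System.out.println(attempts); // prints {attempt_1=1}
      }
    }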

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1445458&r1=1445457&r2=1445458&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Feb 13 02:20:00 2013
@@ -50,11 +50,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
@@ -734,12 +738,173 @@ public class TestRecovery {
     app.verifyCompleted();
     validateOutput();
   }
-  
+
+  /**
+   * AM with 2 maps and 1 reduce. A speculative attempt is launched for the
+   * 1st map and the original attempt succeeds. The AM crashes after the
+   * first task finishes, then recovers completely and succeeds in the
+   * second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testSpeculative() throws Exception {
+
+    int runCount = 0;
+    long am1StartTimeEst = System.currentTimeMillis();
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    // the job should report all 3 tasks (2 maps, 1 reduce)
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    // launch a speculative attempt for the first task
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Iterator<TaskAttempt> t1it = mapTask1.getAttempts().values().iterator();
+    TaskAttempt task1Attempt1 = t1it.next();
+    TaskAttempt task1Attempt2 = t1it.next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
+
+    LOG.info(t1a2contId.toString());
+    LOG.info(task1Attempt1.getID().toString());
+    LOG.info(task1Attempt2.getID().toString());
+
+    // Launch container for speculative attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
+
+    // before sending the TA_DONE event, make sure the attempts have reached
+    // the RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // the reduce task should already be RUNNING
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    // send the done signal to attempt 1 of map 1
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+
+    //stop the app
+    app.stop();
+
+    // rerun: the 1st map will be recovered from the previous run
+    long am2StartTimeEst = System.currentTimeMillis();
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    // the job should report all 3 tasks (2 maps, 1 reduce)
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // the RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for it to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+    Assert.assertEquals(2, job.getAMInfos().size());
+    int attemptNum = 1;
+    // Verify AMInfo
+    for (AMInfo amInfo : job.getAMInfos()) {
+      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+          .getAttemptId());
+      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+          .getApplicationAttemptId());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    }
+    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+        && am1StartTimeReal <= am2StartTimeEst);
+    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+        && am2StartTimeReal <= System.currentTimeMillis());
+
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
       TypeConverter.fromYarn(attempt.getID()));
-  
+ 
   TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
   RecordWriter theRecordWriter = theOutputFormat
       .getRecordWriter(tContext);
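
A note on how the new testSpeculative above drives speculation: rather than
waiting for the speculator, it sends T_ADD_SPEC_ATTEMPT straight to the task
and then polls for the second attempt with a one-second sleep loop capped at
ten iterations. The same pattern factored into a helper, as a sketch only
(waitForAttempts is not part of this commit):

    import org.apache.hadoop.mapreduce.v2.app.job.Task;

    public class WaitSketch {
      // Poll until the task has the expected number of attempts, or give
      // up once timeoutMs has elapsed. Returns true if the condition held.
      static boolean waitForAttempts(Task task, int expected,
                                     long timeoutMs, long pollMs)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (task.getAttempts().size() != expected) {
          if (System.currentTimeMillis() >= deadline) {
            return false;
          }
          Thread.sleep(pollMs);
        }
        return true;
      }
    }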

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1445458&r1=1445457&r2=1445458&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Feb 13 02:20:00 2013
@@ -246,6 +246,7 @@ public class JobHistoryParser implements
     attemptInfo.state = StringInterner.weakIntern(event.getState());
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleReduceAttemptFinishedEvent
@@ -262,6 +263,7 @@ public class JobHistoryParser implements
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -276,6 +278,7 @@ public class JobHistoryParser implements
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptFailedEvent(
@@ -303,6 +306,7 @@ public class JobHistoryParser implements
         // not resetting the other fields set in handleTaskFinishedEvent()
       }
     }
+    info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -440,6 +444,7 @@ public class JobHistoryParser implements
     Map<JobACL, AccessControlList> jobACLs;
     
     Map<TaskID, TaskInfo> tasksMap;
+    Map<TaskAttemptID, TaskAttemptInfo> completedTaskAttemptsMap;
     List<AMInfo> amInfos;
     AMInfo latestAmInfo;
     boolean uberized;
@@ -453,6 +458,7 @@ public class JobHistoryParser implements
       finishedMaps = finishedReduces = 0;
       username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
+      completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
       jobACLs = new HashMap<JobACL, AccessControlList>();
       priority = JobPriority.NORMAL;
     }
@@ -527,6 +533,8 @@ public class JobHistoryParser implements
     public Counters getReduceCounters() { return reduceCounters; }
     /** @return the map of all tasks in this job */
     public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
+    /** @return the map of all completed task attempts in this job */
+    public Map<TaskAttemptID, TaskAttemptInfo> getAllCompletedTaskAttempts() { return completedTaskAttemptsMap; }
     /** @return the priority of this job */
     public String getPriority() { return priority.toString(); }
     public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
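
With the new map in place, readers of parsed history can tell attempts that
finished apart from attempts that merely started. A usage sketch, assuming a
FileSystem fs and a history-file Path are already in hand (the pruning here
mirrors the RecoveryService change earlier in this commit):

    import java.util.Map;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;

    public class CompletedAttemptsSketch {
      static void pruneToCompleted(FileSystem fs, Path historyFile)
          throws Exception {
        JobInfo jobInfo = new JobHistoryParser(fs, historyFile).parse();
        Map<TaskAttemptID, TaskAttemptInfo> completed =
            jobInfo.getAllCompletedTaskAttempts();
        for (TaskInfo taskInfo : jobInfo.getAllTasks().values()) {
          // Drop attempts with no recorded completion event.
          taskInfo.getAllTaskAttempts().keySet()
              .retainAll(completed.keySet());
        }
      }
    }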


