hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jl...@apache.org
Subject hadoop git commit: MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li
Date Fri, 18 Sep 2015 19:22:21 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 521196874 -> 7429438c8


MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can disappear. Contributed by Chang Li


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7429438c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7429438c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7429438c

Branch: refs/heads/branch-2.7
Commit: 7429438c859da06ed1ed34aec0ff9873bd33f98d
Parents: 5211968
Author: Jason Lowe <jlowe@apache.org>
Authored: Fri Sep 18 19:21:35 2015 +0000
Committer: Jason Lowe <jlowe@apache.org>
Committed: Fri Sep 18 19:21:35 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../v2/app/job/impl/TaskAttemptImpl.java        |  93 ++++++------
 .../apache/hadoop/mapreduce/v2/app/MRApp.java   |  11 +-
 .../v2/app/job/impl/TestTaskAttempt.java        | 142 +++++++++++++++++++
 4 files changed, 202 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7429438c/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index cd98559..5469543 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -38,6 +38,9 @@ Release 2.7.2 - UNRELEASED
     position/key information for uncompressed input sometimes. (Zhihai Xu via
     jlowe)
 
+    MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
+    disappear (Chang Li via jlowe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7429438c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index aa022a1..7aed78a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1378,6 +1378,19 @@ public abstract class TaskAttemptImpl implements
     return tauce;
   }
 
+  private static void
+      sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
+    TaskAttemptContainerLaunchedEvent event;
+    taskAttempt.launchTime = taskAttempt.clock.getTime();
+
+    InetSocketAddress nodeHttpInetAddr =
+        NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
+    taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+    taskAttempt.httpPort = nodeHttpInetAddr.getPort();
+    taskAttempt.sendLaunchedEvents();
+  }
+
+
   @SuppressWarnings("unchecked")
   private void sendLaunchedEvents() {
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@@ -1566,6 +1579,9 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt, 
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
 
@@ -1600,23 +1616,19 @@ public abstract class TaskAttemptImpl implements
         default:
           LOG.error("Task final state is not FAILED or KILLED: " + finalState);
       }
-      if (taskAttempt.getLaunchTime() != 0) {
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                finalState);
-        if(finalState == TaskAttemptStateInternal.FAILED) {
-          taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        } else if(finalState == TaskAttemptStateInternal.KILLED) {
-          taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        }
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      } else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
+
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              finalState);
+      if(finalState == TaskAttemptStateInternal.FAILED) {
+        taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      } else if(finalState == TaskAttemptStateInternal.KILLED) {
+        taskAttempt.eventHandler
+        .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
       }
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
     }
   }
 
@@ -1708,23 +1720,20 @@ public abstract class TaskAttemptImpl implements
     @SuppressWarnings("unchecked")
     @Override
     public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       // set the finish time
       taskAttempt.setFinishTime();
+
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.FAILED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.FAILED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-        // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
-        // handling failed map/reduce events.
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
     }
@@ -1861,27 +1870,25 @@ public abstract class TaskAttemptImpl implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.KILLED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
+
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
 
       if (event instanceof TaskAttemptKillEvent) {
         taskAttempt.addDiagnosticInfo(
             ((TaskAttemptKillEvent) event).getMessage());
       }
 
-//      taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7429438c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 3100d12..4005671 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -528,10 +528,7 @@ public class MRApp extends MRAppMaster {
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        getContext().getEventHandler().handle(
-            new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                shufflePort));
-        
+        containerLaunched(event.getTaskAttemptID(), shufflePort);
         attemptLaunched(event.getTaskAttemptID());
         break;
       case CONTAINER_REMOTE_CLEANUP:
@@ -543,6 +540,12 @@ public class MRApp extends MRAppMaster {
     }
   }
 
+  protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
+    getContext().getEventHandler().handle(
+      new TaskAttemptContainerLaunchedEvent(attemptID,
+          shufflePort));
+  }
+
   protected void attemptLaunched(TaskAttemptId attemptID) {
     if (autoComplete) {
       // send the done event

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7429438c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index 1807c1c..e3dd67a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -113,6 +113,57 @@ public class TestTaskAttempt{
   }
 
   @Test
+  public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
+    // test TA_CONTAINER_LAUNCH_FAILED for map
+    FailingAttemptsDuringAssignedMRApp app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_LAUNCH_FAILED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_KILL for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+
+    // test TA_KILL for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+  }
+
+  @Test
   public void testSingleRackRequest() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
         new TaskAttemptImpl.RequestContainerTransition(false);
@@ -299,6 +350,31 @@ public class TestTaskAttempt{
         report.getTaskAttemptState());
   }
 
+  private void testTaskAttemptAssignedFailHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
+  }
+
+  private void testTaskAttemptAssignedKilledHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.KILLED);
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
+  }
+
   static class FailingAttemptsMRApp extends MRApp {
     FailingAttemptsMRApp(int maps, int reduces) {
       super(maps, reduces, true, "FailingAttemptsMRApp", true);
@@ -329,6 +405,72 @@ public class TestTaskAttempt{
     }
   }
 
+  static class FailingAttemptsDuringAssignedMRApp extends MRApp {
+    FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
+        TaskAttemptEventType event) {
+      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+      sendFailEvent = event;
+    }
+
+   TaskAttemptEventType sendFailEvent;
+
+   @Override
+    protected void containerLaunched(TaskAttemptId attemptID,
+        int shufflePort) {
+      //do nothing, not send TA_CONTAINER_LAUNCHED event
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID, sendFailEvent));
+    }
+
+    private boolean receiveTaStartJHEvent = false;
+    private boolean receiveTaFailedJHEvent = false;
+    private boolean receiveTaKilledJHEvent = false;
+
+    public boolean getTaStartJHEvent(){
+      return receiveTaStartJHEvent;
+    }
+
+    public boolean getTaFailedJHEvent(){
+      return receiveTaFailedJHEvent;
+    }
+
+    public boolean getTaKilledJHEvent(){
+        return receiveTaKilledJHEvent;
+    }
+
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new EventHandler<JobHistoryEvent>() {
+        @Override
+        public void handle(JobHistoryEvent event) {
+          if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
+              EventType.MAP_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+                  jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          }
+        }
+      };
+    }
+  }
+
   @Test
   public void testLaunchFailedWhileKilling() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);


Mime
View raw message