hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sunc...@apache.org
Subject [3/8] hadoop git commit: MAPREDUCE-7166. map-only job should ignore node lost event when task is already succeeded. Contributed by Lei Li.
Date Thu, 20 Dec 2018 06:21:35 GMT
MAPREDUCE-7166. map-only job should ignore node lost event when task is already succeeded.
Contributed by Lei Li.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/499c70ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/499c70ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/499c70ed

Branch: refs/heads/HDFS-12943
Commit: 499c70eda5f6315cf7e13a4b3bbefc7e187bc457
Parents: d963575
Author: Akira Ajisaka <aajisaka@apache.org>
Authored: Thu Dec 20 10:09:50 2018 +0900
Committer: Akira Ajisaka <aajisaka@apache.org>
Committed: Thu Dec 20 10:09:50 2018 +0900

----------------------------------------------------------------------
 .../v2/app/job/impl/TaskAttemptImpl.java        | 15 ++++
 .../v2/app/job/impl/TestTaskAttempt.java        | 84 +++++++++++++++++++-
 2 files changed, 95 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/499c70ed/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 63e7456..d912b60 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -2196,6 +2196,14 @@ public abstract class TaskAttemptImpl implements
                   taskAttempt.getID().toString());
         return TaskAttemptStateInternal.SUCCEEDED;
       }
+      if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP
+          && taskAttempt.conf.getNumReduceTasks() == 0) {
+        // As above: a map-only job has no reduce phase, so ignore the kill
+        // event once the map task attempt has succeeded.
+        LOG.info("Ignoring killed event for successful map only task attempt" +
+            taskAttempt.getID().toString());
+        return TaskAttemptStateInternal.SUCCEEDED;
+      }
       if(event instanceof TaskAttemptKillEvent) {
         TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
         //add to diagnostic
@@ -2246,6 +2254,13 @@ public abstract class TaskAttemptImpl implements
         LOG.info("Ignoring killed event for successful reduce task attempt" +
             taskAttempt.getID().toString());
         return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
+      } else if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP
+          && taskAttempt.conf.getNumReduceTasks() == 0) {
+        // As above: a map-only job has no reduce phase, so ignore the kill
+        // event once the map task attempt has succeeded.
+        LOG.info("Ignoring killed event for successful map only task attempt" +
+            taskAttempt.getID().toString());
+        return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
       } else {
         // Store reschedule flag so that after clean up is completed, new
         // attempt is scheduled/rescheduled based on it.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/499c70ed/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index d62a6cc..11f16a8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -1323,6 +1323,42 @@ public class TestTaskAttempt{
   }
 
   @Test
+  public void testKillMapOnlyTaskWhileSuccessFinishing() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createMapOnlyTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+            "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    // If the map only task is killed when it is in SUCCESS_FINISHING_CONTAINER
+    // state, the state will move to SUCCESS_CONTAINER_CLEANUP
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+            "SUCCESS_CONTAINER_CLEANUP",
+        TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not SUCCEEDED state",
+        TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState());
+
+    assertFalse("InternalError occurred", eventHandler.internalError);
+  }
+
+  @Test
   public void testKillMapTaskAfterSuccess() throws Exception {
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
@@ -1340,7 +1376,7 @@ public class TestTaskAttempt{
         TaskAttemptEventType.TA_CONTAINER_CLEANED));
     // Send a map task attempt kill event indicating next map attempt has to be
     // reschedule
-    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(), "", true));
     assertEquals("Task attempt is not in KILLED state", taImpl.getState(),
         TaskAttemptState.KILLED);
     assertEquals("Task attempt's internal state is not KILLED",
@@ -1354,6 +1390,34 @@ public class TestTaskAttempt{
   }
 
   @Test
+  public void testKillMapOnlyTaskAfterSuccess() throws Exception {
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptImpl taImpl = createMapOnlyTaskAttemptImpl(eventHandler);
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not " +
+            "SUCCESS_FINISHING_CONTAINER",
+        TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+        taImpl.getInternalState());
+
+    taImpl.handle(new TaskAttemptEvent(taImpl.getID(),
+        TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    // Succeeded
+    taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true));
+    assertEquals("Task attempt is not in SUCCEEDED state",
+        TaskAttemptState.SUCCEEDED, taImpl.getState());
+    assertEquals("Task attempt's internal state is not SUCCEEDED",
+        TaskAttemptStateInternal.SUCCEEDED, taImpl.getInternalState());
+    assertFalse("InternalError occurred", eventHandler.internalError);
+    TaskEvent event = eventHandler.lastTaskEvent;
+    assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, event.getType());
+  }
+
+  @Test
   public void testKillMapTaskWhileFailFinishing() throws Exception {
     MockEventHandler eventHandler = new MockEventHandler();
     TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler);
@@ -1765,8 +1829,8 @@ public class TestTaskAttempt{
         thenReturn(taskAttemptFinishingMonitor);
   }
 
-  private TaskAttemptImpl createTaskAttemptImpl(
-      MockEventHandler eventHandler) {
+  private TaskAttemptImpl createCommonTaskAttemptImpl(
+      MockEventHandler eventHandler, JobConf jobConf) {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 0);
@@ -1778,7 +1842,6 @@ public class TestTaskAttempt{
     TaskAttemptListener taListener = mock(TaskAttemptListener.class);
     when(taListener.getAddress()).thenReturn(new InetSocketAddress("localhost", 0));
 
-    JobConf jobConf = new JobConf();
     jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
     jobConf.setBoolean("fs.file.impl.disable.cache", true);
     jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
@@ -1813,6 +1876,19 @@ public class TestTaskAttempt{
     return taImpl;
   }
 
+  private TaskAttemptImpl createTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    JobConf jobConf = new JobConf();
+    return createCommonTaskAttemptImpl(eventHandler, jobConf);
+  }
+
+  private TaskAttemptImpl createMapOnlyTaskAttemptImpl(
+      MockEventHandler eventHandler) {
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    return createCommonTaskAttemptImpl(eventHandler, jobConf);
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
     public TaskEvent lastTaskEvent;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message