hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject svn commit: r1408411 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Date Mon, 12 Nov 2012 19:02:34 GMT
Author: bobby
Date: Mon Nov 12 19:02:32 2012
New Revision: 1408411

URL: http://svn.apache.org/viewvc?rev=1408411&view=rev
Log:
MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe via bobby)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1408411&r1=1408410&r2=1408411&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Nov 12 19:02:32
2012
@@ -90,6 +90,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
 
     MAPREDUCE-4787. TestJobMonitorAndPrint is broken (Rob Parker via bobby)
+
+    MAPREDUCE-4425. Speculation + Fetch failures can lead to a hung job (jlowe 
+    via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408411&r1=1408410&r2=1408411&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
Mon Nov 12 19:02:32 2012
@@ -212,15 +212,17 @@ public abstract class TaskImpl implement
     .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        EnumSet.of(TaskEventType.T_ATTEMPT_KILLED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED),
+            new AttemptCompletedAtSucceededTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-            TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_KILLED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED))
+            TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from FAILED state        
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -964,6 +966,8 @@ public abstract class TaskImpl implement
           !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
         // don't allow a different task attempt to override a previous
         // succeeded state
+        task.finishedAttempts.add(castEvent.getTaskAttemptID());
+        task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
         return TaskStateInternal.SUCCEEDED;
       }
       
@@ -993,6 +997,16 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class AttemptCompletedAtSucceededTransition
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      task.finishedAttempts.add(castEvent.getTaskAttemptID());
+      task.inProgressAttempts.remove(castEvent.getTaskAttemptID());
+    }
+  }
+
   private static class KillNewTransition 
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1408411&r1=1408410&r2=1408411&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
Mon Nov 12 19:02:32 2012
@@ -140,7 +140,6 @@ public class TestTaskImpl {
 
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
-    private TaskAttemptId attemptId;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -150,13 +149,10 @@ public class TestTaskImpl {
         AppContext appContext) {
       super(taskId, id, eventHandler, taskAttemptListener, jobFile, partition, conf,
           dataLocations, committer, jobToken, credentials, clock, appContext);
-      attemptId = Records.newRecord(TaskAttemptId.class);
-      attemptId.setId(id);
-      attemptId.setTaskId(taskId);
     }
 
     public TaskAttemptId getAttemptId() {
-      return attemptId;
+      return getID();
     }
     
     @Override
@@ -522,4 +518,46 @@ public class TestTaskImpl {
   public void testCommitAfterSucceeds() {
     runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
   }
+
+  @Test
+  public void testSpeculativeMapFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt killed
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapMultipleSucceedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt succeeds
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
+
+  @Test
+  public void testSpeculativeMapFailedFetchFailure() {
+    // Setup a scenario where speculative task wins, first attempt fails
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
+    assertEquals(2, taskAttempts.size());
+
+    // speculative attempt retroactively fails from fetch failures
+    mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(1).getAttemptId(),
+        TaskEventType.T_ATTEMPT_FAILED));
+
+    assertTaskScheduledState();
+    assertEquals(3, taskAttempts.size());
+  }
 }



Mime
View raw message