hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject svn commit: r1408349 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Date Mon, 12 Nov 2012 16:56:14 GMT
Author: bobby
Date: Mon Nov 12 16:56:13 2012
New Revision: 1408349

URL: http://svn.apache.org/viewvc?rev=1408349&view=rev
Log:
svn merge -c 1408314 FIXES: MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1408349&r1=1408348&r2=1408349&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Mon Nov 12 16:56:13 2012
@@ -86,6 +86,8 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
     state (jlowe via bobby)
+
+    MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1408349&r1=1408348&r2=1408349&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Nov 12 16:56:13 2012
@@ -685,7 +685,10 @@ public class JobImpl implements org.apac
    * The only entry point to change the Job.
    */
   public void handle(JobEvent event) {
-    LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing " + event.getJobId() + " of type "
+          + event.getType());
+    }
     try {
       writeLock.lock();
       JobStateInternal oldState = getInternalState();

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408349&r1=1408348&r2=1408349&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Nov 12 16:56:13 2012
@@ -22,9 +22,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -116,9 +118,18 @@ public abstract class TaskImpl implement
   protected Credentials credentials;
   protected Token<JobTokenIdentifier> jobToken;
   
+  //should be set to one which comes first
+  //saying COMMIT_PENDING
+  private TaskAttemptId commitAttempt;
+
+  private TaskAttemptId successfulAttempt;
+
+  private final Set<TaskAttemptId> failedAttempts;
+  // Track the finished attempts - successful, failed and killed
+  private final Set<TaskAttemptId> finishedAttempts;
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
-  private int numberUncompletedAttempts = 0;
+  private final Set<TaskAttemptId> inProgressAttempts;
 
   private boolean historyTaskStartGenerated = false;
   
@@ -180,6 +191,14 @@ public abstract class TaskImpl implement
         EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new KillWaitAttemptSucceededTransition())
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+        TaskEventType.T_ATTEMPT_FAILED,
+        new KillWaitAttemptFailedTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.KILL_WAIT,
@@ -187,8 +206,6 @@ public abstract class TaskImpl implement
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
-            TaskEventType.T_ATTEMPT_FAILED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED,
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
@@ -238,15 +255,6 @@ public abstract class TaskImpl implement
   private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
       new RecoverdAttemptsComparator();
 
-  //should be set to one which comes first
-  //saying COMMIT_PENDING
-  private TaskAttemptId commitAttempt;
-
-  private TaskAttemptId successfulAttempt;
-
-  private int failedAttempts;
-  private int finishedAttempts;//finish are total of success, failed and killed
-
   @Override
   public TaskState getState() {
     readLock.lock();
@@ -271,6 +279,9 @@ public abstract class TaskImpl implement
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
+    this.finishedAttempts = new HashSet<TaskAttemptId>(2);
+    this.failedAttempts = new HashSet<TaskAttemptId>(2);
+    this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
     // This overridable method call is okay in a constructor because we
     //  have a convention that none of the overrides depends on any
     //  fields that need initialization.
@@ -605,9 +616,9 @@ public abstract class TaskImpl implement
           taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
     }
 
-    ++numberUncompletedAttempts;
+    inProgressAttempts.add(attempt.getID());
     //schedule the nextAttemptNumber
-    if (failedAttempts > 0) {
+    if (failedAttempts.size() > 0) {
       eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
         TaskAttemptEventType.TA_RESCHEDULE));
     } else {
@@ -654,7 +665,6 @@ public abstract class TaskImpl implement
   // always called inside a transition, in turn inside the Write Lock
   private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
       TaskAttemptCompletionEventStatus status) {
-    finishedAttempts++;
     TaskAttempt attempt = attempts.get(attemptId);
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
@@ -707,6 +717,11 @@ public abstract class TaskImpl implement
     return taskFailedEvent;
   }
 
+  private static void unSucceed(TaskImpl task) {
+    task.commitAttempt = null;
+    task.successfulAttempt = null;
+  }
+
   /**
   * @return a String representation of the splits.
   *
@@ -777,11 +792,14 @@ public abstract class TaskImpl implement
       implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
+      TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          taskAttemptId, 
           TaskAttemptCompletionEventStatus.SUCCEEDED);
-      --task.numberUncompletedAttempts;
-      task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
+      task.successfulAttempt = taskAttemptId;
       task.eventHandler.handle(new JobTaskEvent(
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@@ -812,10 +830,13 @@ public abstract class TaskImpl implement
       SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
       task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          taskAttemptId, 
           TaskAttemptCompletionEventStatus.KILLED);
-      --task.numberUncompletedAttempts;
+      task.finishedAttempts.add(taskAttemptId);
+      task.inProgressAttempts.remove(taskAttemptId);
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
       }
@@ -827,14 +848,25 @@ public abstract class TaskImpl implement
       MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     protected TaskStateInternal finalState = TaskStateInternal.KILLED;
+    protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
+
+    public KillWaitAttemptKilledTransition() {
+      this(TaskAttemptCompletionEventStatus.KILLED);
+    }
+
+    public KillWaitAttemptKilledTransition(
+        TaskAttemptCompletionEventStatus taCompletionEventStatus) {
+      this.taCompletionEventStatus = taCompletionEventStatus;
+    }
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.handleTaskAttemptCompletion(
-          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
-          TaskAttemptCompletionEventStatus.KILLED);
+      TaskAttemptId taskAttemptId =
+          ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
+      task.finishedAttempts.add(taskAttemptId);
       // check whether all attempts are finished
-      if (task.finishedAttempts == task.attempts.size()) {
+      if (task.finishedAttempts.size() == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
               finalState, null); // TODO JH verify failedAttempt null
@@ -853,42 +885,57 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class KillWaitAttemptSucceededTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptSucceededTransition() {
+      super(TaskAttemptCompletionEventStatus.SUCCEEDED);
+    }
+  }
+
+  private static class KillWaitAttemptFailedTransition extends
+      KillWaitAttemptKilledTransition {
+    public KillWaitAttemptFailedTransition() {
+      super(TaskAttemptCompletionEventStatus.FAILED);
+    }
+  }
+
   private static class AttemptFailedTransition implements
     MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-      if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+      TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
+      task.failedAttempts.add(taskAttemptId); 
+      if (taskAttemptId.equals(task.commitAttempt)) {
         task.commitAttempt = null;
       }
-      TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+      TaskAttempt attempt = task.attempts.get(taskAttemptId);
       if (attempt.getAssignedContainerMgrAddress() != null) {
         //container was assigned
         task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(), 
             attempt.getAssignedContainerMgrAddress()));
       }
       
-      if (task.failedAttempts < task.maxAttempts) {
+      task.finishedAttempts.add(taskAttemptId);
+      if (task.failedAttempts.size() < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+            taskAttemptId, 
             TaskAttemptCompletionEventStatus.FAILED);
         // we don't need a new event if we already have a spare
-        if (--task.numberUncompletedAttempts == 0
+        task.inProgressAttempts.remove(taskAttemptId);
+        if (task.inProgressAttempts.size() == 0
             && task.successfulAttempt == null) {
           task.addAndScheduleAttempt();
         }
       } else {
         task.handleTaskAttemptCompletion(
-            ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+            taskAttemptId, 
             TaskAttemptCompletionEventStatus.TIPFAILED);
-        TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
-        TaskAttemptId taId = ev.getTaskAttemptID();
         
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskStateInternal.FAILED, taId);
+            TaskStateInternal.FAILED, taskAttemptId);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             taskFailedEvent));
         } else {
@@ -905,12 +952,6 @@ public abstract class TaskImpl implement
     protected TaskStateInternal getDefaultState(TaskImpl task) {
       return task.getInternalState();
     }
-
-    protected void unSucceed(TaskImpl task) {
-      ++task.numberUncompletedAttempts;
-      task.commitAttempt = null;
-      task.successfulAttempt = null;
-    }
   }
 
   private static class MapRetroactiveFailureTransition
@@ -918,14 +959,12 @@ public abstract class TaskImpl implement
 
     @Override
     public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
-      if (event instanceof TaskTAttemptEvent) {
-        TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
-            !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
-          // don't allow a different task attempt to override a previous
-          // succeeded state
-          return TaskStateInternal.SUCCEEDED;
-        }
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+          !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+        // don't allow a different task attempt to override a previous
+        // succeeded state
+        return TaskStateInternal.SUCCEEDED;
       }
       
       //verify that this occurs only for map task
@@ -943,6 +982,8 @@ public abstract class TaskImpl implement
       //  fails, we have to let AttemptFailedTransition.transition
       //  believe that there's no redundancy.
       unSucceed(task);
+      // fake increase in Uncomplete attempts for super.transition
+      task.inProgressAttempts.add(castEvent.getTaskAttemptID());
       return super.transition(task, event);
     }
 
@@ -991,7 +1032,7 @@ public abstract class TaskImpl implement
             (attempt, "Task KILL is received. Killing attempt!");
       }
 
-      task.numberUncompletedAttempts = 0;
+      task.inProgressAttempts.clear();
     }
   }
 

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1408349&r1=1408348&r2=1408349&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Nov 12 16:56:13 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -243,6 +245,39 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(JobImpl job,
+      JobStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    JobStateInternal iState = job.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Job Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState);
+      Thread.sleep(500);
+      iState = job.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
+  public void waitForInternalState(TaskImpl task,
+      TaskStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskReport report = task.getReport();
+    TaskStateInternal iState = task.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("Task Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState + "   progress : "
+          + report.getProgress());
+      Thread.sleep(500);
+      report = task.getReport();
+      iState = task.getInternalState();
+    }
+    System.out.println("Task Internal State is : " + iState);
+    Assert.assertEquals("Task Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
   public void waitForInternalState(TaskAttemptImpl attempt,
       TaskAttemptStateInternal finalState) throws Exception {
     int timeoutSecs = 0;

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1408349&r1=1408348&r2=1408349&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Mon Nov 12 16:56:13 2012
@@ -25,12 +25,15 @@ import java.util.concurrent.CountDownLat
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.junit.Test;
 
 /**
  * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
  *
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TestKill {
 
   @Test
@@ -132,6 +141,80 @@ public class TestKill {
   }
 
   @Test
+  public void testKillTaskWait() throws Exception {
+    final Dispatcher dispatcher = new AsyncDispatcher() {
+      private TaskAttemptEvent cachedKillEvent;
+      @Override
+      protected void dispatch(Event event) {
+        if (event instanceof TaskAttemptEvent) {
+          TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
+          if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
+            TaskAttemptId taID = killEvent.getTaskAttemptID();
+            if (taID.getTaskId().getTaskType() == TaskType.REDUCE
+                && taID.getTaskId().getId() == 0 && taID.getId() == 0) {
+              // Task is asking the reduce TA to kill itself. 'Create' a race
+              // condition. Make the task succeed and then inform the task that
+              // TA has succeeded. Once Task gets the TA succeeded event at
+              // KILL_WAIT, then relay the actual kill signal to TA
+              super.dispatch(new TaskAttemptEvent(taID,
+                TaskAttemptEventType.TA_DONE));
+              super.dispatch(new TaskAttemptEvent(taID,
+                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+              super.dispatch(new TaskTAttemptEvent(taID,
+                TaskEventType.T_ATTEMPT_SUCCEEDED));
+              this.cachedKillEvent = killEvent;
+              return;
+            }
+          }
+        } else if (event instanceof TaskEvent) {
+          TaskEvent taskEvent = (TaskEvent) event;
+          if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
+              && this.cachedKillEvent != null) {
+            // When the TA comes and reports that it is done, send the
+            // cachedKillEvent
+            super.dispatch(this.cachedKillEvent);
+            return;
+          }
+
+        }
+        super.dispatch(event);
+      }
+    };
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+      @Override
+      public Dispatcher createDispatcher() {
+        return dispatcher;
+      }
+    };
+    Job job = app.submit(new Configuration());
+    JobId jobId = app.getJobId();
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    app.waitForState(mapTask, TaskState.RUNNING);
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // Finish map
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    // Now kill the job
+    app.getContext().getEventHandler()
+      .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+    app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+  }
+
+  @Test
   public void testKillTaskAttempt() throws Exception {
     final CountDownLatch latch = new CountDownLatch(1);
     MRApp app = new BlockingMRApp(2, 0, latch);



Mime
View raw message