hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1400351 [2/3] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/mai...
Date: Sat, 20 Oct 2012 00:34:02 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Oct 20 00:33:57 2012
@@ -44,7 +44,6 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
@@ -59,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
@@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.state.Sing
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Implementation of Task interface.
  */
@@ -127,62 +129,62 @@ public abstract class TaskImpl implement
      KILL_TRANSITION = new KillTransition();
 
   private static final StateMachineFactory
-               <TaskImpl, TaskState, TaskEventType, TaskEvent> 
+               <TaskImpl, TaskStateInternal, TaskEventType, TaskEvent> 
             stateMachineFactory 
-           = new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent>
-               (TaskState.NEW)
+           = new StateMachineFactory<TaskImpl, TaskStateInternal, TaskEventType, TaskEvent>
+               (TaskStateInternal.NEW)
 
     // define the state machine of Task
 
     // Transitions from NEW state
-    .addTransition(TaskState.NEW, TaskState.SCHEDULED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.SCHEDULED, 
         TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
-    .addTransition(TaskState.NEW, TaskState.KILLED, 
+    .addTransition(TaskStateInternal.NEW, TaskStateInternal.KILLED, 
         TaskEventType.T_KILL, new KillNewTransition())
 
     // Transitions from SCHEDULED state
       //when the first attempt is launched, the task state is set to RUNNING
-     .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.RUNNING, 
          TaskEventType.T_ATTEMPT_LAUNCHED, new LaunchTransition())
-     .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT, 
          TaskEventType.T_KILL, KILL_TRANSITION)
-     .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, 
+     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED, 
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
-     .addTransition(TaskState.SCHEDULED, 
-        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), 
+     .addTransition(TaskStateInternal.SCHEDULED, 
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED), 
         TaskEventType.T_ATTEMPT_FAILED, 
         new AttemptFailedTransition())
  
     // Transitions from RUNNING state
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_COMMIT_PENDING,
         new AttemptCommitPendingTransition())
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
         TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
-    .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.SUCCEEDED, 
         TaskEventType.T_ATTEMPT_SUCCEEDED,
         new AttemptSucceededTransition())
-    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING, 
         TaskEventType.T_ATTEMPT_KILLED,
         ATTEMPT_KILLED_TRANSITION)
-    .addTransition(TaskState.RUNNING, 
-        EnumSet.of(TaskState.RUNNING, TaskState.FAILED), 
+    .addTransition(TaskStateInternal.RUNNING, 
+        EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED), 
         TaskEventType.T_ATTEMPT_FAILED,
         new AttemptFailedTransition())
-    .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, 
+    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.KILL_WAIT, 
         TaskEventType.T_KILL, KILL_TRANSITION)
 
     // Transitions from KILL_WAIT state
-    .addTransition(TaskState.KILL_WAIT,
-        EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED),
+    .addTransition(TaskStateInternal.KILL_WAIT,
+        EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
         TaskEventType.T_ATTEMPT_KILLED,
         new KillWaitAttemptKilledTransition())
     // Ignore-able transitions.
     .addTransition(
-        TaskState.KILL_WAIT,
-        TaskState.KILL_WAIT,
+        TaskStateInternal.KILL_WAIT,
+        TaskStateInternal.KILL_WAIT,
         EnumSet.of(TaskEventType.T_KILL,
             TaskEventType.T_ATTEMPT_LAUNCHED,
             TaskEventType.T_ATTEMPT_COMMIT_PENDING,
@@ -191,32 +193,32 @@ public abstract class TaskImpl implement
             TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from SUCCEEDED state
-    .addTransition(TaskState.SUCCEEDED,
-        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED),
+    .addTransition(TaskStateInternal.SUCCEEDED,
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new RetroactiveFailureTransition())
-    .addTransition(TaskState.SUCCEEDED,
-        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
+    .addTransition(TaskStateInternal.SUCCEEDED,
+        EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_KILLED, new RetroactiveKilledTransition())
     // Ignore-able transitions.
     .addTransition(
-        TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+        TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from FAILED state        
-    .addTransition(TaskState.FAILED, TaskState.FAILED,
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
-    .addTransition(TaskState.KILLED, TaskState.KILLED,
+    .addTransition(TaskStateInternal.KILLED, TaskStateInternal.KILLED,
         EnumSet.of(TaskEventType.T_KILL,
                    TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // create the topology tables
     .installTopology();
 
-  private final StateMachine<TaskState, TaskEventType, TaskEvent>
+  private final StateMachine<TaskStateInternal, TaskEventType, TaskEvent>
     stateMachine;
 
   // By default, the next TaskAttempt number is zero. Changes during recovery  
@@ -247,7 +249,12 @@ public abstract class TaskImpl implement
 
   @Override
   public TaskState getState() {
-    return stateMachine.getCurrentState();
+    readLock.lock();
+    try {
+      return getExternalState(getInternalState());
+    } finally {
+      readLock.unlock();
+    }
   }
 
   public TaskImpl(JobId jobId, TaskType taskType, int partition,
@@ -356,9 +363,9 @@ public abstract class TaskImpl implement
     readLock.lock();
     try {
      // TODO: Use stateMachine level method?
-      return (getState() == TaskState.SUCCEEDED ||
-          getState() == TaskState.FAILED ||
-          getState() == TaskState.KILLED);
+      return (getInternalState() == TaskStateInternal.SUCCEEDED ||
+              getInternalState() == TaskStateInternal.FAILED ||
+              getInternalState() == TaskStateInternal.KILLED);
     } finally {
       readLock.unlock();
     }
@@ -433,6 +440,24 @@ public abstract class TaskImpl implement
     }
   }
 
+  @VisibleForTesting
+  public TaskStateInternal getInternalState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private static TaskState getExternalState(TaskStateInternal smState) {
+    if (smState == TaskStateInternal.KILL_WAIT) {
+      return TaskState.KILLED;
+    } else {
+      return TaskState.valueOf(smState.name());
+    }
+  }
+
   //this is always called in read/write lock
   private long getLaunchTime() {
     long taskLaunchTime = 0;
@@ -484,8 +509,8 @@ public abstract class TaskImpl implement
     return finishTime;
   }
   
-  private TaskState finished(TaskState finalState) {
-    if (getState() == TaskState.RUNNING) {
+  private TaskStateInternal finished(TaskStateInternal finalState) {
+    if (getInternalState() == TaskStateInternal.RUNNING) {
       metrics.endRunningTask(this);
     }
     return finalState;
@@ -500,11 +525,7 @@ public abstract class TaskImpl implement
       switch (at.getState()) {
       
       // ignore all failed task attempts
-      case FAIL_CONTAINER_CLEANUP: 
-      case FAIL_TASK_CLEANUP: 
       case FAILED: 
-      case KILL_CONTAINER_CLEANUP: 
-      case KILL_TASK_CLEANUP: 
       case KILLED:
         continue;      
       }      
@@ -605,7 +626,7 @@ public abstract class TaskImpl implement
     }
     try {
       writeLock.lock();
-      TaskState oldState = getState();
+      TaskStateInternal oldState = getInternalState();
       try {
         stateMachine.doTransition(event.getType(), event);
       } catch (InvalidStateTransitonException e) {
@@ -613,9 +634,9 @@ public abstract class TaskImpl implement
             + this.taskId, e);
         internalError(event.getType());
       }
-      if (oldState != getState()) {
+      if (oldState != getInternalState()) {
         LOG.info(taskId + " Task Transitioned from " + oldState + " to "
-            + getState());
+            + getInternalState());
       }
 
     } finally {
@@ -659,7 +680,7 @@ public abstract class TaskImpl implement
     }
   }
 
-  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
+  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskStateInternal taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
         TypeConverter.fromYarn(task.successfulAttempt),
@@ -670,7 +691,7 @@ public abstract class TaskImpl implement
     return tfe;
   }
   
-  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskState taskState, TaskAttemptId taId) {
+  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, List<String> diag, TaskStateInternal taskState, TaskAttemptId taId) {
     StringBuilder errorSb = new StringBuilder();
     if (diag != null) {
       for (String d : diag) {
@@ -775,7 +796,7 @@ public abstract class TaskImpl implement
       // issue kill to all other attempts
       if (task.historyTaskStartGenerated) {
         TaskFinishedEvent tfe = createTaskFinishedEvent(task,
-            TaskState.SUCCEEDED);
+            TaskStateInternal.SUCCEEDED);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             tfe));
       }
@@ -791,7 +812,7 @@ public abstract class TaskImpl implement
                   TaskAttemptEventType.TA_KILL));
         }
       }
-      task.finished(TaskState.SUCCEEDED);
+      task.finished(TaskStateInternal.SUCCEEDED);
     }
   }
 
@@ -812,12 +833,12 @@ public abstract class TaskImpl implement
 
 
   private static class KillWaitAttemptKilledTransition implements
-      MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
-    protected TaskState finalState = TaskState.KILLED;
+    protected TaskStateInternal finalState = TaskStateInternal.KILLED;
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
@@ -835,18 +856,18 @@ public abstract class TaskImpl implement
         }
 
         task.eventHandler.handle(
-            new JobTaskEvent(task.taskId, finalState));
+            new JobTaskEvent(task.taskId, getExternalState(finalState)));
         return finalState;
       }
-      return task.getState();
+      return task.getInternalState();
     }
   }
 
   private static class AttemptFailedTransition implements
-    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       task.failedAttempts++;
       TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
       if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
@@ -878,7 +899,7 @@ public abstract class TaskImpl implement
         
         if (task.historyTaskStartGenerated) {
         TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
-            TaskState.FAILED, taId);
+            TaskStateInternal.FAILED, taId);
         task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
             taskFailedEvent));
         } else {
@@ -887,13 +908,13 @@ public abstract class TaskImpl implement
         }
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
-        return task.finished(TaskState.FAILED);
+        return task.finished(TaskStateInternal.FAILED);
       }
       return getDefaultState(task);
     }
 
-    protected TaskState getDefaultState(Task task) {
-      return task.getState();
+    protected TaskStateInternal getDefaultState(TaskImpl task) {
+      return task.getInternalState();
     }
   }
 
@@ -901,14 +922,14 @@ public abstract class TaskImpl implement
       extends AttemptFailedTransition {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       if (event instanceof TaskTAttemptEvent) {
         TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
-        if (task.getState() == TaskState.SUCCEEDED &&
+        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
             !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
           // don't allow a different task attempt to override a previous
           // succeeded state
-          return TaskState.SUCCEEDED;
+          return TaskStateInternal.SUCCEEDED;
         }
       }
 
@@ -933,25 +954,25 @@ public abstract class TaskImpl implement
     }
 
     @Override
-    protected TaskState getDefaultState(Task task) {
-      return TaskState.SCHEDULED;
+    protected TaskStateInternal getDefaultState(TaskImpl task) {
+      return TaskStateInternal.SCHEDULED;
     }
   }
 
   private static class RetroactiveKilledTransition implements
-    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
 
     @Override
-    public TaskState transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
       TaskAttemptId attemptId = null;
       if (event instanceof TaskTAttemptEvent) {
         TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
         attemptId = castEvent.getTaskAttemptID(); 
-        if (task.getState() == TaskState.SUCCEEDED &&
+        if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
             !attemptId.equals(task.successfulAttempt)) {
           // don't allow a different task attempt to override a previous
           // succeeded state
-          return TaskState.SUCCEEDED;
+          return TaskStateInternal.SUCCEEDED;
         }
       }
 
@@ -977,7 +998,7 @@ public abstract class TaskImpl implement
       // to the RM. But the RM would ignore that just like it would ignore
       // currently pending container requests affinitized to bad nodes.
       task.addAndScheduleAttempt();
-      return TaskState.SCHEDULED;
+      return TaskStateInternal.SCHEDULED;
     }
   }
 
@@ -988,7 +1009,7 @@ public abstract class TaskImpl implement
       
       if (task.historyTaskStartGenerated) {
       TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
-            TaskState.KILLED, null); // TODO Verify failedAttemptId is null
+            TaskStateInternal.KILLED, null); // TODO Verify failedAttemptId is null
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
           taskFailedEvent));
       }else {
@@ -996,8 +1017,8 @@ public abstract class TaskImpl implement
         		" generated for task: " + task.getID());
       }
 
-      task.eventHandler.handle(
-          new JobTaskEvent(task.taskId, TaskState.KILLED));
+      task.eventHandler.handle(new JobTaskEvent(task.taskId,
+          getExternalState(TaskStateInternal.KILLED)));
       task.metrics.endWaitingTask(task);
     }
   }
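
[Editor's note] The core of the TaskImpl.java change above is that the state machine now runs on a private TaskStateInternal enum, and only getState() translates back to the public TaskState. Below is a condensed, self-contained sketch of that mapping; the enum members and method name are taken from the diff, but the standalone enums and wrapper class are simplified stand-ins for illustration, not the real classes:

// Simplified stand-ins for the real enums; the actual types live in
// org.apache.hadoop.mapreduce.v2.app.job (TaskStateInternal) and
// org.apache.hadoop.mapreduce.v2.api.records (TaskState).
enum TaskStateInternal { NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED }
enum TaskState         { NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILLED }

class StateMappingSketch {
  // Mirrors TaskImpl.getExternalState(): KILL_WAIT is internal-only
  // bookkeeping, so clients observe it as KILLED; every other internal
  // state has a same-named public counterpart, hence the valueOf().
  static TaskState getExternalState(TaskStateInternal smState) {
    if (smState == TaskStateInternal.KILL_WAIT) {
      return TaskState.KILLED;
    }
    return TaskState.valueOf(smState.name());
  }

  public static void main(String[] args) {
    System.out.println(getExternalState(TaskStateInternal.KILL_WAIT)); // KILLED
    System.out.println(getExternalState(TaskStateInternal.RUNNING));   // RUNNING
  }
}

This is why the same commit can drop KILL_WAIT from the public TaskState enum further down: it survives only inside the state machine.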

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Sat Oct 20 00:33:57 2012
@@ -31,10 +31,11 @@ import org.apache.hadoop.mapreduce.JobID
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -163,13 +164,14 @@ public abstract class RMCommunicator ext
   protected void unregister() {
     try {
       FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
-      if (job.getState() == JobState.SUCCEEDED) {
+      JobImpl jobImpl = (JobImpl)job;
+      if (jobImpl.getInternalState() == JobStateInternal.SUCCEEDED) {
         finishState = FinalApplicationStatus.SUCCEEDED;
-      } else if (job.getState() == JobState.KILLED
-          || (job.getState() == JobState.RUNNING && isSignalled)) {
+      } else if (jobImpl.getInternalState() == JobStateInternal.KILLED
+          || (jobImpl.getInternalState() == JobStateInternal.RUNNING && isSignalled)) {
         finishState = FinalApplicationStatus.KILLED;
-      } else if (job.getState() == JobState.FAILED
-          || job.getState() == JobState.ERROR) {
+      } else if (jobImpl.getInternalState() == JobStateInternal.FAILED
+          || jobImpl.getInternalState() == JobStateInternal.ERROR) {
         finishState = FinalApplicationStatus.FAILED;
       }
       StringBuffer sb = new StringBuffer();
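
[Editor's note] The unregister() rewrite above switches the AM's final-status report from the public JobState to JobStateInternal, which keeps the ERROR distinction the public enum is losing. A minimal sketch of the decision logic as it reads after this patch; the JobStateInternal member list here is assumed from the states referenced elsewhere in this commit, and isSignalled stands in for the field used in the diff:

enum JobStateInternal { NEW, INITED, RUNNING, SUCCEEDED, FAILED, KILLED, ERROR }
enum FinalApplicationStatus { UNDEFINED, SUCCEEDED, FAILED, KILLED }

class UnregisterSketch {
  // Mirrors the branch structure in RMCommunicator.unregister():
  // a RUNNING job only maps to KILLED when the AM was signalled.
  static FinalApplicationStatus finishState(JobStateInternal s, boolean isSignalled) {
    if (s == JobStateInternal.SUCCEEDED) {
      return FinalApplicationStatus.SUCCEEDED;
    } else if (s == JobStateInternal.KILLED
        || (s == JobStateInternal.RUNNING && isSignalled)) {
      return FinalApplicationStatus.KILLED;
    } else if (s == JobStateInternal.FAILED || s == JobStateInternal.ERROR) {
      return FinalApplicationStatus.FAILED;
    }
    return FinalApplicationStatus.UNDEFINED;
  }

  public static void main(String[] args) {
    System.out.println(finishState(JobStateInternal.RUNNING, true)); // KILLED
    System.out.println(finishState(JobStateInternal.ERROR, false));  // FAILED
  }
}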

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Sat Oct 20 00:33:57 2012
@@ -365,7 +365,7 @@ public class DefaultSpeculator extends A
 
     for (TaskAttempt taskAttempt : attempts.values()) {
       if (taskAttempt.getState() == TaskAttemptState.RUNNING
-          || taskAttempt.getState() == TaskAttemptState.ASSIGNED) {
+          || taskAttempt.getState() == TaskAttemptState.STARTING) {
         if (++numberRunningAttempts > 1) {
           return ALREADY_SPECULATING;
         }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Sat Oct 20 00:33:57 2012
@@ -17,20 +17,33 @@
 */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
 
 public class TestTaskAttemptListenerImpl {
@@ -115,4 +128,67 @@ public class TestTaskAttemptListenerImpl
 
     listener.stop();
   }
+
+  @Test
+  public void testGetMapCompletionEvents() throws IOException {
+    TaskAttemptCompletionEvent[] empty = {};
+    TaskAttemptCompletionEvent[] taskEvents = {
+        createTce(0, true, TaskAttemptCompletionEventStatus.OBSOLETE),
+        createTce(1, false, TaskAttemptCompletionEventStatus.FAILED),
+        createTce(2, true, TaskAttemptCompletionEventStatus.SUCCEEDED),
+        createTce(3, false, TaskAttemptCompletionEventStatus.FAILED) };
+    TaskAttemptCompletionEvent[] mapEvents = { taskEvents[0], taskEvents[2] };
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTaskAttemptCompletionEvents(0, 100))
+      .thenReturn(taskEvents);
+    when(mockJob.getTaskAttemptCompletionEvents(0, 2))
+      .thenReturn(Arrays.copyOfRange(taskEvents, 0, 2));
+    when(mockJob.getTaskAttemptCompletionEvents(2, 100))
+      .thenReturn(Arrays.copyOfRange(taskEvents, 2, 4));
+    when(mockJob.getMapAttemptCompletionEvents(0, 100)).thenReturn(mapEvents);
+    when(mockJob.getMapAttemptCompletionEvents(0, 2)).thenReturn(mapEvents);
+    when(mockJob.getMapAttemptCompletionEvents(2, 100)).thenReturn(empty);
+
+    AppContext appCtx = mock(AppContext.class);
+    when(appCtx.getJob(any(JobId.class))).thenReturn(mockJob);
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+    final TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptListenerImpl listener =
+        new TaskAttemptListenerImpl(appCtx, secret) {
+      @Override
+      protected void registerHeartbeatHandler(Configuration conf) {
+        taskHeartbeatHandler = hbHandler;
+      }
+    };
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+
+    JobID jid = new JobID("12345", 1);
+    TaskAttemptID tid = new TaskAttemptID("12345", 1, TaskType.REDUCE, 1, 0);
+    MapTaskCompletionEventsUpdate update =
+        listener.getMapCompletionEvents(jid, 0, 100, tid);
+    assertEquals(2, update.events.length);
+    update = listener.getMapCompletionEvents(jid, 0, 2, tid);
+    assertEquals(2, update.events.length);
+    update = listener.getMapCompletionEvents(jid, 2, 100, tid);
+    assertEquals(0, update.events.length);
+  }
+
+  private static TaskAttemptCompletionEvent createTce(int eventId,
+      boolean isMap, TaskAttemptCompletionEventStatus status) {
+    JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
+    TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
+        isMap ? org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP
+            : org.apache.hadoop.mapreduce.v2.api.records.TaskType.REDUCE);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
+    RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+    TaskAttemptCompletionEvent tce = recordFactory
+        .newRecordInstance(TaskAttemptCompletionEvent.class);
+    tce.setEventId(eventId);
+    tce.setAttemptId(attemptId);
+    tce.setStatus(status);
+    return tce;
+  }
+
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sat Oct 20 00:33:57 2012
@@ -50,8 +50,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -60,6 +62,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -240,6 +243,24 @@ public class MRApp extends MRAppMaster {
     return job;
   }
 
+  public void waitForInternalState(TaskAttemptImpl attempt,
+      TaskAttemptStateInternal finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskAttemptReport report = attempt.getReport();
+    TaskAttemptStateInternal iState = attempt.getInternalState();
+    while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+      System.out.println("TaskAttempt Internal State is : " + iState
+          + " Waiting for Internal state : " + finalState + "   progress : "
+          + report.getProgress());
+      Thread.sleep(500);
+      report = attempt.getReport();
+      iState = attempt.getInternalState();
+    }
+    System.out.println("TaskAttempt Internal State is : " + iState);
+    Assert.assertEquals("TaskAttempt Internal state is not correct (timedout)",
+        finalState, iState);
+  }
+
   public void waitForState(TaskAttempt attempt, 
       TaskAttemptState finalState) throws Exception {
     int timeoutSecs = 0;
@@ -501,18 +522,18 @@ public class MRApp extends MRAppMaster {
     //override the init transition
     private final TestInitTransition initTransition = new TestInitTransition(
         maps, reduces);
-    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
-        = stateMachineFactory.addTransition(JobState.NEW,
-            EnumSet.of(JobState.INITED, JobState.FAILED),
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
             JobEventType.JOB_INIT,
             // This is abusive.
             initTransition);
 
-    private final StateMachine<JobState, JobEventType, JobEvent>
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
         localStateMachine;
 
     @Override
-    protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
       return localStateMachine;
     }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Sat Oct 20 00:33:57 2012
@@ -556,6 +556,12 @@ public class MockJobs extends MockApps {
       }
 
       @Override
+      public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+          int startIndex, int maxEvents) {
+        return null;
+      }
+
+      @Override
       public Map<TaskId, Task> getTasks(TaskType taskType) {
         throw new UnsupportedOperationException("Not supported yet.");
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java Sat Oct 20 00:33:57 2012
@@ -36,8 +36,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
@@ -190,7 +192,8 @@ public class TestFail {
     Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
         .size());
     TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+    app.waitForInternalState((TaskAttemptImpl) attempt,
+        TaskAttemptStateInternal.ASSIGNED);
     app.getDispatcher().getEventHandler().handle(
         new TaskAttemptEvent(attempt.getID(),
             TaskAttemptEventType.TA_CONTAINER_COMPLETED));

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Sat Oct 20 00:33:57 2012
@@ -21,8 +21,6 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.Arrays;
 import java.util.Iterator;
 
-import junit.framework.Assert;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -40,6 +38,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Assert;
 import org.junit.Test;
 
 public class TestFetchFailure {
@@ -144,6 +143,15 @@ public class TestFetchFailure {
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+    TaskAttemptCompletionEvent mapEvents[] =
+        job.getMapAttemptCompletionEvents(0, 2);
+    Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+    Assert.assertArrayEquals("Unexpected map events",
+        Arrays.copyOfRange(events, 0, 2), mapEvents);
+    mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+    Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Sat Oct 20 00:33:57 2012
@@ -48,7 +48,6 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
-import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
@@ -56,11 +55,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
 import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
@@ -411,8 +412,8 @@ public class TestRMContainerAllocator {
     // Wait till all map-attempts request for containers
     for (Task t : job.getTasks().values()) {
       if (t.getType() == TaskType.MAP) {
-        mrApp.waitForState(t.getAttempts().values().iterator().next(),
-          TaskAttemptState.UNASSIGNED);
+        mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+            .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
       }
     }
     amDispatcher.await();
@@ -562,8 +563,8 @@ public class TestRMContainerAllocator {
     amDispatcher.await();
     // Wait till all map-attempts request for containers
     for (Task t : job.getTasks().values()) {
-      mrApp.waitForState(t.getAttempts().values().iterator().next(),
-        TaskAttemptState.UNASSIGNED);
+      mrApp.waitForInternalState((TaskAttemptImpl) t.getAttempts().values()
+          .iterator().next(), TaskAttemptStateInternal.UNASSIGNED);
     }
     amDispatcher.await();
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Sat Oct 20 00:33:57 2012
@@ -441,6 +441,12 @@ public class TestRuntimeEstimators {
     }
 
     @Override
+    public TaskAttemptCompletionEvent[]
+            getMapAttemptCompletionEvents(int startIndex, int maxEvents) {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
     public String getName() {
       throw new UnsupportedOperationException("Not supported yet.");
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Sat Oct 20 00:33:57 2012
@@ -42,8 +42,8 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
-import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -77,11 +77,11 @@ public class TestJobImpl {
     tasks.put(mockTask.getID(), mockTask);
     mockJob.tasks = tasks;
 
-    when(mockJob.getState()).thenReturn(JobState.ERROR);
+    when(mockJob.getInternalState()).thenReturn(JobStateInternal.ERROR);
     JobEvent mockJobEvent = mock(JobEvent.class);
-    JobState state = trans.transition(mockJob, mockJobEvent);
+    JobStateInternal state = trans.transition(mockJob, mockJobEvent);
     Assert.assertEquals("Incorrect state returned from JobNoTasksCompletedTransition",
-        JobState.ERROR, state);
+        JobStateInternal.ERROR, state);
   }
 
   @Test
@@ -96,9 +96,12 @@ public class TestJobImpl {
     when(mockJob.getCommitter()).thenReturn(mockCommitter);
     when(mockJob.getEventHandler()).thenReturn(mockEventHandler);
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
-    when(mockJob.finished(JobState.KILLED)).thenReturn(JobState.KILLED);
-    when(mockJob.finished(JobState.FAILED)).thenReturn(JobState.FAILED);
-    when(mockJob.finished(JobState.SUCCEEDED)).thenReturn(JobState.SUCCEEDED);
+    when(mockJob.finished(JobStateInternal.KILLED)).thenReturn(
+        JobStateInternal.KILLED);
+    when(mockJob.finished(JobStateInternal.FAILED)).thenReturn(
+        JobStateInternal.FAILED);
+    when(mockJob.finished(JobStateInternal.SUCCEEDED)).thenReturn(
+        JobStateInternal.SUCCEEDED);
 
     try {
       doThrow(new IOException()).when(mockCommitter).commitJob(any(JobContext.class));
@@ -106,11 +109,11 @@ public class TestJobImpl {
       // commitJob stubbed out, so this can't happen
     }
     doNothing().when(mockEventHandler).handle(any(JobHistoryEvent.class));
-    JobState jobState = JobImpl.checkJobCompleteSuccess(mockJob);
+    JobStateInternal jobState = JobImpl.checkJobCompleteSuccess(mockJob);
     Assert.assertNotNull("checkJobCompleteSuccess incorrectly returns null " +
       "for successful job", jobState);
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.FAILED, jobState);
+        JobStateInternal.FAILED, jobState);
     verify(mockJob).abortJob(
         eq(org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
   }
@@ -129,7 +132,8 @@ public class TestJobImpl {
     when(mockJob.getJobContext()).thenReturn(mockJobContext);
     doNothing().when(mockJob).setFinishTime();
     doNothing().when(mockJob).logJobHistoryFinishedEvent();
-    when(mockJob.finished(any(JobState.class))).thenReturn(JobState.SUCCEEDED);
+    when(mockJob.finished(any(JobStateInternal.class))).thenReturn(
+        JobStateInternal.SUCCEEDED);
 
     try {
       doNothing().when(mockCommitter).commitJob(any(JobContext.class));
@@ -141,7 +145,7 @@ public class TestJobImpl {
       "for successful job",
       JobImpl.checkJobCompleteSuccess(mockJob));
     Assert.assertEquals("checkJobCompleteSuccess returns incorrect state",
-        JobState.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
+        JobStateInternal.SUCCEEDED, JobImpl.checkJobCompleteSuccess(mockJob));
   }
 
   @Test

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Sat Oct 20 00:33:57 2012
@@ -26,7 +26,6 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -48,13 +47,13 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -338,7 +337,7 @@ public class TestTaskImpl {
    * {@link TaskState#KILL_WAIT}
    */
   private void assertTaskKillWaitState() {
-    assertEquals(TaskState.KILL_WAIT, mockTask.getState());
+    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java Sat Oct 20 00:33:57 2012
@@ -46,6 +46,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
@@ -260,7 +262,8 @@ public class TestContainerLauncher {
           attempts.size());
 
     TaskAttempt attempt = attempts.values().iterator().next();
-    app.waitForState(attempt, TaskAttemptState.ASSIGNED);
+      app.waitForInternalState((TaskAttemptImpl) attempt,
+          TaskAttemptStateInternal.ASSIGNED);
 
     app.waitForState(job, JobState.FAILED);
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java Sat Oct 20 00:33:57 2012
@@ -128,14 +128,26 @@ public class TypeConverter {
     return taskId;
   }
 
-  public static TaskAttemptState toYarn(org.apache.hadoop.mapred.TaskStatus.State state) {
-    if (state == org.apache.hadoop.mapred.TaskStatus.State.KILLED_UNCLEAN) {
-      return TaskAttemptState.KILLED;
-    }
-    if (state == org.apache.hadoop.mapred.TaskStatus.State.FAILED_UNCLEAN) {
+  public static TaskAttemptState toYarn(
+      org.apache.hadoop.mapred.TaskStatus.State state) {
+    switch (state) {
+    case COMMIT_PENDING:
+      return TaskAttemptState.COMMIT_PENDING;
+    case FAILED:
+    case FAILED_UNCLEAN:
       return TaskAttemptState.FAILED;
+    case KILLED:
+    case KILLED_UNCLEAN:
+      return TaskAttemptState.KILLED;
+    case RUNNING:
+      return TaskAttemptState.RUNNING;
+    case SUCCEEDED:
+      return TaskAttemptState.SUCCEEDED;
+    case UNASSIGNED:
+      return TaskAttemptState.STARTING;
+    default:
+      throw new YarnException("Unrecognized State: " + state);
     }
-    return TaskAttemptState.valueOf(state.toString());
   }
 
   public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
@@ -309,7 +321,6 @@ public class TypeConverter {
       return org.apache.hadoop.mapred.JobStatus.PREP;
     case RUNNING:
       return org.apache.hadoop.mapred.JobStatus.RUNNING;
-    case KILL_WAIT:
     case KILLED:
       return org.apache.hadoop.mapred.JobStatus.KILLED;
     case SUCCEEDED:
@@ -329,7 +340,6 @@ public class TypeConverter {
       return org.apache.hadoop.mapred.TIPStatus.PENDING;
     case RUNNING:
       return org.apache.hadoop.mapred.TIPStatus.RUNNING;
-    case KILL_WAIT:
     case KILLED:
       return org.apache.hadoop.mapred.TIPStatus.KILLED;
     case SUCCEEDED:
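
[Editor's note] The new toYarn() above replaces the old string-based valueOf fallback with an exhaustive switch, which is what lets the slimmed-down TaskAttemptState enum (next file in this commit) drop the UNCLEAN and cleanup states. A runnable illustration of the resulting mapping, using simplified stand-in enums; the real method throws YarnException rather than the IllegalArgumentException used here to stay self-contained:

// Stand-ins for org.apache.hadoop.mapred.TaskStatus.State (subset shown in
// the hunk) and the YARN-side TaskAttemptState after this commit.
enum MRState { COMMIT_PENDING, FAILED, FAILED_UNCLEAN, KILLED, KILLED_UNCLEAN, RUNNING, SUCCEEDED, UNASSIGNED }
enum TaskAttemptState { NEW, STARTING, RUNNING, COMMIT_PENDING, SUCCEEDED, FAILED, KILLED }

class ToYarnSketch {
  static TaskAttemptState toYarn(MRState state) {
    switch (state) {
    case COMMIT_PENDING:              return TaskAttemptState.COMMIT_PENDING;
    case FAILED: case FAILED_UNCLEAN: return TaskAttemptState.FAILED;
    case KILLED: case KILLED_UNCLEAN: return TaskAttemptState.KILLED;
    case RUNNING:                     return TaskAttemptState.RUNNING;
    case SUCCEEDED:                   return TaskAttemptState.SUCCEEDED;
    case UNASSIGNED:                  return TaskAttemptState.STARTING;
    default: throw new IllegalArgumentException("Unrecognized State: " + state);
    }
  }

  public static void main(String[] args) {
    // The *_UNCLEAN states now fold into their terminal counterparts,
    // and UNASSIGNED surfaces as the new public STARTING state.
    System.out.println(toYarn(MRState.FAILED_UNCLEAN)); // FAILED
    System.out.println(toYarn(MRState.UNASSIGNED));     // STARTING
  }
}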

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/JobState.java Sat Oct 20 00:33:57 2012
@@ -24,7 +24,6 @@ public enum JobState {
   RUNNING,
   SUCCEEDED,
   FAILED,
-  KILL_WAIT,
   KILLED,
   ERROR
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskAttemptState.java Sat Oct 20 00:33:57 2012
@@ -20,16 +20,10 @@ package org.apache.hadoop.mapreduce.v2.a
 
 public enum TaskAttemptState {
   NEW, 
-  UNASSIGNED, 
-  ASSIGNED, 
+  STARTING, 
   RUNNING, 
-  COMMIT_PENDING, 
-  SUCCESS_CONTAINER_CLEANUP, 
-  SUCCEEDED, 
-  FAIL_CONTAINER_CLEANUP, 
-  FAIL_TASK_CLEANUP, 
-  FAILED, 
-  KILL_CONTAINER_CLEANUP, 
-  KILL_TASK_CLEANUP, 
+  COMMIT_PENDING,  
+  SUCCEEDED,
+  FAILED,
   KILLED
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/api/records/TaskState.java Sat Oct 20 00:33:57 2012
@@ -19,5 +19,5 @@
 package org.apache.hadoop.mapreduce.v2.api.records;
 
 public enum TaskState {
-  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILL_WAIT, KILLED
+  NEW, SCHEDULED, RUNNING, SUCCEEDED, FAILED, KILLED
 }

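Note on the three record enums above (JobState, TaskAttemptState, TaskState):
the transient bookkeeping states -- KILL_WAIT, UNASSIGNED/ASSIGNED, and the
various *_CLEANUP states -- are removed from the public API and survive only
in the internal-only state enums introduced elsewhere in this commit (a
TaskStateInternal type and the like). Reports presumably fold each internal
wait/cleanup state into the nearest coarse public state; an illustrative
sketch of that kind of mapping (not the committed code):

    // Illustrative only: how an internal-only state could surface publicly.
    static TaskState getExternalState(TaskStateInternal internal) {
      switch (internal) {
      case KILL_WAIT:
        return TaskState.KILLED;  // clients never see the wait state
      default:
        return TaskState.valueOf(internal.name());  // names otherwise align
      }
    }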
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Sat Oct 20 00:33:57 2012
@@ -49,8 +49,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.yarn.ContainerLogAppender;
 import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -100,15 +100,10 @@ public class MRApps extends Apps {
   public static enum TaskAttemptStateUI {
     NEW(
         new TaskAttemptState[] { TaskAttemptState.NEW,
-        TaskAttemptState.UNASSIGNED, TaskAttemptState.ASSIGNED }),
+            TaskAttemptState.STARTING }),
     RUNNING(
         new TaskAttemptState[] { TaskAttemptState.RUNNING,
-            TaskAttemptState.COMMIT_PENDING,
-            TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
-            TaskAttemptState.FAIL_CONTAINER_CLEANUP,
-            TaskAttemptState.FAIL_TASK_CLEANUP,
-            TaskAttemptState.KILL_CONTAINER_CLEANUP,
-            TaskAttemptState.KILL_TASK_CLEANUP }),
+            TaskAttemptState.COMMIT_PENDING }),
     SUCCESSFUL(new TaskAttemptState[] { TaskAttemptState.SUCCEEDED}),
     FAILED(new TaskAttemptState[] { TaskAttemptState.FAILED}),
     KILLED(new TaskAttemptState[] { TaskAttemptState.KILLED});

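Note: the TaskAttemptStateUI buckets shrink to match -- NEW now covers only
NEW/STARTING, and RUNNING only RUNNING/COMMIT_PENDING -- but code that
filters attempts through these buckets keeps the same shape. A minimal
sketch, assuming the enum's existing correspondsTo(...) accessor:

    // Count the attempts the web UI would classify as "running".
    int running = 0;
    for (TaskAttempt attempt : task.getAttempts().values()) {
      if (MRApps.TaskAttemptStateUI.RUNNING.correspondsTo(attempt.getState())) {
        running++;
      }
    }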
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/proto/mr_protos.proto Sat Oct 20 00:33:57 2012
@@ -50,8 +50,7 @@ enum TaskStateProto {
   TS_RUNNING = 3;
   TS_SUCCEEDED = 4;
   TS_FAILED = 5;
-  TS_KILL_WAIT = 6;
-  TS_KILLED = 7;
+  TS_KILLED = 6;
 }
 
 enum PhaseProto {
@@ -93,18 +92,12 @@ message TaskReportProto {
 
 enum TaskAttemptStateProto {
   TA_NEW = 1;
-  TA_UNASSIGNED = 2;
-  TA_ASSIGNED = 3;
-  TA_RUNNING = 4;
-  TA_COMMIT_PENDING = 5;
-  TA_SUCCESS_CONTAINER_CLEANUP = 6;
-  TA_SUCCEEDED = 7;
-  TA_FAIL_CONTAINER_CLEANUP = 8;
-  TA_FAIL_TASK_CLEANUP = 9;
-  TA_FAILED = 10;
-  TA_KILL_CONTAINER_CLEANUP = 11;
-  TA_KILL_TASK_CLEANUP = 12;
-  TA_KILLED = 13;
+  TA_STARTING = 2;
+  TA_RUNNING = 3;
+  TA_COMMIT_PENDING = 4;
+  TA_SUCCEEDED = 5;
+  TA_FAILED = 6;
+  TA_KILLED = 7;
 }
 
 message TaskAttemptReportProto {
@@ -131,9 +124,8 @@ enum JobStateProto {
   J_RUNNING = 3;
   J_SUCCEEDED = 4;
   J_FAILED = 5;
-  J_KILL_WAIT = 6;
-  J_KILLED = 7;
-  J_ERROR = 8;
+  J_KILLED = 6;
+  J_ERROR = 7;
 }
 
 message JobReportProto {

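Note: collapsing the proto enums also renumbers the surviving values
(TS_KILLED moves from 7 to 6, TA_SUCCEEDED from 7 to 5, J_ERROR from 8 to 7,
and so on), which is wire-incompatible with pre-change binaries; presumably
acceptable here because these records flow between same-version MR components
rather than being persisted. The generated Java enums carry the new numbers
-- a sketch, assuming the usual MRProtos outer class for this proto file:

    // Protobuf-generated enum constants expose their field numbers.
    assert MRProtos.TaskAttemptStateProto.TA_KILLED.getNumber() == 7;
    assert MRProtos.JobStateProto.J_ERROR.getNumber() == 7;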
Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1399946-1400349

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Sat Oct 20 00:33:57 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.h
 
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -81,6 +82,7 @@ public class CompletedJob implements org
   private Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
   private Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
   private List<TaskAttemptCompletionEvent> completionEvents = null;
+  private List<TaskAttemptCompletionEvent> mapCompletionEvents = null;
   private JobACLsManager aclsMgr;
   
   
@@ -176,11 +178,28 @@ public class CompletedJob implements org
     if (completionEvents == null) {
       constructTaskAttemptCompletionEvents();
     }
+    return getAttemptCompletionEvents(completionEvents,
+        fromEventId, maxEvents);
+  }
+
+  @Override
+  public synchronized TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    if (mapCompletionEvents == null) {
+      constructTaskAttemptCompletionEvents();
+    }
+    return getAttemptCompletionEvents(mapCompletionEvents,
+        startIndex, maxEvents);
+  }
+
+  private static TaskAttemptCompletionEvent[] getAttemptCompletionEvents(
+      List<TaskAttemptCompletionEvent> eventList,
+      int startIndex, int maxEvents) {
     TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
-    if (completionEvents.size() > fromEventId) {
+    if (eventList.size() > startIndex) {
       int actualMax = Math.min(maxEvents,
-          (completionEvents.size() - fromEventId));
-      events = completionEvents.subList(fromEventId, actualMax + fromEventId)
+          (eventList.size() - startIndex));
+      events = eventList.subList(startIndex, actualMax + startIndex)
           .toArray(events);
     }
     return events;
@@ -190,11 +209,15 @@ public class CompletedJob implements org
     loadAllTasks();
     completionEvents = new LinkedList<TaskAttemptCompletionEvent>();
     List<TaskAttempt> allTaskAttempts = new LinkedList<TaskAttempt>();
+    int numMapAttempts = 0;
     for (TaskId taskId : tasks.keySet()) {
       Task task = tasks.get(taskId);
       for (TaskAttemptId taskAttemptId : task.getAttempts().keySet()) {
         TaskAttempt taskAttempt = task.getAttempts().get(taskAttemptId);
         allTaskAttempts.add(taskAttempt);
+        if (task.getType() == TaskType.MAP) {
+          ++numMapAttempts;
+        }
       }
     }
     Collections.sort(allTaskAttempts, new Comparator<TaskAttempt>() {
@@ -223,6 +246,8 @@ public class CompletedJob implements org
       }
     });
 
+    mapCompletionEvents =
+        new ArrayList<TaskAttemptCompletionEvent>(numMapAttempts);
     int eventId = 0;
     for (TaskAttempt taskAttempt : allTaskAttempts) {
 
@@ -253,6 +278,9 @@ public class CompletedJob implements org
           .getAssignedContainerMgrAddress());
       tace.setStatus(taceStatus);
       completionEvents.add(tace);
+      if (taskAttempt.getID().getTaskId().getTaskType() == TaskType.MAP) {
+        mapCompletionEvents.add(tace);
+      }
     }
   }
 

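Note on CompletedJob: the paging logic formerly inlined in
getTaskAttemptCompletionEvents is extracted into the static
getAttemptCompletionEvents(...) helper so the new map-only view,
getMapAttemptCompletionEvents, can share it. The returned window is
eventList.subList(startIndex, startIndex + min(maxEvents, size - startIndex)),
or an empty array when startIndex is past the end:

    // Sketch of the paging contract: with 10 recorded map events,
    // (startIndex=8, maxEvents=5) returns events 8..9, while
    // (startIndex=12, maxEvents=5) returns an empty array, never throwing.
    TaskAttemptCompletionEvent[] page =
        completedJob.getMapAttemptCompletionEvents(8, 5);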
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/PartialJob.java Sat Oct 20 00:33:57 2012
@@ -154,6 +154,12 @@ public class PartialJob implements org.a
   }
 
   @Override
+  public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+      int startIndex, int maxEvents) {
+    return null;
+  }
+
+  @Override
   public boolean checkAccess(UserGroupInformation callerUGI, JobACL jobOperation) {
     return true;
   }

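Note: PartialJob is built from the history index alone and holds no
per-attempt data, so the new method returns null rather than an empty array;
callers that may be handed a partial job presumably need a null check before
paging through the result.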
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/MockHistoryJobs.java Sat Oct 20 00:33:57 2012
@@ -126,6 +126,12 @@ public class MockHistoryJobs extends Moc
     }
 
     @Override
+    public TaskAttemptCompletionEvent[] getMapAttemptCompletionEvents(
+        int startIndex, int maxEvents) {
+      return job.getMapAttemptCompletionEvents(startIndex, maxEvents);
+    }
+
+    @Override
     public Map<TaskId, Task> getTasks() {
       return job.getTasks();
     }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java?rev=1400351&r1=1400350&r2=1400351&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/fs/TestDFSIO.java Sat Oct 20 00:33:57 2012
@@ -101,8 +101,7 @@ public class TestDFSIO implements Tool {
                     " -write | -append | -clean" +
                     " [-nrFiles N]" +
                     " [-size Size[B|KB|MB|GB|TB]]" +
-                    " [-resFile resultFileName] [-bufferSize Bytes]" +
-                    " [-rootDir]";
+                    " [-resFile resultFileName] [-bufferSize Bytes]";
 
   private Configuration config;
 
@@ -672,37 +671,38 @@ public class TestDFSIO implements Tool {
       return -1;
     }
 
-    for (int i = 0; i < args.length; i++) {       // parse command line
-      if (args[i].startsWith("-read")) {
+    for (int i = 0; i < args.length; i++) { // parse command line
+      if (args[i].toLowerCase().startsWith("-read")) {
         testType = TestType.TEST_TYPE_READ;
-      } else if (args[i].equals("-write")) {
+      } else if (args[i].equalsIgnoreCase("-write")) {
         testType = TestType.TEST_TYPE_WRITE;
-      } else if (args[i].equals("-append")) {
+      } else if (args[i].equalsIgnoreCase("-append")) {
         testType = TestType.TEST_TYPE_APPEND;
-      } else if (args[i].equals("-random")) {
-        if(testType != TestType.TEST_TYPE_READ) return -1;
+      } else if (args[i].equalsIgnoreCase("-random")) {
+        if (testType != TestType.TEST_TYPE_READ) return -1;
         testType = TestType.TEST_TYPE_READ_RANDOM;
-      } else if (args[i].equals("-backward")) {
-        if(testType != TestType.TEST_TYPE_READ) return -1;
+      } else if (args[i].equalsIgnoreCase("-backward")) {
+        if (testType != TestType.TEST_TYPE_READ) return -1;
         testType = TestType.TEST_TYPE_READ_BACKWARD;
-      } else if (args[i].equals("-skip")) {
-        if(testType != TestType.TEST_TYPE_READ) return -1;
+      } else if (args[i].equalsIgnoreCase("-skip")) {
+        if (testType != TestType.TEST_TYPE_READ) return -1;
         testType = TestType.TEST_TYPE_READ_SKIP;
-      } else if (args[i].equals("-clean")) {
+      } else if (args[i].equalsIgnoreCase("-clean")) {
         testType = TestType.TEST_TYPE_CLEANUP;
-      } else if (args[i].startsWith("-seq")) {
+      } else if (args[i].toLowerCase().startsWith("-seq")) {
         isSequential = true;
-      } else if (args[i].startsWith("-compression")) {
+      } else if (args[i].toLowerCase().startsWith("-compression")) {
         compressionClass = args[++i];
-      } else if (args[i].equals("-nrFiles")) {
+      } else if (args[i].equalsIgnoreCase("-nrfiles")) {
         nrFiles = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-fileSize") || args[i].equals("-size")) {
+      } else if (args[i].equalsIgnoreCase("-filesize")
+          || args[i].equalsIgnoreCase("-size")) {
         nrBytes = parseSize(args[++i]);
-      } else if (args[i].equals("-skipSize")) {
+      } else if (args[i].equalsIgnoreCase("-skipsize")) {
         skipSize = parseSize(args[++i]);
-      } else if (args[i].equals("-bufferSize")) {
+      } else if (args[i].equalsIgnoreCase("-buffersize")) {
         bufferSize = Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-resFile")) {
+      } else if (args[i].equalsIgnoreCase("-resfile")) {
         resFileName = args[++i];
       } else {
         System.err.println("Illegal argument: " + args[i]);



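Note on TestDFSIO: option matching is now case-insensitive -- equalsIgnoreCase
for exact flags, toLowerCase().startsWith(...) for prefix flags such as -read
and -seq -- so, for example, "-WRITE -nrfiles 10 -size 128MB" parses the same
as "-write -nrFiles 10 -size 128MB". The -rootDir flag, which the parse loop
shown above never handled, is also dropped from the usage string.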