tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject git commit: TEZ-110. Port TaskAttempt and Task unit tests from trunk.
Date Thu, 09 May 2013 21:12:38 GMT
Updated Branches:
  refs/heads/TEZ-1 39d725c27 -> c86c279d2


TEZ-110. Port TaskAttempt and Task unit tests from trunk.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/c86c279d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/c86c279d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/c86c279d

Branch: refs/heads/TEZ-1
Commit: c86c279d23ca12a5e4d70dcb5ba01faa9ac6b6e8
Parents: 39d725c
Author: Siddharth Seth <sseth@apache.org>
Authored: Thu May 9 14:11:15 2013 -0700
Committer: Siddharth Seth <sseth@apache.org>
Committed: Thu May 9 14:11:15 2013 -0700

----------------------------------------------------------------------
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java      |   21 +-
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  |   25 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |    1 -
 .../tez/dag/app/dag/impl/TestTaskAttempt.java      |  581 +++++++++++++++
 .../apache/tez/dag/app/dag/impl/TestTaskImpl.java  |  461 ++++++++++++
 5 files changed, 1062 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6922b44..c907e48 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -31,7 +31,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -112,7 +111,6 @@ public class TaskAttemptImpl implements TaskAttempt,
   private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
 
   protected final TezConfiguration conf;
-  protected final Path jobFile;
   protected final int partition;
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
@@ -141,11 +139,11 @@ public class TaskAttemptImpl implements TaskAttempt,
   private TaskAttemptStatus reportedStatus;
 
   protected final TaskLocationHint locationHint;
-  private final Resource taskResource;
-  private final Map<String, LocalResource> localResources;
-  private final Map<String, String> environment;
-  private final String javaOpts;
-  private final boolean isRescheduled;
+  protected final Resource taskResource;
+  protected final Map<String, LocalResource> localResources;
+  protected final Map<String, String> environment;
+  protected final String javaOpts;
+  protected final boolean isRescheduled;
 
   private boolean speculatorContainerRequestSent = false;
   protected String processorName;
@@ -255,7 +253,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   // TODO Remove TaskAttemptListener from the constructor.
   @SuppressWarnings("rawtypes")
   public TaskAttemptImpl(TezTaskID taskId, int attemptNumber, EventHandler eventHandler,
-      TaskAttemptListener tal, Path jobFile, int partition, 
+      TaskAttemptListener tal, int partition, 
       TezConfiguration conf,
       Token<JobTokenIdentifier> jobToken, Credentials credentials, Clock clock,
       TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
@@ -269,7 +267,6 @@ public class TaskAttemptImpl implements TaskAttempt,
     this.attemptId = TezBuilderUtils.newTaskAttemptId(taskId, attemptNumber);
     this.eventHandler = eventHandler;
     //Reported status
-    this.jobFile = jobFile;
     this.partition = partition;
     this.conf = conf;
     this.jobToken = jobToken;
@@ -787,7 +784,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @SuppressWarnings("unchecked")
-  private void logJobHistoryAttemptStarted() {
+  protected void logJobHistoryAttemptStarted() {
     TaskAttemptStartedEvent startEvt = new TaskAttemptStartedEvent(
         attemptId, getTask().getVertex().getName(),
         launchTime, containerId, containerNodeId);
@@ -797,7 +794,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @SuppressWarnings("unchecked")
-  private void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) {
+  protected void logJobHistoryAttemptFinishedEvent(TaskAttemptStateInternal state) {
     //Log finished events only if an attempt started.
     if (getLaunchTime() == 0) return;
     
@@ -811,7 +808,7 @@ public class TaskAttemptImpl implements TaskAttempt,
   }
 
   @SuppressWarnings("unchecked")
-  private void logJobHistoryAttemptUnsuccesfulCompletion(
+  protected void logJobHistoryAttemptUnsuccesfulCompletion(
       TaskAttemptState state) {
     TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
         attemptId, getTask().getVertex().getName(),

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index e4538aa..cf49dbd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -28,12 +28,11 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -56,8 +55,8 @@ import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventDiagnosticsUpdate;
-import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -85,7 +84,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private static final Log LOG = LogFactory.getLog(TaskImpl.class);
 
   protected final TezConfiguration conf;
-  protected final Path jobFile;
   protected final int partition;
   protected final TaskAttemptListener taskAttemptListener;
   protected final TaskHeartbeatHandler taskHeartbeatHandler;
@@ -106,9 +104,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   protected Token<JobTokenIdentifier> jobToken;
   protected String processorName;
   protected TaskLocationHint locationHint;
-  private Resource taskResource;
-  private Map<String, LocalResource> localResources;
-  private Map<String, String> environment;
+  protected Resource taskResource;
+  protected Map<String, LocalResource> localResources;
+  protected Map<String, String> environment;
   
   // counts the number of attempts that are either running or in a state where
   //  they will come to be running when they get a Container
@@ -256,7 +254,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   private final boolean leafVertex;
 
-  private String javaOpts;
+  protected String javaOpts;
 
   @Override
   public TaskState getState() {
@@ -269,7 +267,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   public TaskImpl(TezVertexID vertexId, int partition,
-      EventHandler eventHandler, Path remoteJobConfFile, TezConfiguration conf,
+      EventHandler eventHandler, TezConfiguration conf,
       TaskAttemptListener taskAttemptListener,
       Token<JobTokenIdentifier> jobToken,
       Credentials credentials, Clock clock,
@@ -286,7 +284,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       String javaOpts) {
     this.conf = conf;
     this.clock = clock;
-    this.jobFile = remoteJobConfFile;
     ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
     readLock = readWriteLock.readLock();
     writeLock = readWriteLock.writeLock();
@@ -605,7 +602,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
   TaskAttemptImpl createAttempt(int attemptNumber) {
     return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
-        taskAttemptListener, null, 0, conf,
+        taskAttemptListener, 0, conf,
         jobToken, credentials, clock, taskHeartbeatHandler,
         appContext, processorName, locationHint, taskResource,
         localResources, environment, javaOpts, (failedAttempts>0));
@@ -797,14 +794,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 	  return "";
   }
 
-  private void logJobHistoryTaskStartedEvent() {
+  protected void logJobHistoryTaskStartedEvent() {
     TaskStartedEvent startEvt = new TaskStartedEvent(taskId,
         getVertex().getName(), scheduledTime, getLaunchTime());
     this.eventHandler.handle(new DAGHistoryEvent(
         taskId.getVertexID().getDAGId(), startEvt));
   }
   
-  private void logJobHistoryTaskFinishedEvent() {
+  protected void logJobHistoryTaskFinishedEvent() {
     // FIXME need to handle getting finish time as this function
     // is called from within a transition
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
@@ -813,7 +810,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         taskId.getVertexID().getDAGId(), finishEvt));
   }
   
-  private void logJobHistoryTaskFailedEvent(TaskState finalState) {
+  protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
     TaskFinishedEvent finishEvt = new TaskFinishedEvent(taskId,
         getVertex().getName(), clock.getTime(), finalState);
     this.eventHandler.handle(new DAGHistoryEvent(

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index f74b172..80de958 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -822,7 +822,6 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
         TaskImpl task =
             new TaskImpl(vertex.getVertexId(), i,
                 vertex.eventHandler,
-                null,
                 conf,
                 vertex.taskAttemptListener,
                 vertex.jobToken,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
new file mode 100644
index 0000000..12ddcf0
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.common.TezTaskContext;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventType;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.junit.Test;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestTaskAttempt {
+
+  private static final String MAP_PROCESSOR_NAME =
+      "org.apache.tez.mapreduce.processor.map.MapProcessor";
+
+  static public class StubbedFS extends RawLocalFileSystem {
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+      return new FileStatus(1, false, 1, 1, 1, f);
+    }
+  }
+
+  // @Test
+  // // Verifies # tasks, attempts and diagnostics for a failing job.
+  // // TODO Move to TestTask - to verify # retries
+  // public void testMRAppHistoryForMap() throws Exception {
+  // MRApp app = new FailingAttemptsMRApp(1, 0);
+  // testMRAppHistory(app);
+  // }
+  //
+  // @Test
+  // // Verifies # tasks, attempts and diagnostics for a failing job.
+  // // Move to TestTask - to verify # retries
+  // public void testMRAppHistoryForReduce() throws Exception {
+  // MRApp app = new FailingAttemptsMRApp(0, 1);
+  // testMRAppHistory(app);
+  // }
+
+  // @Test
+  // // Verifies that the launch request is based on the hosts.
+  // // TODO Move to the client.
+  // // TODO Add a test that verifies that the LocationHint is used as it should
+  // be.
+  // public void testSingleRackRequest() throws Exception {
+  // TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+  // new TaskAttemptImpl.ScheduleTaskattemptTransition();
+  //
+  // EventHandler eventHandler = mock(EventHandler.class);
+  // String[] hosts = new String[3];
+  // hosts[0] = "host1";
+  // hosts[1] = "host2";
+  // hosts[2] = "host3";
+  // TaskSplitMetaInfo splitInfo = new TaskSplitMetaInfo(hosts, 0,
+  // 128 * 1024 * 1024l);
+  //
+  // TaskAttemptImpl mockTaskAttempt = createMapTaskAttemptImpl2ForTest(
+  // eventHandler, splitInfo);
+  // TaskAttemptEventSchedule mockTAEvent =
+  // mock(TaskAttemptEventSchedule.class);
+  // doReturn(false).when(mockTAEvent).isRescheduled();
+  //
+  // sta.transition(mockTaskAttempt, mockTAEvent);
+  //
+  // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+  // verify(eventHandler, times(2)).handle(arg.capture());
+  // if (!(arg.getAllValues().get(1) instanceof
+  // AMSchedulerTALaunchRequestEvent)) {
+  // Assert.fail("Second Event not of type ContainerRequestEvent");
+  // }
+  // AMSchedulerTALaunchRequestEvent tlrE = (AMSchedulerTALaunchRequestEvent)
+  // arg
+  // .getAllValues().get(1);
+  // String[] requestedRacks = tlrE.getRacks();
+  // // Only a single occurrence of /DefaultRack
+  // assertEquals(1, requestedRacks.length);
+  // }
+
+  // @Test
+  // // Tests that an attempt is made to resolve the localized hosts to racks.
+  // // TODO Move to the client.
+  // public void testHostResolveAttempt() throws Exception {
+  // TaskAttemptImpl.ScheduleTaskattemptTransition sta =
+  // new TaskAttemptImpl.ScheduleTaskattemptTransition();
+  //
+  // EventHandler eventHandler = mock(EventHandler.class);
+  // String hosts[] = new String[] {"192.168.1.1", "host2", "host3"};
+  // String resolved[] = new String[] {"host1", "host2", "host3"};
+  // TaskSplitMetaInfo splitInfo =
+  // new TaskSplitMetaInfo(hosts, 0, 128 * 1024 * 1024l);
+  //
+  // TaskAttemptImpl mockTaskAttempt =
+  // createMapTaskAttemptImpl2ForTest(eventHandler, splitInfo);
+  // TaskAttemptImpl spyTa = spy(mockTaskAttempt);
+  // when(spyTa.resolveHosts(hosts)).thenReturn(resolved);
+  //
+  // TaskAttemptEventSchedule mockTAEvent =
+  // mock(TaskAttemptEventSchedule.class);
+  // doReturn(false).when(mockTAEvent).isRescheduled();
+  //
+  // sta.transition(spyTa, mockTAEvent);
+  // verify(spyTa).resolveHosts(hosts);
+  // ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+  // verify(eventHandler, times(2)).handle(arg.capture());
+  // if (!(arg.getAllValues().get(1) instanceof
+  // AMSchedulerTALaunchRequestEvent)) {
+  // Assert.fail("Second Event not of type ContainerRequestEvent");
+  // }
+  // Map<String, Boolean> expected = new HashMap<String, Boolean>();
+  // expected.put("host1", true);
+  // expected.put("host2", true);
+  // expected.put("host3", true);
+  // AMSchedulerTALaunchRequestEvent cre =
+  // (AMSchedulerTALaunchRequestEvent) arg.getAllValues().get(1);
+  // String[] requestedHosts = cre.getHosts();
+  // for (String h : requestedHosts) {
+  // expected.remove(h);
+  // }
+  // assertEquals(0, expected.size());
+  // }
+  //
+
+  // @Test
+  // // Verifies accounting of slot_milli counters. Time spent in running tasks.
+  // // TODO Fix this test to work without MRApp.
+  // public void testSlotMillisCounterUpdate() throws Exception {
+  // verifySlotMillis(2048, 2048, 1024);
+  // verifySlotMillis(2048, 1024, 1024);
+  // verifySlotMillis(10240, 1024, 2048);
+  // }
+
+  // public void verifySlotMillis(int mapMemMb, int reduceMemMb,
+  // int minContainerSize) throws Exception {
+  // Clock actualClock = new SystemClock();
+  // ControlledClock clock = new ControlledClock(actualClock);
+  // clock.setTime(10);
+  // MRApp app =
+  // new MRApp(1, 1, false, "testSlotMillisCounterUpdate", true, clock);
+  // Configuration conf = new Configuration();
+  // conf.setInt(MRJobConfig.MAP_MEMORY_MB, mapMemMb);
+  // conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, reduceMemMb);
+  // app.setClusterInfo(new ClusterInfo(BuilderUtils
+  // .newResource(minContainerSize, 1), BuilderUtils.newResource(10240,1)));
+  //
+  // Job job = app.submit(conf);
+  // app.waitForState(job, JobState.RUNNING);
+  // Map<TaskId, Task> tasks = job.getTasks();
+  // Assert.assertEquals("Num tasks is not correct", 2, tasks.size());
+  // Iterator<Task> taskIter = tasks.values().iterator();
+  // Task mTask = taskIter.next();
+  // app.waitForState(mTask, TaskState.RUNNING);
+  // Task rTask = taskIter.next();
+  // app.waitForState(rTask, TaskState.RUNNING);
+  // Map<TaskAttemptId, TaskAttempt> mAttempts = mTask.getAttempts();
+  // Assert.assertEquals("Num attempts is not correct", 1, mAttempts.size());
+  // Map<TaskAttemptId, TaskAttempt> rAttempts = rTask.getAttempts();
+  // Assert.assertEquals("Num attempts is not correct", 1, rAttempts.size());
+  // TaskAttempt mta = mAttempts.values().iterator().next();
+  // TaskAttempt rta = rAttempts.values().iterator().next();
+  // app.waitForState(mta, TaskAttemptState.RUNNING);
+  // app.waitForState(rta, TaskAttemptState.RUNNING);
+  //
+  // clock.setTime(11);
+  // app.getContext()
+  // .getEventHandler()
+  // .handle(new TaskAttemptEvent(mta.getID(), TaskAttemptEventType.TA_DONE));
+  // app.getContext()
+  // .getEventHandler()
+  // .handle(new TaskAttemptEvent(rta.getID(), TaskAttemptEventType.TA_DONE));
+  // app.waitForState(job, JobState.SUCCEEDED);
+  // Assert.assertEquals(mta.getFinishTime(), 11);
+  // Assert.assertEquals(mta.getLaunchTime(), 10);
+  // Assert.assertEquals(rta.getFinishTime(), 11);
+  // Assert.assertEquals(rta.getLaunchTime(), 10);
+  // Assert.assertEquals((int) Math.ceil((float) mapMemMb / minContainerSize),
+  // job.getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_MAPS)
+  // .getValue());
+  // Assert.assertEquals(
+  // (int) Math.ceil((float) reduceMemMb / minContainerSize), job
+  // .getAllCounters().findCounter(JobCounter.SLOTS_MILLIS_REDUCES)
+  // .getValue());
+  // }
+  //
+
+  // private void testMRAppHistory(MRApp app) throws Exception {
+  // Configuration conf = new Configuration();
+  // Job job = app.submit(conf);
+  // app.waitForState(job, JobState.FAILED);
+  // Map<TaskId, Task> tasks = job.getTasks();
+  //
+  // Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+  // Task task = tasks.values().iterator().next();
+  // Assert.assertEquals("Task state not correct", TaskState.FAILED, task
+  // .getReport().getTaskState());
+  // Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator().next()
+  // .getAttempts();
+  // Assert.assertEquals("Num attempts is not correct", 4, attempts.size());
+  //
+  // Iterator<TaskAttempt> it = attempts.values().iterator();
+  // TaskAttemptReport report = it.next().getReport();
+  // Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+  // report.getTaskAttemptState());
+  // Assert.assertEquals("Diagnostic Information is not Correct",
+  // "Test Diagnostic Event", report.getDiagnosticInfo());
+  // report = it.next().getReport();
+  // Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+  // report.getTaskAttemptState());
+  // }
+
+  @Test
+  // Ensure the dag does not go into an error state if a attempt kill is
+  // received while STARTING
+  public void testLaunchFailedWhileKilling() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    TezDAGID dagID = new TezDAGID(appId, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new String[] { "127.0.0.1" }, null);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    Map<String, String> environment = new HashMap<String, String>();
+    String javaOpts = "";
+
+    AppContext mockAppContext = mock(AppContext.class);
+    doReturn(new ClusterInfo()).when(mockAppContext).getClusterInfo();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        new SystemClock(), mock(TaskHeartbeatHandler.class), mockAppContext,
+        MAP_PROCESSOR_NAME, locationHint, resource, localResources,
+        environment, javaOpts, false);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, BuilderUtils
+        .newPriority(3)));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+    // At some KILLING state.
+    taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+    // taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
+    // null));
+    assertFalse(eventHandler.internalError);
+  }
+
+  // TODO Add a similar test for TERMINATING.
+  // Ensure ContainerTerminated is handled correctly by the TaskAttempt
+  @Test
+  public void testContainerTerminatedWhileRunning() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    TezDAGID dagID = new TezDAGID(appId, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new String[] { "127.0.0.1" }, null);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    Map<String, String> environment = new HashMap<String, String>();
+    String javaOpts = "";
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
+        MAP_PROCESSOR_NAME, locationHint, resource, localResources,
+        environment, javaOpts, false);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null, -1));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null));
+    assertFalse(
+        "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
+        eventHandler.internalError);
+    // TODO Verify diagnostics
+  }
+
+  @Test
+  // Ensure ContainerTerminated is handled correctly by the TaskAttempt
+  public void testContainerTerminatedWhileCommitting() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    TezDAGID dagID = new TezDAGID(appId, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new String[] { "127.0.0.1" }, null);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    Map<String, String> environment = new HashMap<String, String>();
+    String javaOpts = "";
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
+        MAP_PROCESSOR_NAME, locationHint, resource, localResources,
+        environment, javaOpts, false);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null, -1));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+    assertEquals("Task attempt is not in commit pending state",
+        taImpl.getState(), TaskAttemptState.COMMIT_PENDING);
+    taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, null));
+    assertFalse(
+        "InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
+        eventHandler.internalError);
+    // TODO Verify diagnostics
+  }
+
+  @Test
+  // Verifies that multiple TooManyFetchFailures are handled correctly by the
+  // TaskAttempt.
+  public void testMultipleTooManyFetchFailures() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    TezDAGID dagID = new TezDAGID(appId, 1);
+    TezVertexID vertexID = new TezVertexID(dagID, 1);
+    TezTaskID taskID = new TezTaskID(vertexID, 1);
+    TezTaskAttemptID taskAttemptID = new TezTaskAttemptID(taskID, 0);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    TezConfiguration tezConf = new TezConfiguration();
+    tezConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    tezConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    TaskLocationHint locationHint = new TaskLocationHint(
+        new String[] { "127.0.0.1" }, null);
+    Resource resource = BuilderUtils.newResource(1024, 1);
+    Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+    Map<String, String> environment = new HashMap<String, String>();
+    String javaOpts = "";
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AppContext appCtx = mock(AppContext.class);
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+        appCtx);
+    containers.addContainerIfNew(container);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, 1, tezConf, mock(Token.class), new Credentials(),
+        new SystemClock(), mock(TaskHeartbeatHandler.class), appCtx,
+        MAP_PROCESSOR_NAME, locationHint, resource, localResources,
+        environment, javaOpts, false);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+    // At state STARTING.
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+        null, -1));
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+        TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+    assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
+        TaskAttemptState.FAILED);
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID,
+        TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURES));
+    assertEquals("Task attempt is not in FAILED state, still",
+        taImpl.getState(), TaskAttemptState.FAILED);
+    assertFalse(
+        "InternalError occurred trying to handle TA_TOO_MANY_FETCH_FAILURES",
+        eventHandler.internalError);
+  }
+
+  public static class MockEventHandler implements EventHandler {
+    public boolean internalError;
+
+    @Override
+    public void handle(Event event) {
+      if (event instanceof DAGEvent) {
+        DAGEvent je = ((DAGEvent) event);
+        if (DAGEventType.INTERNAL_ERROR == je.getType()) {
+          internalError = true;
+        }
+      }
+    }
+  };
+
+  private class MockTaskAttemptImpl extends TaskAttemptImpl {
+
+    public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
+        EventHandler eventHandler, TaskAttemptListener tal, int partition,
+        TezConfiguration conf, Token<JobTokenIdentifier> jobToken,
+        Credentials credentials, Clock clock,
+        TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
+        String processorName, TaskLocationHint locationHint, Resource resource,
+        Map<String, LocalResource> localResources,
+        Map<String, String> environment, String javaOpts, boolean isRescheduled) {
+      super(taskId, attemptNumber, eventHandler, tal, partition, conf,
+          jobToken, credentials, clock, taskHeartbeatHandler, appContext,
+          processorName, locationHint, resource, localResources, environment,
+          javaOpts, isRescheduled);
+    }
+
+    @Override
+    protected TezTaskContext createRemoteTask() {
+      // FIXME
+      return null;
+    }
+
+    @Override
+    protected void logJobHistoryAttemptStarted() {
+    }
+
+    @Override
+    protected void logJobHistoryAttemptFinishedEvent(
+        TaskAttemptStateInternal state) {
+
+    }
+
+    @Override
+    protected void logJobHistoryAttemptUnsuccesfulCompletion(
+        TaskAttemptState state) {
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/c86c279d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
new file mode 100644
index 0000000..13a19b9
--- /dev/null
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
+import org.apache.tez.dag.api.records.TaskAttemptState;
+import org.apache.tez.dag.api.records.TaskState;
+import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.TaskAttemptListener;
+import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.TaskStateInternal;
+import org.apache.tez.dag.app.dag.event.TaskEvent;
+import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
+import org.apache.tez.dag.app.dag.event.TaskEventType;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.records.TezDAGID;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskID;
+import org.apache.tez.engine.records.TezVertexID;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestTaskImpl {
+
+  private static final Log LOG = LogFactory.getLog(TestTaskImpl.class);
+
+  private int taskCounter = 0;
+  private final int partition = 1;
+
+  private InlineDispatcher dispatcher;
+
+  private TezConfiguration conf;
+  private TaskAttemptListener taskAttemptListener;
+  private TaskHeartbeatHandler taskHeartbeatHandler;
+  private Token<JobTokenIdentifier> jobToken;
+  private Credentials credentials;
+  private Clock clock;
+  private TaskLocationHint locationHint;
+
+  private ApplicationId appId;
+  private TezDAGID dagId;
+  private TezVertexID vertexId;
+  private AppContext appContext;
+  private Resource taskResource;
+  private Map<String, LocalResource> localResources;
+  private Map<String, String> environment;
+  private String javaOpts;
+  private boolean leafVertex;
+
+  private MockTaskImpl mockTask;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() {
+    dispatcher = new InlineDispatcher();
+    conf = new TezConfiguration();
+    taskAttemptListener = mock(TaskAttemptListener.class);
+    taskHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    jobToken = (Token<JobTokenIdentifier>) mock(Token.class);
+    credentials = null;
+    clock = new SystemClock();
+    locationHint = new TaskLocationHint(new String[1], new String[1]);
+
+    appId = BuilderUtils.newApplicationId(System.currentTimeMillis(), 1);
+    dagId = new TezDAGID(appId, 1);
+    vertexId = new TezVertexID(dagId, 1);
+    appContext = mock(AppContext.class);
+    taskResource = BuilderUtils.newResource(1024, 1);
+    localResources = new HashMap<String, LocalResource>();
+    environment = new HashMap<String, String>();
+    javaOpts = "";
+    leafVertex = false;
+
+    mockTask = new MockTaskImpl(vertexId, partition,
+        dispatcher.getEventHandler(), conf, taskAttemptListener, jobToken,
+        credentials, clock, taskHeartbeatHandler, appContext,
+        "org.apache.tez.mapreduce.processor.map.MapProcessor", leafVertex,
+        locationHint, taskResource, localResources, environment, javaOpts);
+  }
+
+  private TezTaskID getNewTaskID() {
+    TezTaskID taskID = new TezTaskID(vertexId, ++taskCounter);
+    return taskID;
+  }
+
+  private void scheduleTaskAttempt(TezTaskID taskId) {
+    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_SCHEDULE));
+    assertTaskScheduledState();
+  }
+
+  private void killTask(TezTaskID taskId) {
+    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+    assertTaskKillWaitState();
+  }
+
+  private void killScheduledTaskAttempt(TezTaskAttemptID attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId,
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskScheduledState();
+  }
+
+  private void launchTaskAttempt(TezTaskAttemptID attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId,
+        TaskEventType.T_ATTEMPT_LAUNCHED));
+    assertTaskRunningState();
+  }
+
+  private void commitTaskAttempt(TezTaskAttemptID attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId,
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    assertTaskRunningState();
+  }
+
+  private void updateAttemptProgress(MockTaskAttemptImpl attempt, float p) {
+    attempt.setProgress(p);
+  }
+
+  private void updateAttemptState(MockTaskAttemptImpl attempt,
+      TaskAttemptState s) {
+    attempt.setState(s);
+  }
+
+  private void killRunningTaskAttempt(TezTaskAttemptID attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId,
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskRunningState();
+  }
+
+  private void failRunningTaskAttempt(TezTaskAttemptID attemptId) {
+    mockTask.handle(new TaskEventTAUpdate(attemptId,
+        TaskEventType.T_ATTEMPT_FAILED));
+    assertTaskRunningState();
+  }
+
+  /**
+   * {@link TaskState#NEW}
+   */
+  private void assertTaskNewState() {
+    assertEquals(TaskState.NEW, mockTask.getState());
+  }
+
+  /**
+   * {@link TaskState#SCHEDULED}
+   */
+  private void assertTaskScheduledState() {
+    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+  }
+
+  /**
+   * {@link TaskState#RUNNING}
+   */
+  private void assertTaskRunningState() {
+    assertEquals(TaskState.RUNNING, mockTask.getState());
+  }
+
+  /**
+   * {@link TaskState#KILL_WAIT}
+   */
+  private void assertTaskKillWaitState() {
+    assertEquals(TaskStateInternal.KILL_WAIT, mockTask.getInternalState());
+  }
+
+  /**
+   * {@link TaskState#SUCCEEDED}
+   */
+  private void assertTaskSucceededState() {
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+  }
+
+  @Test
+  public void testInit() {
+    LOG.info("--- START: testInit ---");
+    assertTaskNewState();
+    assert (mockTask.getAttemptList().size() == 0);
+  }
+
+  @Test
+  /**
+   * {@link TaskState#NEW}->{@link TaskState#SCHEDULED}
+   */
+  public void testScheduleTask() {
+    LOG.info("--- START: testScheduleTask ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+  }
+
+  @Test
+  /**
+   * {@link TaskState#SCHEDULED}->{@link TaskState#KILL_WAIT}
+   */
+  public void testKillScheduledTask() {
+    LOG.info("--- START: testKillScheduledTask ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    killTask(taskId);
+  }
+
+  @Test
+  /**
+   * Kill attempt
+   * {@link TaskState#SCHEDULED}->{@link TaskState#SCHEDULED}
+   */
+  public void testKillScheduledTaskAttempt() {
+    LOG.info("--- START: testKillScheduledTaskAttempt ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    killScheduledTaskAttempt(mockTask.getLastAttempt().getID());
+  }
+
+  @Test
+  /**
+   * Launch attempt
+   * {@link TaskState#SCHEDULED}->{@link TaskState#RUNNING}
+   */
+  public void testLaunchTaskAttempt() {
+    LOG.info("--- START: testLaunchTaskAttempt ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+  }
+
+  @Test
+  /**
+   * Kill running attempt
+   * {@link TaskState#RUNNING}->{@link TaskState#RUNNING} 
+   */
+  public void testKillRunningTaskAttempt() {
+    LOG.info("--- START: testKillRunningTaskAttempt ---");
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+  }
+
+  @Test
+  public void testTaskProgress() {
+    LOG.info("--- START: testTaskProgress ---");
+
+    // launch task
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    float progress = 0f;
+    assert (mockTask.getProgress() == progress);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+
+    // update attempt1
+    progress = 50f;
+    updateAttemptProgress(mockTask.getLastAttempt(), progress);
+    assert (mockTask.getProgress() == progress);
+    progress = 100f;
+    updateAttemptProgress(mockTask.getLastAttempt(), progress);
+    assert (mockTask.getProgress() == progress);
+
+    progress = 0f;
+    // mark first attempt as killed
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.KILLED);
+    assert (mockTask.getProgress() == progress);
+
+    // kill first attempt
+    // should trigger a new attempt
+    // as no successful attempts
+    killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+    assert (mockTask.getAttemptList().size() == 2);
+
+    assert (mockTask.getProgress() == 0f);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    progress = 50f;
+    updateAttemptProgress(mockTask.getLastAttempt(), progress);
+    assert (mockTask.getProgress() == progress);
+  }
+
+  @Test
+  public void testFailureDuringTaskAttemptCommit() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(),
+        TaskAttemptState.COMMIT_PENDING);
+    commitTaskAttempt(mockTask.getLastAttempt().getID());
+
+    // During the task attempt commit there is an exception which causes
+    // the attempt to fail
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.FAILED);
+    failRunningTaskAttempt(mockTask.getLastAttempt().getID());
+
+    assertEquals(2, mockTask.getAttemptList().size());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.SUCCEEDED);
+    commitTaskAttempt(mockTask.getLastAttempt().getID());
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    assertFalse("First attempt should not commit",
+        mockTask.canCommit(mockTask.getAttemptList().get(0).getID()));
+    assertTrue("Second attempt should commit",
+        mockTask.canCommit(mockTask.getLastAttempt().getID()));
+
+    assertTaskSucceededState();
+  }
+
+  @Test
+  public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt that succeeds
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    commitTaskAttempt(mockTask.getLastAttempt().getID());
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    // The task should now have succeeded
+    assertTaskSucceededState();
+
+    // Now fail the first task attempt, after the second has succeeded
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getAttemptList().get(0)
+        .getID(), TaskEventType.T_ATTEMPT_FAILED));
+
+    // The task should still be in the succeeded state
+    assertTaskSucceededState();
+
+  }
+
+  @SuppressWarnings("rawtypes")
+  private class MockTaskImpl extends TaskImpl {
+
+    private List<MockTaskAttemptImpl> taskAttempts = new LinkedList<MockTaskAttemptImpl>();
+
+    public MockTaskImpl(TezVertexID vertexId, int partition,
+        EventHandler eventHandler, TezConfiguration conf,
+        TaskAttemptListener taskAttemptListener,
+        Token<JobTokenIdentifier> jobToken, Credentials credentials,
+        Clock clock, TaskHeartbeatHandler thh, AppContext appContext,
+        String processorName, boolean leafVertex,
+        TaskLocationHint locationHint, Resource resource,
+        Map<String, LocalResource> localResources,
+        Map<String, String> environment, String javaOpts) {
+      super(vertexId, partition, eventHandler, conf, taskAttemptListener,
+          jobToken, credentials, clock, thh, appContext, processorName,
+          leafVertex, locationHint, resource, localResources, environment,
+          javaOpts);
+    }
+
+    @Override
+    protected TaskAttemptImpl createAttempt(int attemptNumber) {
+      MockTaskAttemptImpl attempt = new MockTaskAttemptImpl(getTaskId(),
+          attemptNumber, eventHandler, taskAttemptListener, attemptNumber,
+          conf, jobToken, credentials, clock, taskHeartbeatHandler, appContext,
+          processorName, locationHint, taskResource, localResources,
+          environment, javaOpts, true);
+      taskAttempts.add(attempt);
+      return attempt;
+    }
+
+    @Override
+    protected void internalError(TaskEventType type) {
+      super.internalError(type);
+      fail("Internal error: " + type);
+    }
+
+    MockTaskAttemptImpl getLastAttempt() {
+      return taskAttempts.get(taskAttempts.size() - 1);
+    }
+
+    List<MockTaskAttemptImpl> getAttemptList() {
+      return taskAttempts;
+    }
+
+    protected void logJobHistoryTaskStartedEvent() {
+    }
+
+    protected void logJobHistoryTaskFinishedEvent() {
+    }
+
+    protected void logJobHistoryTaskFailedEvent(TaskState finalState) {
+    }
+  }
+
+  @SuppressWarnings("rawtypes")
+  public class MockTaskAttemptImpl extends TaskAttemptImpl {
+
+    private float progress = 0;
+    private TaskAttemptState state = TaskAttemptState.NEW;
+
+    public MockTaskAttemptImpl(TezTaskID taskId, int attemptNumber,
+        EventHandler eventHandler, TaskAttemptListener tal, int partition,
+        TezConfiguration conf, Token<JobTokenIdentifier> jobToken,
+        Credentials credentials, Clock clock, TaskHeartbeatHandler thh,
+        AppContext appContext, String processorName,
+        TaskLocationHint locationHing, Resource resource,
+        Map<String, LocalResource> localResources,
+        Map<String, String> environment, String javaOpts, boolean isRescheduled) {
+      super(taskId, attemptNumber, eventHandler, tal, partition, conf,
+          jobToken, credentials, clock, thh, appContext, processorName,
+          locationHing, resource, localResources, environment, javaOpts,
+          isRescheduled);
+    }
+
+    @Override
+    public float getProgress() {
+      return progress;
+    }
+
+    public void setProgress(float progress) {
+      this.progress = progress;
+    }
+
+    public void setState(TaskAttemptState state) {
+      this.state = state;
+    }
+
+    @Override
+    public TaskAttemptState getState() {
+      return state;
+    }
+  }
+
+}


Mime
View raw message