tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ss...@apache.org
Subject tez git commit: TEZ-3715. Differentiate between TaskAttempt submission and TaskAttempt started. (sseth)
Date Sat, 13 May 2017 00:06:03 GMT
Repository: tez
Updated Branches:
  refs/heads/master de21f990a -> dd9c517e3


TEZ-3715. Differentiate between TaskAttempt submission and TaskAttempt
started. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dd9c517e
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dd9c517e
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dd9c517e

Branch: refs/heads/master
Commit: dd9c517e3ae2f811e03242dd71891d1b2c7faf5e
Parents: de21f99
Author: Siddharth Seth <sseth@HW10890.local>
Authored: Fri May 12 17:05:47 2017 -0700
Committer: Siddharth Seth <sseth@HW10890.local>
Committed: Fri May 12 17:05:47 2017 -0700

----------------------------------------------------------------------
 .../tez/serviceplugins/api/TaskScheduler.java   |  15 ++
 .../dag/app/TaskCommunicatorContextImpl.java    |  13 +-
 .../tez/dag/app/TaskCommunicatorManager.java    |   9 +-
 .../tez/dag/app/TezTaskCommunicatorImpl.java    |   4 +-
 .../dag/app/dag/TaskAttemptStateInternal.java   |   1 +
 .../event/TaskAttemptEventStartedRemotely.java  |  33 +---
 .../dag/event/TaskAttemptEventSubmitted.java    |  49 +++++
 .../dag/app/dag/event/TaskAttemptEventType.java |   1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  94 +++++++++-
 .../app/rm/AMSchedulerEventTAStateUpdated.java  |  42 +++++
 .../tez/dag/app/rm/AMSchedulerEventType.java    |   1 +
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  20 ++
 .../tez/dag/app/rm/TaskSchedulerWrapper.java    |   4 +
 .../api/TaskCommunicatorContext.java            |  22 ++-
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 183 +++++++++++++------
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  19 +-
 .../TezTestServiceTaskCommunicatorImpl.java     |   5 +-
 17 files changed, 406 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
index 5875bd2..b28a684 100644
--- a/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
+++ b/tez-api/src/main/java/org/apache/tez/serviceplugins/api/TaskScheduler.java
@@ -45,6 +45,10 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
 
   private final TaskSchedulerContext taskSchedulerContext;
 
+  public enum SchedulerTaskState {
+    SUBMITTED, STARTED,
+  }
+
   public TaskScheduler(TaskSchedulerContext taskSchedulerContext) {
     this.taskSchedulerContext = taskSchedulerContext;
   }
@@ -192,6 +196,17 @@ public abstract class TaskScheduler implements ServicePluginLifecycle {
                                     Object clientCookie) throws ServicePluginException;
 
   /**
+   * Information about the state of a previously allocated task.
+   *
+   * @param task  the task for which an update is being provided
+   * @param state the updated state
+   * @throws ServicePluginException
+   */
+  public void taskStateUpdated(Object task, SchedulerTaskState state) throws
+      ServicePluginException {
+  }
+
+  /**
    * A request to deallocate a task. This is typically a result of a task completing - with success
    * or failure. It could also be the result of a decision to not run the task, before it is
    * allocated or started.

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
index 2709787..1adbf6e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorContextImpl.java
@@ -123,8 +123,19 @@ public class TaskCommunicatorContextImpl implements TaskCommunicatorContext, Ver
   }
 
   @Override
+  public void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
+    taskCommunicatorManager.taskSubmitted(taskAttemptId, containerId);
+  }
+
+  @Override
+  public void taskStartedRemotely(TezTaskAttemptID taskAttemptId) {
+    taskCommunicatorManager.taskStartedRemotely(taskAttemptId);
+  }
+
+  @Override
   public void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
-    taskCommunicatorManager.taskStartedRemotely(taskAttemptId, containerId);
+    taskSubmitted(taskAttemptId, containerId);
+    taskStartedRemotely(taskAttemptId);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index c9d1f2e..af82c29 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections4.ListUtils;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.tez.Utils;
 import org.apache.tez.dag.api.NamedEntityDescriptor;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
 import org.apache.tez.serviceplugins.api.DagInfo;
@@ -374,11 +375,15 @@ public class TaskCommunicatorManager extends AbstractService implements
     pingContainerHeartbeatHandler(containerId);
   }
 
-  public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
-    sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
+  public void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
+    sendEvent(new TaskAttemptEventSubmitted(taskAttemptId, containerId));
     pingContainerHeartbeatHandler(containerId);
   }
 
+  public void taskStartedRemotely(TezTaskAttemptID taskAttemptID) {
+    sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID));
+  }
+
   public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
                          String diagnostics) {
     // Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index 4563ba6..9b700f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -315,8 +315,8 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
         }
         task = getContainerTask(containerId);
         if (task != null && !task.shouldDie()) {
-          getContext()
-              .taskStartedRemotely(task.getTaskSpec().getTaskAttemptID(), containerId);
+          getContext().taskSubmitted(task.getTaskSpec().getTaskAttemptID(), containerId);
+          getContext().taskStartedRemotely(task.getTaskSpec().getTaskAttemptID());
         }
       }
       if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
index 8d0d83e..6ddfabb 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttemptStateInternal.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 public enum TaskAttemptStateInternal {
   NEW,
   START_WAIT,
+  SUBMITTED,
   RUNNING,
   KILL_IN_PROGRESS, 
   FAIL_IN_PROGRESS,

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
index e700c6c..d83eda9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventStartedRemotely.java
@@ -18,43 +18,16 @@
 
 package org.apache.tez.dag.app.dag.event;
 
-import java.util.Map;
-
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 
 public class TaskAttemptEventStartedRemotely extends TaskAttemptEvent implements RecoveryEvent {
 
-  private final ContainerId containerId;
-  // TODO Can appAcls be handled elsewhere ?
-  private final Map<ApplicationAccessType, String> applicationACLs;
-  private boolean fromRecovery = false;
-
-  public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId,
-      Map<ApplicationAccessType, String> appAcls) {
+  public TaskAttemptEventStartedRemotely(TezTaskAttemptID id) {
     super(id, TaskAttemptEventType.TA_STARTED_REMOTELY);
-    this.containerId = containerId;
-    this.applicationACLs = appAcls;
-  }
-
-  public TaskAttemptEventStartedRemotely(TezTaskAttemptID id, ContainerId containerId,
-      Map<ApplicationAccessType, String> appAcls, boolean fromRecovery) {
-    this(id, containerId, appAcls);
-    this.fromRecovery = fromRecovery;
-  }
-
-  public ContainerId getContainerId() {
-    return containerId;
-  }
-
-  public Map<ApplicationAccessType, String> getApplicationACLs() {
-    return applicationACLs;
   }
 
   @Override
   public boolean isFromRecovery() {
-    return fromRecovery;
+    return false;
   }
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java
new file mode 100644
index 0000000..9fae772
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventSubmitted.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.event;
+
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventSubmitted extends TaskAttemptEvent implements RecoveryEvent {
+
+  private final ContainerId containerId;
+  private boolean fromRecovery = false;
+
+  public TaskAttemptEventSubmitted(TezTaskAttemptID id, ContainerId containerId) {
+    super(id, TaskAttemptEventType.TA_SUBMITTED);
+    this.containerId = containerId;
+  }
+
+  public TaskAttemptEventSubmitted(TezTaskAttemptID id, ContainerId containerId,
+                                   boolean fromRecovery) {
+    this(id, containerId);
+    this.fromRecovery = fromRecovery;
+  }
+
+  public ContainerId getContainerId() {
+    return containerId;
+  }
+
+  @Override
+  public boolean isFromRecovery() {
+    return fromRecovery;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index dacb0c2..63779fd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -27,6 +27,7 @@ public enum TaskAttemptEventType {
   TA_SCHEDULE,
 
   //Producer: TaskAttemptListener | Vertex after routing events
+  TA_SUBMITTED,
   TA_STARTED_REMOTELY,
   TA_STATUS_UPDATE,
   TA_TEZ_EVENT_UPDATE,  // for recovery

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 8a81575..07aed5e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -31,12 +31,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
 import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.dag.app.rm.AMSchedulerEventTAStateUpdated;
 import org.apache.tez.runtime.api.TaskFailureType;
+import org.apache.tez.serviceplugins.api.TaskScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -276,8 +279,8 @@ public class TaskAttemptImpl implements TaskAttempt,
           new SucceededTransition())
 
       .addTransition(TaskAttemptStateInternal.START_WAIT,
-          TaskAttemptStateInternal.RUNNING,
-          TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
+          TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptEventType.TA_SUBMITTED, new SubmittedTransition())
       .addTransition(TaskAttemptStateInternal.START_WAIT,
           TaskAttemptStateInternal.KILL_IN_PROGRESS,
           TaskAttemptEventType.TA_KILL_REQUEST,
@@ -303,6 +306,62 @@ public class TaskAttemptImpl implements TaskAttempt,
           TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
           new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
 
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.RUNNING,
+          TaskAttemptEventType.TA_STARTED_REMOTELY, new StartedTransition())
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
+      // Optional, may not come in for all tasks.
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DONE,
+          new SucceededTransition())
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_FAILED,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_TIMED_OUT,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_KILL_REQUEST,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_KILLED,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.KILL_IN_PROGRESS,
+          TaskAttemptEventType.TA_NODE_FAILED,
+          new TerminatedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATING,
+          new TerminatedWhileRunningTransition(FAILED_HELPER))
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.FAILED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED,
+          new ContainerCompletedWhileRunningTransition())
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.KILLED,
+          TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
+          new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
+      .addTransition(
+          TaskAttemptStateInternal.SUBMITTED,
+          EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS,
+              TaskAttemptStateInternal.SUBMITTED),
+          TaskAttemptEventType.TA_OUTPUT_FAILED,
+          new OutputReportedFailedTransition())
+      // for recovery, needs to log the TA generated events in TaskAttemptFinishedEvent
+      .addTransition(TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptStateInternal.SUBMITTED,
+          TaskAttemptEventType.TA_TEZ_EVENT_UPDATE,
+          new TezEventUpdaterTransition())
+
+
+
       .addTransition(TaskAttemptStateInternal.RUNNING,
           TaskAttemptStateInternal.RUNNING,
           TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
@@ -852,6 +911,7 @@ public class TaskAttemptImpl implements TaskAttempt,
     switch (smState) {
     case NEW:
     case START_WAIT:
+    case SUBMITTED:
       return TaskAttemptState.STARTING;
     case RUNNING:
       return TaskAttemptState.RUNNING;
@@ -1350,13 +1410,14 @@ public class TaskAttemptImpl implements TaskAttempt,
     }
   }
 
-  protected static class StartedTransition implements
+  protected static class SubmittedTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent origEvent) {
-      TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely) origEvent;
+      TaskAttemptEventSubmitted event = (TaskAttemptEventSubmitted) origEvent;
 
-      AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId()); 
+      AMContainer amContainer = ta.appContext.getAllContainers().get(event.getContainerId());
       Container container = amContainer.getContainer();
 
       ta.allocationTime = amContainer.getCurrentTaskAttemptAllocationTime();
@@ -1376,11 +1437,12 @@ public class TaskAttemptImpl implements TaskAttempt,
       ta.httpPort = nodeHttpInetAddr.getPort();
       ta.sendEvent(createDAGCounterUpdateEventTALaunched(ta));
 
-      LOG.info("TaskAttempt: [" + ta.attemptId + "] started."
+      LOG.info("TaskAttempt: [" + ta.attemptId + "] submitted."
           + " Is using containerId: [" + ta.containerId + "]" + " on NM: ["
           + ta.containerNodeId + "]");
 
-      // JobHistoryEvent
+      // JobHistoryEvent.
+      // The started event represents when the attempt was submitted to the executor.
       ta.logJobHistoryAttemptStarted();
 
       // TODO Remove after HDFS-5098
@@ -1398,15 +1460,31 @@ public class TaskAttemptImpl implements TaskAttempt,
 
       // Inform the Task
       ta.sendEvent(new TaskEventTALaunched(ta.attemptId));
-      
+
       if (ta.isSpeculationEnabled()) {
         ta.sendEvent(new SpeculatorEventTaskAttemptStatusUpdate(ta.attemptId, TaskAttemptState.RUNNING,
             ta.launchTime, true));
       }
 
+      ta.sendEvent(
+          new AMSchedulerEventTAStateUpdated(ta, TaskScheduler.SchedulerTaskState.SUBMITTED,
+              ta.getVertex().getTaskSchedulerIdentifier()));
       ta.taskHeartbeatHandler.register(ta.attemptId);
     }
   }
+
+  protected static class StartedTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttemptImpl ta, TaskAttemptEvent taskAttemptEvent) {
+      ta.sendEvent(
+          new AMSchedulerEventTAStateUpdated(ta, TaskScheduler.SchedulerTaskState.STARTED,
+              ta.getVertex().getTaskSchedulerIdentifier()));
+      // Nothing specific required for recovery, since recovery processes the START/END events
+      // only and moves the attempt to a final state, or an initial state.
+    }
+  }
   
   private boolean isSpeculationEnabled() {
     return conf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java
new file mode 100644
index 0000000..95a56e5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventTAStateUpdated.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.tez.dag.app.rm;
+
+import org.apache.tez.dag.app.dag.TaskAttempt;
+import org.apache.tez.serviceplugins.api.TaskScheduler.SchedulerTaskState;
+
+public class AMSchedulerEventTAStateUpdated extends AMSchedulerEvent {
+
+  private final TaskAttempt taskAttempt;
+  private final SchedulerTaskState state;
+
+  public AMSchedulerEventTAStateUpdated(TaskAttempt taskAttempt, SchedulerTaskState state,
+                                        int schedulerId) {
+    super(AMSchedulerEventType.S_TA_STATE_UPDATED, schedulerId);
+    this.taskAttempt = taskAttempt;
+    this.state = state;
+  }
+
+  public TaskAttempt getTaskAttempt() {
+    return taskAttempt;
+  }
+
+  public SchedulerTaskState getState() {
+    return state;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
index 053146d..cc52ef6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/AMSchedulerEventType.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.rm;
 public enum AMSchedulerEventType {
   //Producer: TaskAttempt
   S_TA_LAUNCH_REQUEST,
+  S_TA_STATE_UPDATED,
   S_TA_ENDED, // Annotated with FAILED/KILLED/SUCCEEDED.
 
   //Producer: Node

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index d32261f..57afbfc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -259,6 +259,9 @@ public class TaskSchedulerManager extends AbstractService implements
     case S_TA_LAUNCH_REQUEST:
       handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
       break;
+    case S_TA_STATE_UPDATED:
+      handleTAStateUpdated((AMSchedulerEventTAStateUpdated) sEvent);
+      break;
     case S_TA_ENDED: // TaskAttempt considered complete.
       AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
       switch(event.getState()) {
@@ -521,6 +524,23 @@ public class TaskSchedulerManager extends AbstractService implements
     }
   }
 
+  private void handleTAStateUpdated(AMSchedulerEventTAStateUpdated event) {
+    try {
+      taskSchedulers[event.getSchedulerId()].taskStateUpdated(event.getTaskAttempt(), event.getState());
+    } catch (Exception e) {
+      String msg = "Error in TaskScheduler for handling Task State Update"
+          + ", eventType=" + event.getType()
+          + ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+          + ", taskAttemptId=" + event.getTaskAttempt().getID()
+          + ", state=" + event.getState();
+      LOG.error(msg, e);
+      sendEvent(
+          new DAGAppMasterEventUserServiceFatalError(
+              DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
+              msg, e));
+    }
+  }
+
   @VisibleForTesting
   TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
                                     AppContext appContext,

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
index 43cf045..9e0d2ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerWrapper.java
@@ -63,6 +63,10 @@ public class TaskSchedulerWrapper {
     real.allocateTask(task, capability, containerId, priority, containerSignature, clientCookie);
   }
 
+  public void taskStateUpdated(Object task, TaskScheduler.SchedulerTaskState state) throws Exception {
+    real.taskStateUpdated(task, state);
+  }
+
   public boolean deallocateTask(Object task, boolean taskSucceeded, TaskAttemptEndReason endReason,
                                 @Nullable String diagnostics) throws Exception {
     return real.deallocateTask(task, taskSucceeded, endReason, diagnostics);

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
index 4c6e846..b6f3a54 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicatorContext.java
@@ -109,11 +109,31 @@ public interface TaskCommunicatorContext extends ServicePluginContextBase {
   void containerAlive(ContainerId containerId);
 
   /**
-   * Inform the framework that the task has started execution
+   * Inform the framework that the task has been submitted for execution. The expectation is that
+   * the implementing TaskCommunicator will inform the framework about task submission, followed
+   * by the task starting.
    *
    * @param taskAttemptId the relevant task attempt id
    * @param containerId   the containerId in which the task attempt is running
    */
+  void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId);
+
+  /**
+   * Inform the framework that the task has started execution
+   *
+   * @param taskAttemptId the relevant task attempt id
+   */
+  void taskStartedRemotely(TezTaskAttemptID taskAttemptId);
+
+  /**
+   * Inform the framework that the task has started execution.
+   *
+   * @param taskAttemptId the relevant task attempt id
+   * @param containerId   the containerId in which the task attempt is running
+   * @deprecated Use {@link #taskSubmitted(TezTaskAttemptID, ContainerId)}
+   * and {@link #taskStartedRemotely(TezTaskAttemptID)} instead.
+   */
+  @Deprecated
   void taskStartedRemotely(TezTaskAttemptID taskAttemptId, ContainerId containerId);
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 44d8213..acf8f23 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -63,6 +63,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
 import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
@@ -405,14 +406,18 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    assertEquals("Task attempt is not in the STARTING state", taImpl.getState(),
+        TaskAttemptState.STARTING);
+    assertEquals("Task attempt internal state is not at SUBMITTED", taImpl.getInternalState(),
+        TaskAttemptStateInternal.SUBMITTED);
     // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 3;
+    int expectedEventsAtRunning = 5;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
@@ -505,10 +510,9 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID = taImpl.getID();
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
-    int expectedEventsAtRunning = 3;
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
+    int expectedEventsAtRunning = 5;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     assertEquals("Task attempt is not in running state", taImpl.getState(),
         TaskAttemptState.RUNNING);
@@ -596,14 +600,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 3;
+    int expectedEventsAtRunning = 5;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
@@ -686,9 +689,8 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID = taImpl.getID();
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     
@@ -773,14 +775,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 4;
+    int expectedEventsAtRunning = 6;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     verifyEventType(
         arg.getAllValues().subList(0,
@@ -881,14 +882,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 4;
+    int expectedEventsAtRunning = 6;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     verifyEventType(
         arg.getAllValues().subList(0,
@@ -926,7 +926,6 @@ public class TestTaskAttempt {
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
 
-
     Event e = verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), TaskEventTAFailed.class, 1);
@@ -991,9 +990,8 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
@@ -1066,9 +1064,8 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
@@ -1136,7 +1133,89 @@ public class TestTaskAttempt {
     // events from different tasks may not have the same value
     assertFalse(tEventFail1.getSerializingHash() == tEventFail2.getSerializingHash());
   }
-  
+
+  @Test(timeout = 5000)
+  public void testCompletedAtSubmitted() throws ServicePluginException {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+        appId, 0);
+    TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+    TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+    TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+
+    MockEventHandler eventHandler = spy(new MockEventHandler());
+    TaskCommunicatorManagerInterface taListener = createMockTaskAttemptListener();
+
+    Configuration taskConf = new Configuration();
+    taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+    locationHint = TaskLocationHint.createTaskLocationHint(
+        new HashSet<String>(Arrays.asList(new String[]{"127.0.0.1"})), null);
+    Resource resource = Resource.newInstance(1024, 1);
+
+    NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+    @SuppressWarnings("deprecation")
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    AMContainerMap containers = new AMContainerMap(
+        mock(ContainerHeartbeatHandler.class), mock(TaskCommunicatorManagerInterface.class),
+        new ContainerContextMatcher(), appCtx);
+    containers.addContainerIfNew(container, 0, 0, 0);
+
+    doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+    doReturn(containers).when(appCtx).getAllContainers();
+
+    TaskHeartbeatHandler mockHeartbeatHandler = mock(TaskHeartbeatHandler.class);
+    TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID = taImpl.getID();
+    ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+    taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    assertEquals("Task attempt is not in the STARTING state", taImpl.getState(),
+        TaskAttemptState.STARTING);
+
+    verify(mockHeartbeatHandler).register(taskAttemptID);
+
+    int expectedEventsAtStarting = 4;
+    verify(eventHandler, times(expectedEventsAtStarting)).handle(arg.capture());
+
+    // Ensure status_updates are handled in the submitted state.
+    taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID,
+        new TaskStatusUpdateEvent(null, 0.1f, null, false)));
+
+    taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+    assertEquals("Task attempt is not in the SUCCEEDED state", taImpl.getState(),
+        TaskAttemptState.SUCCEEDED);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID);
+    assertEquals(0, taImpl.getDiagnostics().size());
+
+    int expectedEvenstAfterTerminating = expectedEventsAtStarting + 3;
+    arg = ArgumentCaptor.forClass(Event.class);
+    verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+
+    Event e = verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtStarting,
+            expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1);
+    assertEquals(TaskEventType.T_ATTEMPT_SUCCEEDED, e.getType());
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtStarting,
+            expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+    verifyEventType(
+        arg.getAllValues().subList(expectedEventsAtStarting,
+            expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
+  }
+
   @Test(timeout = 5000)
   public void testSuccess() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);
@@ -1183,14 +1262,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 4;
+    int expectedEventsAtRunning = 6;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
     verifyEventType(
         arg.getAllValues().subList(0,
@@ -1210,7 +1288,6 @@ public class TestTaskAttempt {
     arg = ArgumentCaptor.forClass(Event.class);
     verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
 
-
     Event e = verifyEventType(
         arg.getAllValues().subList(expectedEventsAtRunning,
             expectedEvenstAfterTerminating), TaskEventTASucceeded.class, 1);
@@ -1274,14 +1351,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", taImpl.getState(),
         TaskAttemptState.RUNNING);
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 3;
+    int expectedEventsAtRunning = 5;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
@@ -1367,14 +1443,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING,
         taImpl.getState());
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 3;
+    int expectedEventsAtRunning = 5;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
@@ -1475,14 +1550,13 @@ public class TestTaskAttempt {
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING,
         taImpl.getState());
     verify(mockHeartbeatHandler).register(taskAttemptID);
 
-    int expectedEventsAtRunning = 3;
+    int expectedEventsAtRunning = 5;
     verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
 
     taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
@@ -1573,9 +1647,8 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID = taImpl.getID();
 
     taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
-    // At state STARTING.
-    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
-        null));
+    taImpl.handle(new TaskAttemptEventSubmitted(taskAttemptID, contId));
+    taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID));
     verify(mockHeartbeatHandler).register(taskAttemptID);
     taImpl.handle(new TaskAttemptEvent(taskAttemptID,
         TaskAttemptEventType.TA_DONE));
@@ -1583,7 +1656,7 @@ public class TestTaskAttempt {
         TaskAttemptState.SUCCEEDED);
     verify(mockHeartbeatHandler).unregister(taskAttemptID);
 
-    int expectedEventsTillSucceeded = 6;
+    int expectedEventsTillSucceeded = 8;
     ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
     ArgumentCaptor<DAGHistoryEvent> histArg = ArgumentCaptor.forClass(DAGHistoryEvent.class);
     verify(eventHandler, times(expectedEventsTillSucceeded)).handle(arg.capture());
@@ -1657,8 +1730,8 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID2 = taImpl2.getID();
 
     taImpl2.handle(new TaskAttemptEventSchedule(taskAttemptID2, 0, 0));
-    // At state STARTING.
-    taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2, contId, null));
+    taImpl2.handle(new TaskAttemptEventSubmitted(taskAttemptID2, contId));
+    taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2));
     verify(mockHeartbeatHandler).register(taskAttemptID2);
     taImpl2.handle(new TaskAttemptEvent(taskAttemptID2, TaskAttemptEventType.TA_DONE));
     assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
@@ -1691,8 +1764,8 @@ public class TestTaskAttempt {
     TezTaskAttemptID taskAttemptID3 = taImpl3.getID();
 
     taImpl3.handle(new TaskAttemptEventSchedule(taskAttemptID3, 0, 0));
-    // At state STARTING.
-    taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3, contId, null));
+    taImpl3.handle(new TaskAttemptEventSubmitted(taskAttemptID3, contId));
+    taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3));
     verify(mockHeartbeatHandler).register(taskAttemptID3);
     taImpl3.handle(new TaskAttemptEvent(taskAttemptID3, TaskAttemptEventType.TA_DONE));
     assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 76ccf91..b3dd60a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -52,8 +52,6 @@ import com.google.protobuf.ByteString;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.tez.common.DrainDispatcher;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
@@ -62,7 +60,7 @@ import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResource;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceType;
 import org.apache.tez.dag.api.records.DAGProtos.PlanLocalResourceVisibility;
-import org.apache.tez.dag.app.ContainerContext;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
@@ -3653,7 +3651,8 @@ public class TestVertexImpl {
     containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
-    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+    ta.handle(new TaskAttemptEventSubmitted(ta.getID(), contId));
+    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID()));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
     ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
         TaskFailureType.NON_FATAL,
@@ -3687,7 +3686,8 @@ public class TestVertexImpl {
     containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
-    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+    ta.handle(new TaskAttemptEventSubmitted(ta.getID(), contId));
+    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID()));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
 
     ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
@@ -3723,7 +3723,8 @@ public class TestVertexImpl {
     containers.addContainerIfNew(container, 0, 0, 0);
     doReturn(containers).when(appContext).getAllContainers();
 
-    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+    ta.handle(new TaskAttemptEventSubmitted(ta.getID(), contId));
+    ta.handle(new TaskAttemptEventStartedRemotely(ta.getID()));
     Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
 
     ta.handle(new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED,
@@ -7103,14 +7104,16 @@ public class TestVertexImpl {
     Assert.assertEquals(v.getLastTaskFinishTime(), -1);
 
     taskAttempt0.handle(new TaskAttemptEventSchedule(taskAttemptId0, 0, 0));
-    taskAttempt0.handle(new TaskAttemptEventStartedRemotely(taskAttemptId0, contId, null));
+    taskAttempt0.handle(new TaskAttemptEventSubmitted(taskAttemptId0, contId));
+    taskAttempt0.handle(new TaskAttemptEventStartedRemotely(taskAttemptId0));
     taskAttempt0.handle(new TaskAttemptEvent(taskAttemptId0, TaskAttemptEventType.TA_DONE));
     //task0.handle(new TaskEventTAUpdate(taskAttemptId0, TaskEventType.T_ATTEMPT_SUCCEEDED));
 
     Assert.assertEquals(v.getLastTaskFinishTime(), -1);
 
     taskAttempt1.handle(new TaskAttemptEventSchedule(taskAttemptId1, 0, 0));
-    taskAttempt1.handle(new TaskAttemptEventStartedRemotely(taskAttemptId1, contId, null));
+    taskAttempt1.handle(new TaskAttemptEventSubmitted(taskAttemptId1, contId));
+    taskAttempt1.handle(new TaskAttemptEventStartedRemotely(taskAttemptId1));
     taskAttempt1.handle(new TaskAttemptEvent(taskAttemptId1, TaskAttemptEventType.TA_DONE));
     //task1.handle(new TaskEventTAUpdate(taskAttemptId1, TaskEventType.T_ATTEMPT_SUCCEEDED));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/dd9c517e/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index 6c07107..732c81a 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -132,8 +132,9 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
     }
     // Have to register this up front right now. Otherwise, it's possible for the task to start
     // sending out status/DONE/KILLED/FAILED messages before TAImpl knows how to handle them.
-    getContext()
-        .taskStartedRemotely(taskSpec.getTaskAttemptID(), containerId);
+
+    getContext().taskSubmitted(taskSpec.getTaskAttemptID(), containerId);
+    getContext().taskStartedRemotely(taskSpec.getTaskAttemptID());
     communicator.submitWork(requestProto, host, port,
         new TezTestServiceCommunicator.ExecuteRequestCallback<SubmitWorkResponseProto>() {
           @Override


Mime
View raw message