tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjf...@apache.org
Subject tez git commit: TEZ-2311. AM can hang if kill received while recovering from previous attempt (zjffdu)
Date Fri, 31 Jul 2015 07:13:31 GMT
Repository: tez
Updated Branches:
  refs/heads/master 3347c94f8 -> 4f66cb4ba


TEZ-2311. AM can hang if kill received while recovering from previous attempt (zjffdu)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4f66cb4b
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4f66cb4b
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4f66cb4b

Branch: refs/heads/master
Commit: 4f66cb4baf926f9dcaf2ca4a47461f3ebf97679e
Parents: 3347c94
Author: Jeff Zhang <zjffdu@apache.org>
Authored: Fri Jul 31 15:13:21 2015 +0800
Committer: Jeff Zhang <zjffdu@apache.org>
Committed: Fri Jul 31 15:13:21 2015 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../java/org/apache/tez/client/LocalClient.java |   6 +-
 .../tez/dag/api/client/DAGClientHandler.java    |   2 +-
 ...DAGClientAMProtocolBlockingPBServerImpl.java |   8 +-
 .../org/apache/tez/dag/app/DAGAppMaster.java    |  38 ++++-
 .../org/apache/tez/dag/app/RecoveryParser.java  |  22 +++
 .../apache/tez/dag/app/dag/impl/DAGImpl.java    |  10 +-
 .../apache/tez/dag/app/dag/impl/VertexImpl.java |  63 ++++---
 .../tez/dag/history/HistoryEventType.java       |   1 +
 .../dag/history/events/DAGKillRequestEvent.java | 127 ++++++++++++++
 tez-dag/src/main/proto/HistoryEvents.proto      |   6 +
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   |  95 +++++++++++
 .../dag/app/dag/impl/TestVertexRecovery.java    | 164 ++++++++++++++++++-
 .../TestHistoryEventsProtoConversion.java       |  15 ++
 .../impl/TestHistoryEventJsonConversion.java    |   4 +
 .../ats/TestHistoryEventTimelineConversion.java |   4 +
 16 files changed, 531 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index deb874d..764303d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -277,6 +277,7 @@ Release 0.6.2: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2311. AM can hang if kill received while recovering from previous attempt.
   TEZ-2623. Fix module dependencies related to hadoop-auth.
   TEZ-2560. fix tex-ui build for maven 3.3+
   TEZ-2600. When used with HDFS federation(viewfs) ,tez will throw a error

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
index a5e8e15..1bb2002 100644
--- a/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
+++ b/tez-dag/src/main/java/org/apache/tez/client/LocalClient.java
@@ -134,7 +134,11 @@ public class LocalClient extends FrameworkClient {
 
   @Override
   public void killApplication(ApplicationId appId) {
-    clientHandler.shutdownAM();
+    try {
+      clientHandler.shutdownAM();
+    } catch (TezException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
index 13cd8f4..0f674f3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/DAGClientHandler.java
@@ -120,7 +120,7 @@ public class DAGClientHandler {
     return dagAppMaster.submitDAGToAppMaster(dagPlan, additionalAmResources);
   }
 
-  public synchronized void shutdownAM() {
+  public synchronized void shutdownAM() throws TezException {
     LOG.info("Received message to shutdown AM");
     if (dagAppMaster != null) {
       dagAppMaster.shutdownTezAM();

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
index a5f51c8..fc6b267 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/client/rpc/DAGClientAMProtocolBlockingPBServerImpl.java
@@ -176,8 +176,12 @@ public class DAGClientAMProtocolBlockingPBServerImpl implements DAGClientAMProto
     if (!real.getACLManager().checkAMModifyAccess(user)) {
       throw new AccessControlException("User " + user + " cannot perform AM modify operation");
     }
-    real.shutdownAM();
-    return ShutdownSessionResponseProto.newBuilder().build();
+    try {
+      real.shutdownAM();
+      return ShutdownSessionResponseProto.newBuilder().build();
+    } catch(TezException e) {
+      throw wrapException(e);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 7ad8f41..b91c3d1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -157,6 +157,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
 import org.apache.tez.dag.history.events.AMLaunchedEvent;
 import org.apache.tez.dag.history.events.AMStartedEvent;
 import org.apache.tez.dag.history.events.AppLaunchedEvent;
+import org.apache.tez.dag.history.events.DAGKillRequestEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.utils.DAGUtils;
 import org.apache.tez.dag.records.TezDAGID;
@@ -1173,7 +1174,7 @@ public class DAGAppMaster extends AbstractService {
         + oldState + " new state: " + state);
   }
 
-  public void shutdownTezAM() {
+  public void shutdownTezAM() throws TezException {
     sessionStopped.set(true);
     synchronized (this) {
       this.taskSchedulerEventHandler.setShouldUnregisterFlag();
@@ -1182,6 +1183,11 @@ public class DAGAppMaster extends AbstractService {
         //send a DAG_KILL message
         LOG.info("Sending a kill event to the current DAG"
             + ", dagId=" + currentDAG.getID());
+        try {
+          logDAGKillRequestEvent(currentDAG.getID(), true);
+        } catch (IOException e) {
+          throw new TezException(e);
+        }
         sendEvent(new DAGEvent(currentDAG.getID(), DAGEventType.DAG_KILL));
       } else {
         LOG.info("No current running DAG, shutting down the AM");
@@ -1193,6 +1199,12 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
+  void logDAGKillRequestEvent(TezDAGID dagId, boolean isSessionStopped) throws IOException {
+    DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent(dagId, clock.getTime(), isSessionStopped);
+    historyEventHandler.handleCriticalEvent(
+        new DAGHistoryEvent(dagId, killRequestEvent));
+  }
+
   public String submitDAGToAppMaster(DAGPlan dagPlan,
       Map<String, LocalResource> additionalResources) throws TezException {
     if (sessionStopped.get()) {
@@ -1228,7 +1240,12 @@ public class DAGAppMaster extends AbstractService {
   }
 
   @SuppressWarnings("unchecked")
-  public void tryKillDAG(DAG dag){
+  public void tryKillDAG(DAG dag) throws TezException {
+    try {
+      logDAGKillRequestEvent(dag.getID(), false);
+    } catch (IOException e) {
+      throw new TezException(e);
+    }
     dispatcher.getEventHandler().handle(new DAGEvent(dag.getID(), DAGEventType.DAG_KILL));
   }
   
@@ -1756,7 +1773,14 @@ public class DAGAppMaster extends AbstractService {
         amResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
         cumulativeAdditionalResources.putAll(recoveredDAGData.cumulativeAdditionalResources);
       }
-      
+
+      if (recoveredDAGData.isSessionStopped) {
+        LOG.info("AM crashed when shutting down in the previous attempt"
+            + ", continue the shutdown and recover it to SUCCEEDED");
+        this.sessionStopped.set(true);
+        return;
+      }
+
       if (recoveredDAGData.isCompleted
           || recoveredDAGData.nonRecoverable) {
         LOG.info("Found previous DAG in completed or non-recoverable state"
@@ -1822,7 +1846,11 @@ public class DAGAppMaster extends AbstractService {
       this.dagSubmissionTimer.scheduleAtFixedRate(new TimerTask() {
         @Override
         public void run() {
-          checkAndHandleSessionTimeout();
+          try {
+            checkAndHandleSessionTimeout();
+          } catch (TezException e) {
+            LOG.error("Error when check AM session timeout", e);
+          }
         }
       }, sessionTimeoutInterval, sessionTimeoutInterval / 10);
     }
@@ -1973,7 +2001,7 @@ public class DAGAppMaster extends AbstractService {
     }
   }
 
-  private synchronized void checkAndHandleSessionTimeout() {
+  private synchronized void checkAndHandleSessionTimeout() throws TezException {
     if (EnumSet.of(DAGAppMasterState.RUNNING,
         DAGAppMasterState.RECOVERING).contains(this.state)
         || sessionStopped.get()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
index 8922d6a..8ce835b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/RecoveryParser.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGKillRequestEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
 import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
@@ -105,6 +106,7 @@ public class RecoveryParser {
     public DAGState dagState = null;
     public boolean isCompleted = false;
     public boolean nonRecoverable = false;
+    public boolean isSessionStopped = false;
     public String reason = null;
     public Map<String, LocalResource> cumulativeAdditionalResources = null;
   }
@@ -164,6 +166,9 @@ public class RecoveryParser {
       case DAG_FINISHED:
         event = new DAGFinishedEvent();
         break;
+      case DAG_KILL_REQUEST:
+        event = new DAGKillRequestEvent();
+        break;
       case CONTAINER_LAUNCHED:
         event = new ContainerLaunchedEvent();
         break;
@@ -350,6 +355,11 @@ public class RecoveryParser {
           dagFinishedEvent.fromSummaryProtoStream(proto);
           dagState = dagFinishedEvent.getState();
           break;
+        case DAG_KILL_REQUEST:
+          DAGKillRequestEvent killRequestEvent = new DAGKillRequestEvent();
+          killRequestEvent.fromSummaryProtoStream(proto);
+          bufferedSummaryEvents.add(killRequestEvent);
+          break;
         case DAG_COMMIT_STARTED:
           dagCommitCompleted = false;
           break;
@@ -663,6 +673,13 @@ public class RecoveryParser {
             recoveredDAGData.recoveredDAG.restoreFromEvent(event);
             break;
           }
+          case DAG_KILL_REQUEST:
+          {
+            LOG.info("Recovering from event"
+                + ", eventType=" + eventType
+                + ", event=" + event.toString());
+            break;
+          }
           case DAG_FINISHED:
           {
             LOG.info("Recovering from event"
@@ -842,6 +859,11 @@ public class RecoveryParser {
                 vertex.restoreFromEvent(vertexFinishedEvent);
               }
               break;
+            case DAG_KILL_REQUEST:
+              DAGKillRequestEvent killRequestEvent = (DAGKillRequestEvent)bufferedEvent;
+              recoveredDAGData.isSessionStopped = killRequestEvent.isSessionStopped();
+              recoveredDAGData.recoveredDAG.restoreFromEvent(bufferedEvent);
+              break;
             default:
               throw new RuntimeException("Invalid data found in buffered summary events"
                   + ", unknown event type "

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
index a1c6368..3d44ba6 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java
@@ -82,6 +82,7 @@ import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
 import org.apache.tez.dag.api.records.DAGProtos.PlanVertexGroupInfo;
 import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
 import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.DAGAppMasterState;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAG;
@@ -446,6 +447,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
   Map<String, VertexGroupInfo> vertexGroups = Maps.newHashMap();
   Map<String, List<VertexGroupInfo>> vertexGroupInfo = Maps.newHashMap();
   private DAGState recoveredState = DAGState.NEW;
+
   @VisibleForTesting
   boolean recoveryCommitInProgress = false;
   Map<String, Boolean> recoveredGroupCommits = new HashMap<String, Boolean>();
@@ -660,6 +662,10 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
           recoveredGroupCommits.put(
               vertexGroupCommitFinishedEvent.getVertexGroupName(), true);
           return recoveredState;
+        case DAG_KILL_REQUEST:
+          trySetTerminationCause(DAGTerminationCause.DAG_KILL);
+          this.recoveredState = DAGState.KILLED;
+          return recoveredState;
         case DAG_FINISHED:
           recoveryCommitInProgress = false;
           DAGFinishedEvent finishedEvent = (DAGFinishedEvent) historyEvent;
@@ -1098,7 +1104,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
       //notify the eventhandler of state change
       if (oldState != getInternalState()) {
         LOG.info(dagId + " transitioned from " + oldState + " to "
-                 + getInternalState());
+                 + getInternalState() + " due to event " + event.getType());
       }
     }
 
@@ -1644,7 +1650,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG,
         case FAILED:
         case KILLED:
           // Completed
-
+          
           // Recover all other data for all vertices
           // send recover event to all vertices with a final end state
           for (Vertex v : dag.vertices.values()) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 8d8a2de..3888c7a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -2652,10 +2652,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           }
           break;
         case SUCCEEDED:
-        case FAILED:
-        case KILLED:
-          if (vertex.recoveredState == VertexState.SUCCEEDED
-              && vertex.hasCommitter
+          if (vertex.hasCommitter
               && vertex.summaryCompleteSeen && !vertex.vertexCompleteSeen) {
             String msg = "Cannot recover vertex as all recovery events not"
                 + " found, vertex=" + vertex.logIdentifier
@@ -2669,14 +2666,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           } else {
             // recover tasks
             if (vertex.tasks != null && vertex.numTasks != 0) {
-              TaskState taskState = TaskState.KILLED;
-              if (vertex.recoveredState == VertexState.SUCCEEDED) {
-                taskState = TaskState.SUCCEEDED;
-              } else if (vertex.recoveredState == VertexState.KILLED) {
-                taskState = TaskState.KILLED;
-              } else if (vertex.recoveredState == VertexState.FAILED) {
-                taskState = TaskState.FAILED;
-              }
+              TaskState taskState = TaskState.SUCCEEDED;
               for (Task task : vertex.tasks.values()) {
                 vertex.eventHandler.handle(
                     new TaskEventRecoverTask(task.getTaskId(),
@@ -2699,6 +2689,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             }
           }
           break;
+        case FAILED:
+        case KILLED:
+          // vertex may be killed/failed before its tasks are scheduled. so here just recover vertex
+          // to the recovered state without waiting for its tasks' feedback and recover tasks to
+          // the corresponding state without recover its data.
+          if (vertex.tasks != null && vertex.numTasks != 0) {
+            TaskState taskState = TaskState.FAILED;
+            if (vertex.recoveredState == VertexState.KILLED) {
+              taskState = TaskState.KILLED;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState, false));
+            }
+          }
+          endState = vertex.recoveredState;
+          vertex.finished(endState);
+          break;
         default:
           LOG.warn("Invalid recoveredState found when trying to recover"
               + " vertex"
@@ -3067,19 +3076,10 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
           }
           break;
         case SUCCEEDED:
-        case FAILED:
-        case KILLED:
           // recover tasks
           assert vertex.tasks.size() == vertex.numTasks;
           if (vertex.tasks != null  && vertex.numTasks != 0) {
-            TaskState taskState = TaskState.KILLED;
-            if (vertex.recoveredState == VertexState.SUCCEEDED) {
-              taskState = TaskState.SUCCEEDED;
-            } else if (vertex.recoveredState == VertexState.KILLED) {
-              taskState = TaskState.KILLED;
-            } else if (vertex.recoveredState == VertexState.FAILED) {
-              taskState = TaskState.FAILED;
-            }
+            TaskState taskState = TaskState.SUCCEEDED;
             for (Task task : vertex.tasks.values()) {
               vertex.eventHandler.handle(
                   new TaskEventRecoverTask(task.getTaskId(),
@@ -3102,6 +3102,25 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
             vertex.finished(endState);
           }
           break;
+        case FAILED:
+        case KILLED:
+          // vertex may be killed/failed before its tasks are scheduled. so here just recover vertex
+          // to the recovered state without waiting for its tasks' feedback and recover tasks to
+          // the corresponding state without recover its data.
+          if (vertex.tasks != null && vertex.numTasks != 0) {
+            TaskState taskState = TaskState.FAILED;
+            if (vertex.recoveredState == VertexState.KILLED) {
+              taskState = TaskState.KILLED;
+            }
+            for (Task task : vertex.tasks.values()) {
+              vertex.eventHandler.handle(
+                  new TaskEventRecoverTask(task.getTaskId(),
+                      taskState, false));
+            }
+          }
+          endState = vertex.recoveredState;
+          vertex.finished(endState);
+          break;
         default:
           LOG.warn("Invalid recoveredState found when trying to recover"
               + " vertex, recoveredState=" + vertex.recoveredState);

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
index 6949d21..d791d9e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/HistoryEventType.java
@@ -26,6 +26,7 @@ public enum HistoryEventType {
   DAG_INITIALIZED,
   DAG_STARTED,
   DAG_FINISHED,
+  DAG_KILL_REQUEST,
   VERTEX_INITIALIZED,
   VERTEX_STARTED,
   VERTEX_PARALLELISM_UPDATED,

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
new file mode 100644
index 0000000..525e361
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/DAGKillRequestEvent.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.dag.history.events;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TezDAGID;
+import org.apache.tez.dag.recovery.records.RecoveryProtos;
+import org.apache.tez.dag.recovery.records.RecoveryProtos.SummaryEventProto;
+import org.apache.tez.dag.utils.ProtoUtils;
+
+/**
+ * Recovery-only event recording that a kill was requested for a DAG.
+ * <p>
+ * {@code isSessionStopped} is {@code true} when the kill was issued as part of
+ * shutting down the AM session and {@code false} for a plain DAG kill. In the
+ * summary stream the flag is encoded as a single payload byte (1 = true).
+ */
+public class DAGKillRequestEvent implements HistoryEvent, SummaryEvent {
+
+  private TezDAGID dagID;
+  private long killRequestTime;
+  private boolean isSessionStopped;
+
+  /** Creates an empty event to be populated by one of the {@code from*} methods. */
+  public DAGKillRequestEvent() {
+  }
+
+  public DAGKillRequestEvent(TezDAGID dagID, long killRequestTime, boolean isSessionStopped) {
+    this.dagID = dagID;
+    this.killRequestTime = killRequestTime;
+    this.isSessionStopped = isSessionStopped;
+  }
+
+  @Override
+  public HistoryEventType getEventType() {
+    return HistoryEventType.DAG_KILL_REQUEST;
+  }
+
+  @Override
+  public boolean isRecoveryEvent() {
+    return true;
+  }
+
+  @Override
+  public boolean isHistoryEvent() {
+    return false;
+  }
+
+  @Override
+  public void toProtoStream(OutputStream outputStream) throws IOException {
+    toProto().writeDelimitedTo(outputStream);
+  }
+
+  public RecoveryProtos.DAGKillRequestProto toProto() {
+    return RecoveryProtos.DAGKillRequestProto.newBuilder()
+        .setDagId(dagID.toString())
+        .setKillRequestTime(killRequestTime)
+        .setIsSessionStopped(isSessionStopped)
+        .build();
+  }
+
+  @Override
+  public void fromProtoStream(InputStream inputStream) throws IOException {
+    RecoveryProtos.DAGKillRequestProto proto =
+        RecoveryProtos.DAGKillRequestProto.parseDelimitedFrom(inputStream);
+    if (proto == null) {
+      throw new IOException("No data found in stream");
+    }
+    fromProto(proto);
+  }
+
+  public void fromProto(RecoveryProtos.DAGKillRequestProto proto) {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.killRequestTime = proto.getKillRequestTime();
+    this.isSessionStopped = proto.getIsSessionStopped();
+  }
+
+  @Override
+  public void toSummaryProtoStream(OutputStream outputStream)
+      throws IOException {
+    ProtoUtils.toSummaryEventProto(dagID, killRequestTime,
+        HistoryEventType.DAG_KILL_REQUEST, isSessionStopped ? new byte[]{1} : new byte[]{0})
+        .writeDelimitedTo(outputStream);
+  }
+
+  @Override
+  public void fromSummaryProtoStream(SummaryEventProto proto)
+      throws IOException {
+    this.dagID = TezDAGID.fromString(proto.getDagId());
+    this.killRequestTime = proto.getTimestamp();
+    // An absent or empty payload is read as "not a session stop" rather than
+    // letting byteAt(0) throw and abort summary parsing during recovery.
+    this.isSessionStopped = !proto.getEventPayload().isEmpty()
+        && proto.getEventPayload().byteAt(0) == 1;
+  }
+
+  @Override
+  public boolean writeToRecoveryImmediately() {
+    return false;
+  }
+
+  public TezDAGID getDagID() {
+    return dagID;
+  }
+
+  public long getKillRequestTime() {
+    return killRequestTime;
+  }
+
+  public boolean isSessionStopped() {
+    return isSessionStopped;
+  }
+
+  @Override
+  public String toString() {
+    return "dagId=" + dagID
+        + ", killRequestTime=" + killRequestTime
+        + ", isSessionStopped=" + isSessionStopped;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 617a644..8af48b6 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -77,6 +77,12 @@ message DAGFinishedProto {
   optional TezCountersProto counters = 5;
 }
 
+message DAGKillRequestProto {
+  optional string dag_id = 1;
+  optional int64 kill_request_time = 2;
+  optional bool isSessionStopped = 3;
+}
+
 message VertexInitializedProto {
   optional string vertex_name = 1;
   optional string vertex_id = 2;

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index a0d5fb5..792fa63 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.tez.common.TezAbstractEvent;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.AppContext;
@@ -39,6 +40,7 @@ import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.DAGTerminationCause;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
@@ -49,6 +51,7 @@ import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGKillRequestEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitFinishedEvent;
 import org.apache.tez.dag.history.events.VertexGroupCommitStartedEvent;
@@ -155,6 +158,10 @@ public class TestDAGRecovery {
     assertEquals(tezCounters, dag.fullCounters);
   }
 
+  private void restoreFromDAGKillRequestEvent() {
+    dag.restoreFromEvent(new DAGKillRequestEvent(dag.getID(), 0L, false));
+  }
+
   /**
    * New -> RecoverTransition
    */
@@ -174,6 +181,18 @@ public class TestDAGRecovery {
   }
 
   /**
+   * New -> restoreFromDAGKillRequested -> RecoverTransition
+   */
+  @Test(timeout = 5000)
+  public void testDAGRecovery_FromNewToKilled() {
+    restoreFromDAGKillRequestEvent();
+    assertNewState();
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.KILLED, dag.getState());
+    assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+  }
+
+  /**
    * restoreFromDAGInitializedEvent -> RecoverTransition
    */
   @Test(timeout = 5000)
@@ -198,6 +217,18 @@ public class TestDAGRecovery {
   }
 
   /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGKillRequested -> RecoverTransition
+   */
+  @Test(timeout = 5000)
+  public void testDAGRecovery_FromInitedToKilled() {
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGKillRequestEvent();
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.KILLED, dag.getState());
+    assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+  }
+
+  /**
    * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
    * RecoverTransition
    */
@@ -224,6 +255,34 @@ public class TestDAGRecovery {
   }
 
   /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> restoreFromDAGKillRequested
+   * RecoverTransition
+   */
+  @Test(timeout = 5000)
+  public void testDAGRecovery_FromStartedtoKilled() {
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGKillRequestEvent();
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.KILLED, dag.getState());
+    assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
+    // send recover event to all the vertices with desired state of KILLED
+    ArgumentCaptor<TezAbstractEvent> eventCaptor =
+        ArgumentCaptor.forClass(TezAbstractEvent.class);
+    verify(mockEventHandler, times(7)).handle(eventCaptor.capture());
+    List<TezAbstractEvent> events = eventCaptor.getAllValues();
+    assertEquals(7, events.size());
+    for (int i=0;i<6;++i) {
+      TezAbstractEvent vEvent = events.get(i);
+      assertTrue(vEvent instanceof VertexEventRecoverVertex);
+      VertexEventRecoverVertex recoverEvent = (VertexEventRecoverVertex) vEvent;
+      assertEquals(VertexState.KILLED, recoverEvent.getDesiredState());
+    }
+    assertTrue(events.get(6) instanceof DAGAppMasterEventDAGFinished);
+  }
+
+  /**
    * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent ->
    * restoreFromDAGFinishedEvent (SUCCEEDED) -> RecoverTransition
    */
@@ -323,6 +382,42 @@ public class TestDAGRecovery {
   }
 
   /**
+   * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent
+   * -> restoreFromDAGKillRequestEvent ->
+   * restoreFromDAGFinishedEvent -> RecoverTransition
+   */
+  @Test(timeout = 5000)
+  public void testDAGRecovery_Finished_KILLED_WithKillRequest() {
+    // A recovered DAGFinishedEvent takes precedence, so the outcome matches the
+    // variant of this test that has no DAGKillRequestEvent.
+    assertNewState();
+    restoreFromDAGInitializedEvent();
+    restoreFromDAGStartedEvent();
+    restoreFromDAGKillRequestEvent();
+    restoreFromDAGFinishedEvent(DAGState.KILLED);
+
+    dag.handle(new DAGEventRecoverEvent(dagId, new ArrayList<URL>()));
+    assertEquals(DAGState.KILLED, dag.getState());
+    assertEquals(tezCounters, dag.getAllCounters());
+    // every captured event except the last recovers a vertex to KILLED
+    ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+    verify(mockEventHandler, times(7)).handle(captor.capture());
+    List<Event> captured = captor.getAllValues();
+    int lastIndex = captured.size() - 1;
+    for (Event vertexEvent : captured.subList(0, lastIndex)) {
+      assertTrue(vertexEvent instanceof VertexEventRecoverVertex);
+      assertEquals(VertexState.KILLED,
+          ((VertexEventRecoverVertex) vertexEvent).getDesiredState());
+    }
+
+    // the last one is a DAGAppMasterEventDAGFinished carrying KILLED
+    Event finishedEvent = captured.get(lastIndex);
+    assertTrue(finishedEvent instanceof DAGAppMasterEventDAGFinished);
+    assertEquals(DAGState.KILLED,
+        ((DAGAppMasterEventDAGFinished) finishedEvent).getDAGState());
+  }
+
+  /**
    * restoreFromDAGInitializedEvent -> restoreFromDAGStartedEvent -> ->
    * restoreFromDAGFinishedEvent -> RecoverTransition
    */

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
index 2d03c60..0f532fb 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexRecovery.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.dag.api.oldrecords.TaskState;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
@@ -56,9 +57,13 @@ import org.apache.tez.dag.app.AppContext;
 import org.apache.tez.dag.app.ClusterInfo;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.TaskHeartbeatHandler;
+import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.Task;
 import org.apache.tez.dag.app.dag.VertexState;
 import org.apache.tez.dag.app.dag.VertexTerminationCause;
+import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
 import org.apache.tez.dag.app.dag.event.DAGEvent;
+import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
@@ -72,6 +77,8 @@ import org.apache.tez.dag.app.dag.event.VertexEventType;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
 import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
 import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
 import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
 import org.apache.tez.dag.history.events.VertexFinishedEvent;
@@ -354,6 +361,13 @@ public class TestVertexRecovery {
     return dag;
   }
 
+  class DAGEventHandler implements EventHandler<DAGEvent> {
+    @Override
+    public void handle(DAGEvent dagEvent) {
+      dag.handle(dagEvent); // let the DAG under test process events posted to the dispatcher
+    }
+  }
+
   class VertexEventHanlder implements EventHandler<VertexEvent> {
 
     private List<VertexEvent> events = new ArrayList<VertexEvent>();
@@ -395,6 +409,7 @@ public class TestVertexRecovery {
     }
   }
 
+  private DAGEventHandler dagEventHandler; // registered for DAGEventType in setUp()
   private VertexEventHanlder vertexEventHandler;
   private TaskEventHandler taskEventHandler;
 
@@ -402,7 +417,9 @@ public class TestVertexRecovery {
   public void setUp() throws IOException {
 
     dispatcher = new DrainDispatcher();
-    dispatcher.register(DAGEventType.class, mock(EventHandler.class));
+    dispatcher.register(DAGAppMasterEventType.class, mock(EventHandler.class));
+    dagEventHandler = new DAGEventHandler();
+    dispatcher.register(DAGEventType.class, dagEventHandler);
     vertexEventHandler = new VertexEventHanlder();
     dispatcher.register(VertexEventType.class, vertexEventHandler);
     taskEventHandler = new TaskEventHandler();
@@ -424,7 +441,8 @@ public class TestVertexRecovery {
     ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
     doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
 
-    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
+    dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null));
+    dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName"));
     LOG.info("finish setUp");
   }
 
@@ -931,6 +949,33 @@ public class TestVertexRecovery {
 
   @Test(timeout = 5000)
   public void testRecovery_VertexManagerErrorOnRecovery() {
+    // To simulate a VertexManager error during recovery, recovery has to be started from the
+    // vertex with DAG event handling disabled (a mock handler is registered for DAGEventType).
+    dispatcher = new DrainDispatcher();
+    dispatcher.register(DAGEventType.class, mock(EventHandler.class));
+    vertexEventHandler = new VertexEventHanlder();
+    dispatcher.register(VertexEventType.class, vertexEventHandler);
+    taskEventHandler = new TaskEventHandler();
+    dispatcher.register(TaskEventType.class, taskEventHandler);
+    dispatcher.register(TaskAttemptEventType.class,
+        new TaskAttemptEventHandler());
+    dispatcher.init(new Configuration());
+    dispatcher.start();
+    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    DAGPlan dagPlan = createDAGPlan();
+    dag =
+        new DAGImpl(dagId, new Configuration(), dagPlan,
+            dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
+            new Credentials(), new SystemClock(), user,
+            mock(TaskHeartbeatHandler.class), mockAppContext);
+    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
+    ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
+    doReturn(clusterInfo).when(mockAppContext).getClusterInfo();
+    dag.restoreFromEvent(new DAGInitializedEvent(dag.getID(), 0L, "user", "dagName", null));
+    dag.restoreFromEvent(new DAGStartedEvent(dag.getID(), 0L, "user", "dagName"));
+    LOG.info("finish setUp");
+
+    /////////////////// Start the recover ////////////////////////
     VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
     restoreFromInitializedEvent(vertex1);
     vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
@@ -1177,4 +1222,119 @@ public class TestVertexRecovery {
     assertTaskRecoveredEventSent(vertex2);
     assertTaskRecoveredEventSent(vertex3);
   }
+
+  /**
+   * vertex1, vertex2 and vertex3 (New) -> restoreFromInitialized ->
+   * restoreFromVertexStarted (RUNNING) -> restoreFromVertexFinished (KILLED)
+   *
+   * After DAGEventRecoverEvent is dispatched, each vertex and every one of its
+   * tasks must be KILLED. Each replayed history event carries the id of the
+   * vertex it is restored into.
+   */
+  @Test(timeout = 5000)
+  public void testRecovery_KilledBeforeTaskStarted() {
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    restoreFromInitializedEvent(vertex1);
+    assertEquals(VertexState.RUNNING, vertex1.restoreFromEvent(new VertexStartedEvent(
+        vertex1.getVertexId(), initRequestedTime + 100L, initRequestedTime + 200L)));
+    VertexState recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
+        "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+        initRequestedTime + 400L, initRequestedTime + 500L,
+        VertexState.KILLED, "", new TezCounters(), new VertexStats(), null));
+    assertEquals(VertexState.KILLED, recoveredState);
+
+    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
+    restoreFromInitializedEvent(vertex2);
+    assertEquals(VertexState.RUNNING, vertex2.restoreFromEvent(new VertexStartedEvent(
+        vertex2.getVertexId(), initRequestedTime + 100L, initRequestedTime + 200L)));
+    recoveredState = vertex2.restoreFromEvent(new VertexFinishedEvent(vertex2.getVertexId(),
+        "vertex2", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+        initRequestedTime + 400L, initRequestedTime + 500L,
+        VertexState.KILLED, "", new TezCounters(), new VertexStats(), null));
+    assertEquals(VertexState.KILLED, recoveredState);
+
+    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+    restoreFromInitializedEvent(vertex3);
+    assertEquals(VertexState.RUNNING, vertex3.restoreFromEvent(new VertexStartedEvent(
+        vertex3.getVertexId(), initRequestedTime + 100L, initRequestedTime + 200L)));
+    recoveredState = vertex3.restoreFromEvent(new VertexFinishedEvent(vertex3.getVertexId(),
+        "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+        initRequestedTime + 400L, initRequestedTime + 500L,
+        VertexState.KILLED, "", new TezCounters(), new VertexStats(), null));
+    assertEquals(VertexState.KILLED, recoveredState);
+
+    // start the recovery; the DAG sends the recover event to its root vertices (v1, v2)
+    dag.handle(new DAGEventRecoverEvent(dag.getID(), null));
+    dispatcher.await();
+    // v1 is recovered directly to KILLED, and so are all of its tasks
+    assertEquals(VertexState.KILLED, vertex1.getState());
+    for (Task task : vertex1.tasks.values()) {
+      assertEquals(TaskState.KILLED, task.getState());
+    }
+    // v2 is recovered directly to KILLED, and so are all of its tasks
+    assertEquals(VertexState.KILLED, vertex2.getState());
+    for (Task task : vertex2.tasks.values()) {
+      assertEquals(TaskState.KILLED, task.getState());
+    }
+    // v3 is recovered directly to KILLED, and so are all of its tasks
+    assertEquals(VertexState.KILLED, vertex3.getState());
+    for (Task task : vertex3.tasks.values()) {
+      assertEquals(TaskState.KILLED, task.getState());
+    }
+  }
+
+  /**
+   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
+   * restoreFromVertexFinished (FAILED)
+   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted (RUNNING)
+   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
+   * restoreFromVertexFinished (FAILED)
+   */
+  @Test(timeout = 5000)
+  public void testRecovery_FailedBeforeTaskStarted() {
+    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
+    restoreFromInitializedEvent(vertex1);
+    assertEquals(VertexState.RUNNING, vertex1.restoreFromEvent(new VertexStartedEvent(
+        vertex1.getVertexId(), initRequestedTime + 100L, initRequestedTime + 200L)));
+    VertexState recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
+        "vertex1", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+        initRequestedTime + 400L, initRequestedTime + 500L,
+        VertexState.FAILED, "", new TezCounters(), new VertexStats(), null));
+    assertEquals(VertexState.FAILED, recoveredState);
+
+    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
+    restoreFromInitializedEvent(vertex2);
+    recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(),
+        initRequestedTime + 100L, initRequestedTime + 200L));
+    assertEquals(VertexState.RUNNING, recoveredState);
+
+    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
+    restoreFromInitializedEvent(vertex3);
+    assertEquals(VertexState.RUNNING, vertex3.restoreFromEvent(new VertexStartedEvent(
+        vertex3.getVertexId(), initRequestedTime + 100L, initRequestedTime + 200L)));
+    recoveredState = vertex3.restoreFromEvent(new VertexFinishedEvent(vertex3.getVertexId(),
+        "vertex3", 1, initRequestedTime, initedTime, initRequestedTime + 300L,
+        initRequestedTime + 400L, initRequestedTime + 500L,
+        VertexState.FAILED, "", new TezCounters(), new VertexStats(), null));
+    assertEquals(VertexState.FAILED, recoveredState);
+
+    // start the recovery from the DAG
+    dag.handle(new DAGEventRecoverEvent(dag.getID(), null));
+    dispatcher.await();
+    // v1 is recovered directly to FAILED, and so are all of its tasks
+    assertEquals(VertexState.FAILED, vertex1.getState());
+    for (Task task : vertex1.tasks.values()) {
+      assertEquals(TaskState.FAILED, task.getState());
+    }
+    // v2 ends up KILLED: the failure of v1/v3 fails the DAG, which in turn
+    // kills its remaining vertices
+    assertEquals(VertexState.KILLED, vertex2.getState());
+
+    // v3 is recovered directly to FAILED, and so are all of its tasks
+    assertEquals(VertexState.FAILED, vertex3.getState());
+    for (Task task : vertex3.tasks.values()) {
+      assertEquals(TaskState.FAILED, task.getState());
+    }
+    assertEquals(DAGState.FAILED, dag.getState());
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 1575adc..b52a4f9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -213,6 +213,18 @@ public class TestHistoryEventsProtoConversion {
     logEvents(event, deserializedEvent);
   }
 
+  private void testDAGKillRequestEvent() throws Exception {
+    for (boolean sessionStopped : new boolean[] {false, true}) { // round-trip both flag values
+      DAGKillRequestEvent event = new DAGKillRequestEvent(
+          TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 100334L, sessionStopped);
+      DAGKillRequestEvent deserializedEvent = (DAGKillRequestEvent) testProtoConversion(event);
+      Assert.assertEquals(event.getDagID(), deserializedEvent.getDagID());
+      Assert.assertEquals(event.getKillRequestTime(), deserializedEvent.getKillRequestTime());
+      Assert.assertEquals(event.isSessionStopped(), deserializedEvent.isSessionStopped());
+      logEvents(event, deserializedEvent);
+    }
+  }
+
   private void testDAGFinishedEvent() throws Exception {
     {
       DAGFinishedEvent event = new DAGFinishedEvent(
@@ -720,6 +732,9 @@ public class TestHistoryEventsProtoConversion {
         case DAG_RECOVERED:
           testDAGRecoveredEvent();
           break;
+        case DAG_KILL_REQUEST:
+          testDAGKillRequestEvent();
+          break;
         default:
           throw new Exception("Unhandled Event type in Unit tests: " + eventType);
         }

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 6469e78..3ab204a 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -50,6 +50,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGKillRequestEvent;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -191,6 +192,9 @@ public class TestHistoryEventJsonConversion {
           event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(), user,
               1l);
           break;
+        case DAG_KILL_REQUEST:
+          event = new DAGKillRequestEvent();
+          break;
         default:
           Assert.fail("Unhandled event type " + eventType);
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/4f66cb4b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index f5f3ae7..e324d1b 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.history.events.ContainerStoppedEvent;
 import org.apache.tez.dag.history.events.DAGCommitStartedEvent;
 import org.apache.tez.dag.history.events.DAGFinishedEvent;
 import org.apache.tez.dag.history.events.DAGInitializedEvent;
+import org.apache.tez.dag.history.events.DAGKillRequestEvent;
 import org.apache.tez.dag.history.events.DAGRecoveredEvent;
 import org.apache.tez.dag.history.events.DAGStartedEvent;
 import org.apache.tez.dag.history.events.DAGSubmittedEvent;
@@ -196,6 +197,9 @@ public class TestHistoryEventTimelineConversion {
           event = new DAGRecoveredEvent(applicationAttemptId, tezDAGID, dagPlan.getName(),
               user, random.nextLong());
           break;
+        case DAG_KILL_REQUEST:
+          event = new DAGKillRequestEvent();
+          break;
         default:
           Assert.fail("Unhandled event type " + eventType);
       }


Mime
View raw message